mirror of
https://git.linux-kernel.at/oliver/ivatar.git
synced 2025-11-12 19:26:23 +00:00
- Always initialize OpenTelemetry in Django settings (instrumentation always enabled) - Remove ENABLE_OPENTELEMETRY feature flag from config.py - Simplify views.py OpenTelemetry imports to always use real implementation - Export control now handled by OTEL_EXPORT_ENABLED environment variable only This ensures OpenTelemetry is properly initialized during Django startup and the Prometheus metrics server starts correctly.
525 lines
18 KiB
Python
Executable File
525 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Libravatar Deployment Verification Script
|
||
|
||
This script verifies that Libravatar deployments are working correctly by:
|
||
- Checking version endpoint
|
||
- Testing avatar functionality with various sizes
|
||
- Verifying stats endpoint
|
||
- Testing redirect behavior
|
||
|
||
Usage:
|
||
python3 check_deployment.py --dev # Test dev deployment
|
||
python3 check_deployment.py --prod # Test production deployment
|
||
python3 check_deployment.py --endpoint <url> # Test custom endpoint
|
||
python3 check_deployment.py --dev --prod # Test both deployments
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import random
|
||
import ssl
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
import time
|
||
from typing import Dict, Optional, Tuple
|
||
from urllib.parse import urljoin
|
||
from urllib.request import urlopen, Request
|
||
from urllib.error import HTTPError, URLError
|
||
|
||
try:
|
||
from PIL import Image
|
||
|
||
PIL_AVAILABLE = True
|
||
except ImportError:
|
||
PIL_AVAILABLE = False
|
||
|
||
# Configuration
# Base URL checked when the script is run with --dev.
DEV_URL = "https://dev.libravatar.org"
# Base URL checked when the script is run with --prod.
PROD_URL = "https://libravatar.org"
# Default number of attempts before a deployment check is reported as failed
# (overridable with --max-retries).
MAX_RETRIES = 5
# Default pause between attempts, in seconds (overridable with --retry-delay).
RETRY_DELAY = 10
|
||
|
||
# ANSI color codes
|
||
|
||
|
||
class Colors:
    """Namespace of ANSI escape sequences used to color terminal output.

    The attributes are plain strings; ``NC`` resets the terminal back to its
    default rendering and is appended after every colored message.
    """

    # Status colors
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"

    # Reset sequence ("No Color")
    NC = "\033[0m"
|
||
|
||
|
||
def colored_print(message: str, color: str = Colors.NC) -> None:
    """Write ``message`` to stdout wrapped in ``color`` and the reset code.

    Args:
        message: Text to print.
        color: Escape sequence emitted before the text (defaults to the
            reset sequence, i.e. no coloring).
    """
    rendered = "{}{}{}".format(color, message, Colors.NC)
    print(rendered)
|
||
|
||
|
||
def get_current_commit_hash() -> Optional[str]:
    """Return the hash printed by ``git rev-parse HEAD``.

    Returns:
        The command's stdout with surrounding whitespace stripped, or
        ``None`` when git exits with a non-zero status or the ``git``
        executable cannot be found.
    """
    command = ["git", "rev-parse", "HEAD"]
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
    return completed.stdout.strip()
|
||
|
||
|
||
def is_commit_newer_or_equal(commit1: str, commit2: str) -> Optional[bool]:
    """
    Check if commit1 is newer than or equal to commit2 in git history.

    "Newer or equal" is decided with ``git merge-base --is-ancestor``:
    commit1 counts as newer-or-equal when commit2 is an ancestor of it.

    Args:
        commit1: Commit expected to be the newer (or same) one.
        commit2: Commit to compare against.

    Returns:
        True if commit1 is newer or equal to commit2
        False if commit1 is older than commit2
        None if comparison fails (unknown commits, unrelated histories,
        not a git repository, or git is not installed)
    """

    def _is_ancestor(ancestor: str, descendant: str) -> bool:
        # Exit status 0 means "is an ancestor"; any other status (1 for
        # "not an ancestor", other codes for git errors) is treated as "no".
        completed = subprocess.run(
            ["git", "merge-base", "--is-ancestor", ancestor, descendant],
            capture_output=True,
            check=False,
        )
        return completed.returncode == 0

    try:
        if _is_ancestor(commit2, commit1):
            return True
        # Not newer-or-equal; see whether commit1 is strictly older.
        if _is_ancestor(commit1, commit2):
            return False
    except FileNotFoundError:
        # git executable is missing: the relationship cannot be determined.
        return None

    # Neither commit is an ancestor of the other (or git reported an error).
    return None
|
||
|
||
|
||
def make_request(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    follow_redirects: bool = True,
    binary: bool = False,
) -> Tuple[bool, Optional[bytes], Optional[Dict[str, str]]]:
    """
    Make an HTTP request and return success status, content, and headers.

    The request is first attempted with normal TLS certificate verification.
    Only if that fails with an SSL error is it retried once with verification
    disabled (less secure, but keeps checks usable against test hosts).

    Args:
        url: URL to request
        method: HTTP method
        headers: Additional headers
        follow_redirects: Whether to follow redirects automatically.
            NOTE(review): when False, urllib's default error handling makes a
            3xx response surface as HTTPError, i.e. a failure - confirm this
            is the intended contract.
        binary: When True the body is returned as raw ``bytes``. When False
            the body is decoded as UTF-8 and returned as ``str``; a body that
            is not valid UTF-8 is returned as ``""``.

    Returns:
        Tuple of (success, content, headers). On any failure the tuple is
        ``(False, None, None)``. Note that ``content`` is a ``str`` unless
        ``binary`` is True, despite the ``bytes`` annotation.
    """
    import http.client
    import urllib.request

    class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
        """Redirect handler that declines to follow any redirect."""

        def redirect_request(self, req, fp, code, msg, headers, newurl):
            return None

    req = Request(url, headers=headers or {}, method=method)

    def _fetch(ssl_context):
        # Build one opener for both modes so the SSL context is always
        # honoured (also when redirects are disabled).
        handlers = [urllib.request.HTTPSHandler(context=ssl_context)]
        if not follow_redirects:
            handlers.append(_NoRedirectHandler())
        opener = urllib.request.build_opener(*handlers)

        # The context manager guarantees the response is closed.
        with opener.open(req) as response:
            body = response.read()
            response_headers = dict(response.headers)

        if not binary:
            try:
                body = body.decode("utf-8")
            except UnicodeDecodeError:
                # If content is not text (e.g., binary image), return empty string
                body = ""
        return True, body, response_headers

    # Try with SSL verification first
    try:
        return _fetch(ssl.create_default_context())
    except HTTPError:
        # HTTPError subclasses URLError, so it must be handled first.
        return False, None, None
    except URLError as url_err:
        # Only an SSL failure qualifies for the unverified retry below.
        if not isinstance(url_err.reason, ssl.SSLError):
            return False, None, None
    except (OSError, http.client.HTTPException):
        # Timeouts, connection resets or truncated responses while reading.
        return False, None, None

    # If SSL fails, try with unverified context (less secure but works for testing)
    ssl_context_unverified = ssl.create_default_context()
    ssl_context_unverified.check_hostname = False
    ssl_context_unverified.verify_mode = ssl.CERT_NONE
    try:
        return _fetch(ssl_context_unverified)
    except Exception:
        # Deliberate best-effort: any failure of the fallback is a failed request.
        return False, None, None
|
||
|
||
|
||
def check_version_endpoint(base_url: str) -> Tuple[bool, Optional[Dict]]:
    """Check the version endpoint and return deployment info.

    Requests ``/deployment/version/`` relative to ``base_url`` and parses the
    response body as JSON.

    Args:
        base_url: Base URL of the deployment.

    Returns:
        ``(True, info)`` when the endpoint answered with a JSON object,
        otherwise ``(False, None)`` (request failed, empty body, invalid
        JSON, or JSON that is not an object).
    """
    version_url = urljoin(base_url, "/deployment/version/")
    success, content, _ = make_request(version_url)

    if not success or not content:
        return False, None

    try:
        version_info = json.loads(content)
    except json.JSONDecodeError:
        return False, None

    # Callers use dict methods on the result; reject valid JSON of another
    # shape (list, string, number) instead of failing later.
    if not isinstance(version_info, dict):
        return False, None

    return True, version_info
|
||
|
||
|
||
def test_avatar_redirect(base_url: str) -> bool:
    """Check that an unknown avatar ends up at the default image.

    Requests ``/avatar/test@example.com`` relative to ``base_url``, lets
    urllib follow redirects, and inspects the URL of the final response.
    Certificate and hostname verification are disabled for this probe.

    Returns:
        True if the final URL contains ``deadbeef.png``; False otherwise or
        when any exception occurs while making the request.
    """
    target = urljoin(base_url, "/avatar/test@example.com")

    try:
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE

        with urlopen(Request(target), context=insecure_context) as response:
            # The redirect worked if we landed on the default image.
            landed_on = response.geturl()
            return "deadbeef.png" in landed_on
    except Exception:
        return False
|
||
|
||
|
||
def test_avatar_sizing(base_url: str) -> bool:
    """Test avatar endpoint with random sizes.

    Downloads the avatar for a fixed test hash at two random sizes between
    50 and 250 pixels. With Pillow available the image dimensions must equal
    the requested size; without it the check only requires more than 100
    bytes of content.

    Args:
        base_url: Base URL of the deployment.

    Returns:
        True if every requested size passed, False on the first failure
        (a message describing the failure is printed).
    """
    import io

    # Use a known test hash for consistent testing
    test_hash = "63a75a80e6b1f4adfdb04c1ca02e596c"

    # Generate random sizes between 50-250
    sizes = [random.randint(50, 250) for _ in range(2)]

    for size in sizes:
        avatar_url = urljoin(base_url, f"/avatar/{test_hash}?s={size}")

        success, content, _ = make_request(avatar_url, binary=True)
        if not success or not content:
            colored_print(f"❌ Avatar endpoint failed for size {size}", Colors.RED)
            return False

        if not PIL_AVAILABLE:
            # Fallback: just check if we got some content
            if len(content) <= 100:
                colored_print(
                    f"❌ Avatar endpoint returned insufficient content for size {size}",
                    Colors.RED,
                )
                return False
            # Assume valid image if we got substantial content
            colored_print(
                f"✅ Avatar size {size} downloaded (PIL not available for verification)",
                Colors.YELLOW,
            )
            continue

        # Check image dimensions. The bytes are parsed straight from memory;
        # no temporary file is needed (reopening a NamedTemporaryFile by name
        # is not portable).
        try:
            with Image.open(io.BytesIO(content)) as img:
                width, height = img.size
        except Exception as e:
            colored_print(f"❌ Error checking image dimensions: {e}", Colors.RED)
            return False

        if width != size or height != size:
            colored_print(
                f"❌ Avatar wrong size: expected {size}x{size}, got {width}x{height}",
                Colors.RED,
            )
            return False

        colored_print(f"✅ Avatar size {size}x{size} verified", Colors.GREEN)

    return True
|
||
|
||
|
||
def test_stats_endpoint(base_url: str) -> bool:
    """Report whether a request to ``/stats/`` under ``base_url`` succeeds.

    Only the success flag of the request is considered; the response body
    and headers are ignored.
    """
    outcome = make_request(urljoin(base_url, "/stats/"))
    return outcome[0]
|
||
|
||
|
||
def _report_deployed_version(version_info: Dict) -> None:
    """Print the deployed commit/branch/version and compare it with the local checkout."""
    # Display version information
    deployed_commit = version_info.get("commit_hash", "Unknown")
    branch = version_info.get("branch", "Unknown")
    version = version_info.get("version", "Unknown")

    colored_print(f"Deployed commit: {deployed_commit}", Colors.BLUE)
    colored_print(f"Deployed branch: {branch}", Colors.BLUE)
    colored_print(f"Deployed version: {version}", Colors.BLUE)

    # The comparison is informational only and needs both commit hashes.
    current_commit = get_current_commit_hash()
    if not current_commit or deployed_commit == "Unknown":
        return

    if deployed_commit == current_commit:
        colored_print(
            "✅ Exact version match - deployment is up to date!",
            Colors.GREEN,
        )
        return

    # Check if deployed version is newer
    comparison = is_commit_newer_or_equal(deployed_commit, current_commit)
    if comparison is True:
        colored_print(
            "ℹ️ Note: A newer version is already deployed (this is fine!)",
            Colors.YELLOW,
        )
    elif comparison is False:
        colored_print(
            "⚠️ Warning: Deployed version appears to be older than expected",
            Colors.YELLOW,
        )
    else:
        colored_print(
            "⚠️ Warning: Could not determine version relationship",
            Colors.YELLOW,
        )


def _run_functionality_tests(base_url: str) -> bool:
    """Run the redirect, sizing and stats checks in order; stop at the first failure."""
    colored_print("Running basic functionality tests...", Colors.YELLOW)

    # Test avatar redirect
    if not test_avatar_redirect(base_url):
        colored_print("❌ Invalid avatar redirect failed", Colors.RED)
        return False
    colored_print("✅ Invalid avatar redirects correctly", Colors.GREEN)

    # Test avatar sizing (messages are printed within the function)
    if not test_avatar_sizing(base_url):
        return False

    # Test stats endpoint
    if not test_stats_endpoint(base_url):
        colored_print("❌ Stats endpoint failed", Colors.RED)
        return False
    colored_print("✅ Stats endpoint working", Colors.GREEN)

    return True


def test_deployment(
    base_url: str,
    name: str,
    max_retries: int = MAX_RETRIES,
    retry_delay: int = RETRY_DELAY,
) -> bool:
    """
    Test a deployment with retry logic.

    Only the version endpoint is retried: once it answers, the functionality
    tests run a single time and their result is final.

    Args:
        base_url: Base URL of the deployment
        name: Human-readable name for the deployment
        max_retries: Maximum number of retry attempts
        retry_delay: Seconds to wait between attempts

    Returns:
        True if all tests pass, False otherwise
    """
    colored_print(f"Testing {name} deployment at {base_url}", Colors.YELLOW)

    for attempt in range(1, max_retries + 1):
        colored_print(
            f"Attempt {attempt}/{max_retries}: Checking {name} deployment...",
            Colors.BLUE,
        )

        # Check if site is responding
        success, version_info = check_version_endpoint(base_url)
        if success and version_info:
            colored_print(
                f"{name} site is responding, checking version...", Colors.GREEN
            )
            _report_deployed_version(version_info)

            if not _run_functionality_tests(base_url):
                return False

            colored_print(
                f"🎉 {name} deployment verification completed successfully!",
                Colors.GREEN,
            )
            return True

        colored_print(f"{name} site not responding yet...", Colors.YELLOW)

        # No point in sleeping after the final attempt.
        if attempt < max_retries:
            colored_print(
                f"Waiting {retry_delay} seconds before next attempt...", Colors.BLUE
            )
            time.sleep(retry_delay)

    colored_print(
        f"❌ FAILED: {name} deployment verification timed out after {max_retries} attempts",
        Colors.RED,
    )
    return False
|
||
|
||
|
||
def main():
    """Parse the command line, run the selected deployment checks, and exit.

    At least one of ``--dev``, ``--prod`` or ``--endpoint`` is required.
    The process exits with status 0 when every selected deployment passed
    and with status 1 otherwise.
    """
    usage_examples = """
Examples:
python3 check_deployment.py --dev # Test dev deployment
python3 check_deployment.py --prod # Test production deployment
python3 check_deployment.py --endpoint <url> # Test custom endpoint
python3 check_deployment.py --dev --prod # Test both deployments
"""
    parser = argparse.ArgumentParser(
        description="Libravatar Deployment Verification Script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Target selection
    parser.add_argument(
        "--dev",
        action="store_true",
        help="Test dev deployment (https://dev.libravatar.org)",
    )
    parser.add_argument(
        "--prod",
        action="store_true",
        help="Test production deployment (https://libravatar.org)",
    )
    parser.add_argument("--endpoint", type=str, help="Test custom endpoint URL")

    # Retry tuning
    parser.add_argument(
        "--max-retries",
        type=int,
        default=MAX_RETRIES,
        help=f"Maximum number of retry attempts (default: {MAX_RETRIES})",
    )
    parser.add_argument(
        "--retry-delay",
        type=int,
        default=RETRY_DELAY,
        help=f"Delay between retry attempts in seconds (default: {RETRY_DELAY})",
    )

    args = parser.parse_args()

    # Refuse to run without a target.
    if not (args.dev or args.prod or args.endpoint):
        parser.error("At least one of --dev, --prod, or --endpoint must be specified")

    separator = "=" * 50
    colored_print("Libravatar Deployment Verification Script", Colors.BLUE)
    colored_print(separator, Colors.BLUE)

    # Warn early when image dimensions cannot be verified.
    if not PIL_AVAILABLE:
        colored_print(
            "⚠️ Warning: PIL/Pillow not available. Image dimension verification will be limited.",
            Colors.YELLOW,
        )
        colored_print(" Install with: pip install Pillow", Colors.YELLOW)

    # Collect the requested targets in the fixed order dev, production, custom.
    targets = []
    if args.dev:
        targets.append((DEV_URL, "Dev"))
    if args.prod:
        targets.append((PROD_URL, "Production"))
    if args.endpoint:
        targets.append((args.endpoint, "Custom"))

    results = []
    for target_url, label in targets:
        colored_print("", Colors.NC)
        outcome = test_deployment(
            target_url, label, args.max_retries, args.retry_delay
        )
        results.append((label, outcome))

    # Summary
    colored_print("", Colors.NC)
    colored_print(separator, Colors.BLUE)
    colored_print("Deployment Verification Summary:", Colors.BLUE)
    colored_print(separator, Colors.BLUE)

    failure_count = 0
    for label, passed in results:
        if passed:
            colored_print(f"✅ {label} deployment: PASSED", Colors.GREEN)
        else:
            colored_print(f"❌ {label} deployment: FAILED", Colors.RED)
            failure_count += 1

    if failure_count:
        colored_print("❌ Some deployment verifications failed!", Colors.RED)
        sys.exit(1)

    colored_print("🎉 All deployment verifications passed!", Colors.GREEN)
    sys.exit(0)
|
||
|
||
|
||
# Run the checks only when this file is executed as a script; importing it
# just makes the functions available.
if __name__ == "__main__":
    main()
|