#!/usr/bin/env python3
"""
Libravatar Deployment Verification Script

This script verifies that Libravatar deployments are working correctly by:
- Checking version endpoint
- Testing avatar functionality with various sizes
- Verifying stats endpoint
- Testing redirect behavior

Usage:
    python3 check_deployment.py --dev              # Test dev deployment
    python3 check_deployment.py --prod             # Test production deployment
    python3 check_deployment.py --endpoint <url>   # Test custom endpoint
    python3 check_deployment.py --dev --prod       # Test both deployments
"""

import argparse
import json
import os
import random
import ssl
import subprocess
import sys
import tempfile
import time
from typing import Dict, Optional, Tuple
from urllib.parse import urljoin
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError

try:
    from PIL import Image

    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False

# Configuration
DEV_URL = "https://dev.libravatar.org"
PROD_URL = "https://libravatar.org"
MAX_RETRIES = 5
RETRY_DELAY = 10
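
# Note: retries only sleep between attempts, so with the defaults above a
# deployment that never responds is polled MAX_RETRIES times with at most
# (MAX_RETRIES - 1) * RETRY_DELAY = 4 * 10 = 40 seconds of waiting in between.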


# ANSI color codes
class Colors:
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    NC = "\033[0m"  # No Color


def colored_print(message: str, color: str = Colors.NC) -> None:
    """Print a colored message with immediate flush."""
    print(f"{color}{message}{Colors.NC}", flush=True)


def get_current_commit_hash() -> Optional[str]:
    """Get the current commit hash from git or CI environment."""
    # First try GitLab CI environment variable (most reliable in CI)
    ci_commit = os.environ.get("CI_COMMIT_SHA")
    if ci_commit:
        colored_print(f"Using CI commit hash: {ci_commit}", Colors.BLUE)
        return ci_commit

    # Fallback to git command
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
        commit_hash = result.stdout.strip()
        colored_print(f"Using git commit hash: {commit_hash}", Colors.BLUE)
        return commit_hash
    except (subprocess.CalledProcessError, FileNotFoundError):
        colored_print("Could not determine current commit hash", Colors.RED)
        return None


def is_commit_newer_or_equal(commit1: str, commit2: str) -> Optional[bool]:
    """
    Check if commit1 is newer than or equal to commit2 in git history.

    Returns:
        True if commit1 is newer than or equal to commit2
        False if commit1 is older than commit2
        None if the comparison fails
    """
    try:
        # First try to get commit timestamps for comparison
        try:
            result1 = subprocess.run(
                ["git", "show", "-s", "--format=%ct", commit1],
                capture_output=True,
                text=True,
                check=True,
            )
            result2 = subprocess.run(
                ["git", "show", "-s", "--format=%ct", commit2],
                capture_output=True,
                text=True,
                check=True,
            )

            timestamp1 = int(result1.stdout.strip())
            timestamp2 = int(result2.stdout.strip())

            colored_print(f"Commit {commit1[:8]} timestamp: {timestamp1}", Colors.BLUE)
            colored_print(f"Commit {commit2[:8]} timestamp: {timestamp2}", Colors.BLUE)

            # commit1 is newer if it has a later timestamp
            return timestamp1 >= timestamp2

        except (subprocess.CalledProcessError, ValueError):
            # Fall back to merge-base if timestamp comparison fails
            colored_print("Timestamp comparison failed, trying merge-base", Colors.YELLOW)

            # Use git merge-base to check if commit2 is an ancestor of commit1
            subprocess.run(
                ["git", "merge-base", "--is-ancestor", commit2, commit1],
                capture_output=True,
                check=True,
            )
            return True

    except subprocess.CalledProcessError:
        # If the above fails, try the reverse direction
        try:
            subprocess.run(
                ["git", "merge-base", "--is-ancestor", commit1, commit2],
                capture_output=True,
                check=True,
            )
            return False
        except subprocess.CalledProcessError:
            colored_print(
                "Git comparison failed - shallow clone or missing commits",
                Colors.YELLOW,
            )
            return None
    except Exception as e:
        colored_print(f"Git comparison error: {e}", Colors.RED)
        return None
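
# Illustrative example (hypothetical hashes): if commit "aaaa111" has a later
# committer timestamp than "bbbb222", is_commit_newer_or_equal("aaaa111", "bbbb222")
# returns True; with the arguments swapped it returns False; and it returns None
# when the commits cannot be compared locally (e.g. a shallow CI clone).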


def make_request(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    follow_redirects: bool = True,
    binary: bool = False,
) -> Tuple[bool, Optional[bytes], Optional[Dict[str, str]]]:
    """
    Make an HTTP request and return success status, content, and headers.

    Args:
        url: URL to request
        method: HTTP method
        headers: Additional headers
        follow_redirects: Whether to follow redirects automatically
        binary: Whether to return raw bytes instead of decoded text

    Returns:
        Tuple of (success, content, headers)
    """
    req = Request(url, headers=headers or {})
    req.get_method = lambda: method

    # Create SSL context that handles certificate verification issues
    ssl_context = ssl.create_default_context()

    # Try with SSL verification first
    try:
        opener = urlopen
        if not follow_redirects:
            # Create a custom opener that doesn't follow redirects
            import urllib.request

            class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
                def redirect_request(self, req, fp, code, msg, headers, newurl):
                    return None

            opener = urllib.request.build_opener(NoRedirectHandler)

        if follow_redirects:
            with opener(req, context=ssl_context) as response:
                content = response.read()
                if not binary:
                    try:
                        content = content.decode("utf-8")
                    except UnicodeDecodeError:
                        # If content is not text (e.g., binary image), return empty string
                        content = ""
                headers = dict(response.headers)
                return True, content, headers
        else:
            response = opener.open(req)
            content = response.read()
            if not binary:
                try:
                    content = content.decode("utf-8")
                except UnicodeDecodeError:
                    content = ""
            headers = dict(response.headers)
            return True, content, headers
    except URLError as url_err:
        # Check if this is an SSL error wrapped in URLError
        if isinstance(url_err.reason, ssl.SSLError):
            # If SSL fails, try with an unverified context (less secure but works for testing)
            ssl_context_unverified = ssl.create_default_context()
            ssl_context_unverified.check_hostname = False
            ssl_context_unverified.verify_mode = ssl.CERT_NONE

            try:
                if follow_redirects:
                    with urlopen(req, context=ssl_context_unverified) as response:
                        content = response.read()
                        if not binary:
                            try:
                                content = content.decode("utf-8")
                            except UnicodeDecodeError:
                                content = ""
                        headers = dict(response.headers)
                        return True, content, headers
                else:
                    import urllib.request

                    class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
                        def redirect_request(self, req, fp, code, msg, headers, newurl):
                            return None

                    opener = urllib.request.build_opener(NoRedirectHandler)
                    response = opener.open(req)
                    content = response.read()
                    if not binary:
                        try:
                            content = content.decode("utf-8")
                        except UnicodeDecodeError:
                            content = ""
                    headers = dict(response.headers)
                    return True, content, headers
            except Exception:
                return False, None, None
        else:
            return False, None, None
    except HTTPError:
        return False, None, None
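
# Typical usage within this script: text/JSON endpoints are fetched with
#     success, content, _ = make_request(url)
# while avatar images are fetched with binary=True so the raw bytes can be
# passed to Pillow for dimension checks:
#     success, content, _ = make_request(avatar_url, binary=True)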


def check_version_endpoint(base_url: str) -> Tuple[bool, Optional[Dict]]:
    """Check the version endpoint and return deployment info."""
    version_url = urljoin(base_url, "/deployment/version/")
    success, content, _ = make_request(version_url)

    if not success or not content:
        return False, None

    try:
        version_info = json.loads(content)
        return True, version_info
    except json.JSONDecodeError:
        return False, None
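
# The /deployment/version/ endpoint is expected to return JSON; test_deployment()
# below reads the keys "commit_hash", "branch" and "version" (falling back to
# "Unknown" for any that are missing), so a plausible payload looks roughly like
# {"commit_hash": "<sha>", "branch": "<branch>", "version": "<version>"}.
# This shape is inferred from how the fields are consumed here, not asserted
# about the server itself.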


def test_avatar_redirect(base_url: str) -> bool:
    """Test that invalid avatar requests redirect to default image."""
    avatar_url = urljoin(base_url, "/avatar/test@example.com")

    # Use a simple approach: check if the final URL after redirect contains deadbeef.png
    try:
        req = Request(avatar_url)
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        with urlopen(req, context=ssl_context) as response:
            final_url = response.geturl()
            return "deadbeef.png" in final_url
    except Exception:
        return False


def test_avatar_sizing(base_url: str) -> bool:
    """Test avatar endpoint with random sizes."""
    # Use a known test hash for consistent testing
    test_hash = "63a75a80e6b1f4adfdb04c1ca02e596c"

    # Generate random sizes between 50-250
    sizes = [random.randint(50, 250) for _ in range(2)]

    for size in sizes:
        avatar_url = urljoin(base_url, f"/avatar/{test_hash}?s={size}")

        # Download image to temporary file
        success, content, _ = make_request(avatar_url, binary=True)
        if not success or not content:
            colored_print(f"❌ Avatar endpoint failed for size {size}", Colors.RED)
            return False

        # Check image dimensions
        if PIL_AVAILABLE:
            try:
                with tempfile.NamedTemporaryFile(suffix=".jpg") as temp_file:
                    temp_file.write(content)
                    temp_file.flush()

                    with Image.open(temp_file.name) as img:
                        width, height = img.size
                        if width == size and height == size:
                            colored_print(
                                f"✅ Avatar size {size}x{size} verified", Colors.GREEN
                            )
                        else:
                            colored_print(
                                f"❌ Avatar wrong size: expected {size}x{size}, got {width}x{height}",
                                Colors.RED,
                            )
                            return False
            except Exception as e:
                colored_print(f"❌ Error checking image dimensions: {e}", Colors.RED)
                return False
        else:
            # Fallback: just check if we got some content
            if len(content) > 100:  # Assume valid image if we got substantial content
                colored_print(
                    f"✅ Avatar size {size} downloaded (PIL not available for verification)",
                    Colors.YELLOW,
                )
            else:
                colored_print(
                    f"❌ Avatar endpoint returned insufficient content for size {size}",
                    Colors.RED,
                )
                return False

    return True


def test_stats_endpoint(base_url: str) -> bool:
    """Test that the stats endpoint is accessible."""
    stats_url = urljoin(base_url, "/stats/")
    success, _, _ = make_request(stats_url)
    return success


def test_deployment(
    base_url: str,
    name: str,
    max_retries: int = MAX_RETRIES,
    retry_delay: int = RETRY_DELAY,
) -> bool:
    """
    Test a deployment with retry logic.

    Args:
        base_url: Base URL of the deployment
        name: Human-readable name for the deployment
        max_retries: Maximum number of retry attempts
        retry_delay: Delay between retry attempts in seconds

    Returns:
        True if all tests pass, False otherwise
    """
    colored_print(f"Testing {name} deployment at {base_url}", Colors.YELLOW)

    for attempt in range(1, max_retries + 1):
        colored_print(
            f"Attempt {attempt}/{max_retries}: Checking {name} deployment...",
            Colors.BLUE,
        )

        # Check if site is responding
        success, version_info = check_version_endpoint(base_url)
        if success and version_info:
            colored_print(
                f"{name} site is responding, checking version...", Colors.GREEN
            )

            # Display version information
            deployed_commit = version_info.get("commit_hash", "Unknown")
            branch = version_info.get("branch", "Unknown")
            version = version_info.get("version", "Unknown")

            colored_print(f"Deployed commit: {deployed_commit}", Colors.BLUE)
            colored_print(f"Deployed branch: {branch}", Colors.BLUE)
            colored_print(f"Deployed version: {version}", Colors.BLUE)

            # Check if we're looking for a specific version and compare
            current_commit = get_current_commit_hash()
            version_ok = True

            if current_commit and deployed_commit != "Unknown":
                colored_print(f"Expected commit: {current_commit[:8]}...", Colors.BLUE)
                colored_print(f"Deployed commit: {deployed_commit[:8]}...", Colors.BLUE)

                if deployed_commit == current_commit:
                    colored_print(
                        "✅ Exact version match - deployment is up to date!",
                        Colors.GREEN,
                    )
                elif deployed_commit.startswith(
                    current_commit[:8]
                ) or current_commit.startswith(deployed_commit[:8]):
                    # Handle case where we have short vs long commit hashes
                    colored_print(
                        "✅ Version match (short hash) - deployment is up to date!",
                        Colors.GREEN,
                    )
                else:
                    # Check if deployed version is newer using git
                    comparison = is_commit_newer_or_equal(
                        deployed_commit, current_commit
                    )
                    colored_print(f"Commit comparison result: {comparison}", Colors.BLUE)

                    if comparison is True:
                        colored_print(
                            "ℹ️ Note: A newer version is already deployed (this is fine!)",
                            Colors.YELLOW,
                        )
                    elif comparison is False:
                        colored_print(
                            f"⚠️ Deployed version ({deployed_commit[:8]}) is older than expected ({current_commit[:8]})",
                            Colors.YELLOW,
                        )
                        colored_print(
                            f"Waiting for deployment to update... (attempt {attempt}/{max_retries})",
                            Colors.BLUE,
                        )
                        version_ok = False
                    else:
                        # Git comparison failed - use simple string comparison as fallback
                        colored_print(
                            "⚠️ Git comparison failed - using string comparison fallback",
                            Colors.YELLOW,
                        )

                        # If commits are different, assume we need to wait.
                        # This is safer than proceeding with the wrong version.
                        colored_print(
                            f"⚠️ Deployed version ({deployed_commit[:8]}) differs from expected ({current_commit[:8]})",
                            Colors.YELLOW,
                        )
                        colored_print(
                            f"Waiting for deployment to update... (attempt {attempt}/{max_retries})",
                            Colors.BLUE,
                        )
                        version_ok = False

            # Only proceed with functionality tests if the version is correct
            if not version_ok:
                # Version is not correct yet; skip tests and retry
                pass  # Will continue to the retry logic below
            else:
                # Run functionality tests
                colored_print("Running basic functionality tests...", Colors.YELLOW)

                # Test avatar redirect
                if test_avatar_redirect(base_url):
                    colored_print("✅ Invalid avatar redirects correctly", Colors.GREEN)
                else:
                    colored_print("❌ Invalid avatar redirect failed", Colors.RED)
                    return False

                # Test avatar sizing
                if test_avatar_sizing(base_url):
                    pass  # Success messages are printed within the function
                else:
                    return False

                # Test stats endpoint
                if test_stats_endpoint(base_url):
                    colored_print("✅ Stats endpoint working", Colors.GREEN)
                else:
                    colored_print("❌ Stats endpoint failed", Colors.RED)
                    return False

                colored_print(
                    f"🎉 {name} deployment verification completed successfully!",
                    Colors.GREEN,
                )
                return True
        else:
            colored_print(f"{name} site not responding yet...", Colors.YELLOW)

        if attempt < max_retries:
            colored_print(
                f"Waiting {retry_delay} seconds before next attempt...", Colors.BLUE
            )
            # Show progress during wait
            for remaining in range(retry_delay, 0, -1):
                print(f"\r⏳ Retrying in {remaining:2d} seconds...", end="", flush=True)
                time.sleep(1)
            print("\r" + " " * 30 + "\r", end="", flush=True)  # Clear the line

    colored_print(
        f"❌ FAILED: {name} deployment verification timed out after {max_retries} attempts",
        Colors.RED,
    )
    return False


def main():
    """Main function with command-line argument parsing."""
    parser = argparse.ArgumentParser(
        description="Libravatar Deployment Verification Script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 check_deployment.py --dev              # Test dev deployment
  python3 check_deployment.py --prod             # Test production deployment
  python3 check_deployment.py --endpoint <url>   # Test custom endpoint
  python3 check_deployment.py --dev --prod       # Test both deployments
        """,
    )

    parser.add_argument(
        "--dev",
        action="store_true",
        help="Test dev deployment (https://dev.libravatar.org)",
    )
    parser.add_argument(
        "--prod",
        action="store_true",
        help="Test production deployment (https://libravatar.org)",
    )
    parser.add_argument("--endpoint", type=str, help="Test custom endpoint URL")
    parser.add_argument(
        "--max-retries",
        type=int,
        default=MAX_RETRIES,
        help=f"Maximum number of retry attempts (default: {MAX_RETRIES})",
    )
    parser.add_argument(
        "--retry-delay",
        type=int,
        default=RETRY_DELAY,
        help=f"Delay between retry attempts in seconds (default: {RETRY_DELAY})",
    )

    args = parser.parse_args()

    # Validate arguments
    if not any([args.dev, args.prod, args.endpoint]):
        parser.error("At least one of --dev, --prod, or --endpoint must be specified")

    # Update configuration if custom values provided
    max_retries = args.max_retries
    retry_delay = args.retry_delay

    colored_print("Libravatar Deployment Verification Script", Colors.BLUE)
    colored_print("=" * 50, Colors.BLUE)

    # Check dependencies
    if not PIL_AVAILABLE:
        colored_print(
            "⚠️ Warning: PIL/Pillow not available. Image dimension verification will be limited.",
            Colors.YELLOW,
        )
        colored_print("   Install with: pip install Pillow", Colors.YELLOW)

    results = []

    # Test dev deployment
    if args.dev:
        colored_print("", Colors.NC)
        dev_result = test_deployment(DEV_URL, "Dev", max_retries, retry_delay)
        results.append(("Dev", dev_result))

    # Test production deployment
    if args.prod:
        colored_print("", Colors.NC)
        prod_result = test_deployment(PROD_URL, "Production", max_retries, retry_delay)
        results.append(("Production", prod_result))

    # Test custom endpoint
    if args.endpoint:
        colored_print("", Colors.NC)
        custom_result = test_deployment(
            args.endpoint, "Custom", max_retries, retry_delay
        )
        results.append(("Custom", custom_result))

    # Summary
    colored_print("", Colors.NC)
    colored_print("=" * 50, Colors.BLUE)
    colored_print("Deployment Verification Summary:", Colors.BLUE)
    colored_print("=" * 50, Colors.BLUE)

    all_passed = True
    for name, result in results:
        if result:
            colored_print(f"✅ {name} deployment: PASSED", Colors.GREEN)
        else:
            colored_print(f"❌ {name} deployment: FAILED", Colors.RED)
            all_passed = False

    if all_passed:
        colored_print("🎉 All deployment verifications passed!", Colors.GREEN)
        sys.exit(0)
    else:
        colored_print("❌ Some deployment verifications failed!", Colors.RED)
        sys.exit(1)


if __name__ == "__main__":
    main()