#!/usr/bin/env python3
"""
Libravatar Deployment Verification Script

This script verifies that Libravatar deployments are working correctly by:
- Checking version endpoint
- Testing avatar functionality with various sizes
- Verifying stats endpoint
- Testing redirect behavior

Usage:
    python3 check_deployment.py --dev              # Test dev deployment
    python3 check_deployment.py --prod             # Test production deployment
    python3 check_deployment.py --endpoint URL     # Test custom endpoint
    python3 check_deployment.py --dev --prod       # Test both deployments
"""

import argparse
import http.client
import io
import json
import os
import random
import ssl
import subprocess
import sys
import tempfile
import time
import urllib.request
from typing import Dict, Optional, Tuple, Union
from urllib.parse import urljoin
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError

try:
    from PIL import Image

    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False

# Configuration
DEV_URL = "https://dev.libravatar.org"
PROD_URL = "https://libravatar.org"
MAX_RETRIES = 5
RETRY_DELAY = 10
# Per-request socket timeout (seconds) so a stalled server cannot hang the job.
REQUEST_TIMEOUT = 30


# ANSI color codes
class Colors:
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    NC = "\033[0m"  # No Color


def colored_print(message: str, color: str = Colors.NC) -> None:
    """Print a colored message with immediate flush."""
    print(f"{color}{message}{Colors.NC}", flush=True)


def get_current_commit_hash() -> Optional[str]:
    """
    Get the current commit hash from git or CI environment.

    Returns:
        The value of ``CI_COMMIT_SHA`` when set, otherwise the output of
        ``git rev-parse HEAD``; None when neither is available.
    """
    # First try GitLab CI environment variable (most reliable in CI)
    ci_commit = os.environ.get("CI_COMMIT_SHA")
    if ci_commit:
        colored_print(f"Using CI commit hash: {ci_commit}", Colors.BLUE)
        return ci_commit

    # Fallback to git command
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        colored_print("Could not determine current commit hash", Colors.RED)
        return None

    commit_hash = result.stdout.strip()
    colored_print(f"Using git commit hash: {commit_hash}", Colors.BLUE)
    return commit_hash


def _commit_timestamp(commit: str) -> int:
    """Return the committer timestamp of *commit*; raises if git cannot resolve it."""
    result = subprocess.run(
        ["git", "show", "-s", "--format=%ct", commit],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout.strip())


def _is_ancestor(ancestor: str, descendant: str) -> bool:
    """Return True when ``git merge-base --is-ancestor`` exits with status 0."""
    completed = subprocess.run(
        ["git", "merge-base", "--is-ancestor", ancestor, descendant],
        capture_output=True,
        check=False,
    )
    return completed.returncode == 0


def is_commit_newer_or_equal(commit1: str, commit2: str) -> Optional[bool]:
    """
    Check if commit1 is newer than or equal to commit2 in git history.

    Committer timestamps are compared first; when they cannot be read the
    check falls back to ancestry via ``git merge-base --is-ancestor``.

    Returns:
        True if commit1 is newer or equal to commit2
        False if commit1 is older than commit2
        None if comparison fails
    """
    # The deployed hash comes from a remote server; never let it be parsed
    # by git as a command-line option.
    if commit1.startswith("-") or commit2.startswith("-"):
        colored_print("Git comparison error: invalid commit identifier", Colors.RED)
        return None

    try:
        try:
            timestamp1 = _commit_timestamp(commit1)
            timestamp2 = _commit_timestamp(commit2)
        except (subprocess.CalledProcessError, ValueError):
            colored_print(
                "Timestamp comparison failed, trying merge-base", Colors.YELLOW
            )
        else:
            colored_print(
                f"Commit {commit1[:8]} timestamp: {timestamp1}", Colors.BLUE
            )
            colored_print(
                f"Commit {commit2[:8]} timestamp: {timestamp2}", Colors.BLUE
            )
            # commit1 is newer if it has a later timestamp
            return timestamp1 >= timestamp2

        # commit2 being an ancestor of commit1 means commit1 is newer or equal
        if _is_ancestor(commit2, commit1):
            return True
        if _is_ancestor(commit1, commit2):
            return False
        colored_print(
            "Git comparison failed - shallow clone or missing commits",
            Colors.YELLOW,
        )
        return None
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # e.g. git is not installed
        colored_print(f"Git comparison error: {e}", Colors.RED)
        return None


class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that refuses to follow any redirect."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Returning None makes urllib surface the 3xx response as an HTTPError.
        return None


def _open_once(
    req: Request,
    context: ssl.SSLContext,
    follow_redirects: bool,
    timeout: float,
) -> Tuple[bytes, Dict[str, str], str]:
    """Perform one request and return (raw body, response headers, final URL)."""
    handlers = [urllib.request.HTTPSHandler(context=context)]
    if not follow_redirects:
        handlers.append(_NoRedirectHandler())
    opener = urllib.request.build_opener(*handlers)
    with opener.open(req, timeout=timeout) as response:
        return response.read(), dict(response.headers), response.geturl()


def _fetch(
    req: Request,
    follow_redirects: bool = True,
    timeout: float = REQUEST_TIMEOUT,
) -> Optional[Tuple[bytes, Dict[str, str], str]]:
    """
    Fetch *req*, retrying without certificate verification on TLS errors.

    Returns:
        (raw body, response headers, final URL), or None when the request
        failed (HTTP error status, network error, timeout, ...).
    """
    try:
        return _open_once(
            req, ssl.create_default_context(), follow_redirects, timeout
        )
    except URLError as url_err:
        # HTTPError is a URLError subclass, so HTTP error statuses land here too.
        if not isinstance(url_err.reason, ssl.SSLError):
            return None
    except (OSError, http.client.HTTPException):
        # Errors raised while reading the body are not wrapped in URLError.
        return None

    # TLS verification failed: retry unverified (less secure but works for testing)
    colored_print(
        f"⚠️ TLS verification failed for {req.full_url}, retrying without verification",
        Colors.YELLOW,
    )
    unverified = ssl.create_default_context()
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    try:
        return _open_once(req, unverified, follow_redirects, timeout)
    except (OSError, http.client.HTTPException):
        return None


def make_request(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    follow_redirects: bool = True,
    binary: bool = False,
    timeout: float = REQUEST_TIMEOUT,
) -> Tuple[bool, Optional[Union[str, bytes]], Optional[Dict[str, str]]]:
    """
    Make an HTTP request and return success status, content, and headers.

    Args:
        url: URL to request
        method: HTTP method
        headers: Additional headers
        follow_redirects: Whether to follow redirects automatically
        binary: Return the raw bytes instead of decoded text
        timeout: Socket timeout in seconds

    Returns:
        Tuple of (success, content, headers). On failure content and headers
        are None. With ``binary=False`` the content is UTF-8 decoded text, or
        an empty string when the body is not valid UTF-8.
    """
    try:
        req = Request(url, headers=headers or {}, method=method)
    except ValueError:
        # Malformed URL, e.g. a custom endpoint given without a scheme
        return False, None, None

    fetched = _fetch(req, follow_redirects=follow_redirects, timeout=timeout)
    if fetched is None:
        return False, None, None

    body, response_headers, _ = fetched
    if binary:
        return True, body, response_headers
    try:
        text = body.decode("utf-8")
    except UnicodeDecodeError:
        # If content is not text (e.g., binary image), return empty string
        text = ""
    return True, text, response_headers


def check_version_endpoint(base_url: str) -> Tuple[bool, Optional[Dict]]:
    """
    Check the version endpoint and return deployment info.

    Returns:
        (True, parsed JSON object) on success, (False, None) when the
        endpoint is unreachable or does not return a JSON object.
    """
    version_url = urljoin(base_url, "/deployment/version/")
    success, content, _ = make_request(version_url)
    if not success or not content:
        return False, None

    try:
        version_info = json.loads(content)
    except json.JSONDecodeError:
        return False, None
    # Callers use dict lookups, so anything but a JSON object is unusable.
    if not isinstance(version_info, dict):
        return False, None
    return True, version_info


def test_avatar_redirect(base_url: str) -> bool:
    """Test that invalid avatar requests redirect to default image."""
    avatar_url = urljoin(base_url, "/avatar/test@example.com")
    try:
        req = Request(avatar_url)
    except ValueError:
        return False

    fetched = _fetch(req)
    if fetched is None:
        return False
    # Check if the final URL after redirect contains deadbeef.png
    _, _, final_url = fetched
    return "deadbeef.png" in final_url


def test_avatar_sizing(base_url: str) -> bool:
    """
    Test avatar endpoint with random sizes.

    Returns:
        True when every requested size was served (and, if Pillow is
        installed, has exactly the requested dimensions).
    """
    # Use a known test hash for consistent testing
    test_hash = "63a75a80e6b1f4adfdb04c1ca02e596c"

    # Generate random sizes between 50-250
    sizes = [random.randint(50, 250) for _ in range(2)]

    for size in sizes:
        avatar_url = urljoin(base_url, f"/avatar/{test_hash}?s={size}")
        success, content, _ = make_request(avatar_url, binary=True)
        if not success or not content:
            colored_print(f"❌ Avatar endpoint failed for size {size}", Colors.RED)
            return False

        if not PIL_AVAILABLE:
            # Fallback: just check if we got some content
            if len(content) > 100:  # Assume valid image if we got substantial content
                colored_print(
                    f"✅ Avatar size {size} downloaded (PIL not available for verification)",
                    Colors.YELLOW,
                )
                continue
            colored_print(
                f"❌ Avatar endpoint returned insufficient content for size {size}",
                Colors.RED,
            )
            return False

        # Check image dimensions from memory; reopening a named temp file
        # by path is not portable (it fails on Windows).
        try:
            with Image.open(io.BytesIO(content)) as img:
                width, height = img.size
        except Exception as e:
            colored_print(f"❌ Error checking image dimensions: {e}", Colors.RED)
            return False

        if width != size or height != size:
            colored_print(
                f"❌ Avatar wrong size: expected {size}x{size}, got {width}x{height}",
                Colors.RED,
            )
            return False
        colored_print(f"✅ Avatar size {size}x{size} verified", Colors.GREEN)

    return True


def test_stats_endpoint(base_url: str) -> bool:
    """Test that the stats endpoint is accessible."""
    stats_url = urljoin(base_url, "/stats/")
    success, _, _ = make_request(stats_url)
    return success


def _version_is_acceptable(
    deployed_commit: str, current_commit: str, attempt: int, max_retries: int
) -> bool:
    """Report how the deployed commit relates to the expected one; True means proceed."""
    colored_print(f"Expected commit: {current_commit[:8]}...", Colors.BLUE)
    colored_print(f"Deployed commit: {deployed_commit[:8]}...", Colors.BLUE)

    if deployed_commit == current_commit:
        colored_print(
            "✅ Exact version match - deployment is up to date!",
            Colors.GREEN,
        )
        return True

    if deployed_commit.startswith(current_commit[:8]) or current_commit.startswith(
        deployed_commit[:8]
    ):
        # Handle case where we have short vs long commit hashes
        colored_print(
            "✅ Version match (short hash) - deployment is up to date!",
            Colors.GREEN,
        )
        return True

    # Check if deployed version is newer using git
    comparison = is_commit_newer_or_equal(deployed_commit, current_commit)
    colored_print(f"Commit comparison result: {comparison}", Colors.BLUE)

    if comparison is True:
        colored_print(
            "ℹ️ Note: A newer version is already deployed (this is fine!)",
            Colors.YELLOW,
        )
        return True

    if comparison is False:
        colored_print(
            f"⚠️ Deployed version ({deployed_commit[:8]}) is older than expected ({current_commit[:8]})",
            Colors.YELLOW,
        )
    else:
        # Git comparison failed - use simple string comparison as fallback
        colored_print(
            "⚠️ Git comparison failed - using string comparison fallback",
            Colors.YELLOW,
        )
        # If commits are different, assume we need to wait
        # This is safer than proceeding with wrong version
        colored_print(
            f"⚠️ Deployed version ({deployed_commit[:8]}) differs from expected ({current_commit[:8]})",
            Colors.YELLOW,
        )
    colored_print(
        f"Waiting for deployment to update... (attempt {attempt}/{max_retries})",
        Colors.BLUE,
    )
    return False


def _run_functionality_tests(base_url: str, name: str) -> bool:
    """Run the redirect, sizing and stats checks, stopping at the first failure."""
    colored_print("Running basic functionality tests...", Colors.YELLOW)

    # Test avatar redirect
    if not test_avatar_redirect(base_url):
        colored_print("❌ Invalid avatar redirect failed", Colors.RED)
        return False
    colored_print("✅ Invalid avatar redirects correctly", Colors.GREEN)

    # Test avatar sizing (messages are printed within the function)
    if not test_avatar_sizing(base_url):
        return False

    # Test stats endpoint
    if not test_stats_endpoint(base_url):
        colored_print("❌ Stats endpoint failed", Colors.RED)
        return False
    colored_print("✅ Stats endpoint working", Colors.GREEN)

    colored_print(
        f"🎉 {name} deployment verification completed successfully!",
        Colors.GREEN,
    )
    return True


def _wait_before_retry(retry_delay: int) -> None:
    """Sleep for *retry_delay* seconds while showing a countdown."""
    colored_print(
        f"Waiting {retry_delay} seconds before next attempt...", Colors.BLUE
    )
    for remaining in range(retry_delay, 0, -1):
        print(f"\r⏳ Retrying in {remaining:2d} seconds...", end="", flush=True)
        time.sleep(1)
    print("\r" + " " * 30 + "\r", end="", flush=True)  # Clear the line


def test_deployment(
    base_url: str,
    name: str,
    max_retries: int = MAX_RETRIES,
    retry_delay: int = RETRY_DELAY,
) -> bool:
    """
    Test a deployment with retry logic.

    An attempt is retried while the site does not respond or still serves an
    older commit; a failing functionality test ends the run immediately.

    Args:
        base_url: Base URL of the deployment
        name: Human-readable name for the deployment
        max_retries: Maximum number of retry attempts
        retry_delay: Seconds to wait between attempts

    Returns:
        True if all tests pass, False otherwise
    """
    colored_print(f"Testing {name} deployment at {base_url}", Colors.YELLOW)

    # The expected commit cannot change between attempts: resolve it once,
    # and only when a deployment actually answers.
    current_commit: Optional[str] = None
    commit_resolved = False

    for attempt in range(1, max_retries + 1):
        colored_print(
            f"Attempt {attempt}/{max_retries}: Checking {name} deployment...",
            Colors.BLUE,
        )

        # Check if site is responding
        success, version_info = check_version_endpoint(base_url)
        if success and version_info:
            colored_print(
                f"{name} site is responding, checking version...", Colors.GREEN
            )

            # Display version information
            deployed_commit = version_info.get("commit_hash", "Unknown")
            if not isinstance(deployed_commit, str) or not deployed_commit:
                # A null/empty hash cannot be compared; treat it as unknown.
                deployed_commit = "Unknown"
            branch = version_info.get("branch", "Unknown")
            version = version_info.get("version", "Unknown")

            colored_print(f"Deployed commit: {deployed_commit}", Colors.BLUE)
            colored_print(f"Deployed branch: {branch}", Colors.BLUE)
            colored_print(f"Deployed version: {version}", Colors.BLUE)

            if not commit_resolved:
                current_commit = get_current_commit_hash()
                commit_resolved = True

            # Without both hashes there is nothing to compare; proceed.
            version_ok = True
            if current_commit and deployed_commit != "Unknown":
                version_ok = _version_is_acceptable(
                    deployed_commit, current_commit, attempt, max_retries
                )

            # Only proceed with functionality tests if version is correct
            if version_ok:
                return _run_functionality_tests(base_url, name)
        else:
            colored_print(f"{name} site not responding yet...", Colors.YELLOW)

        if attempt < max_retries:
            _wait_before_retry(retry_delay)

    colored_print(
        f"❌ FAILED: {name} deployment verification timed out after {max_retries} attempts",
        Colors.RED,
    )
    return False


# These helpers are named ``test_*`` but are not pytest tests; opt them out of
# collection in case this module is ever imported from a test suite.
for _check in (
    test_avatar_redirect,
    test_avatar_sizing,
    test_stats_endpoint,
    test_deployment,
):
    _check.__test__ = False
del _check


def main():
    """Main function with command-line argument parsing."""
    parser = argparse.ArgumentParser(
        description="Libravatar Deployment Verification Script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 check_deployment.py --dev              # Test dev deployment
  python3 check_deployment.py --prod             # Test production deployment
  python3 check_deployment.py --endpoint URL     # Test custom endpoint
  python3 check_deployment.py --dev --prod       # Test both deployments
        """,
    )

    parser.add_argument(
        "--dev",
        action="store_true",
        help="Test dev deployment (https://dev.libravatar.org)",
    )
    parser.add_argument(
        "--prod",
        action="store_true",
        help="Test production deployment (https://libravatar.org)",
    )
    parser.add_argument("--endpoint", type=str, help="Test custom endpoint URL")
    parser.add_argument(
        "--max-retries",
        type=int,
        default=MAX_RETRIES,
        help=f"Maximum number of retry attempts (default: {MAX_RETRIES})",
    )
    parser.add_argument(
        "--retry-delay",
        type=int,
        default=RETRY_DELAY,
        help=f"Delay between retry attempts in seconds (default: {RETRY_DELAY})",
    )

    args = parser.parse_args()

    # Validate arguments
    if not any([args.dev, args.prod, args.endpoint]):
        parser.error("At least one of --dev, --prod, or --endpoint must be specified")

    colored_print("Libravatar Deployment Verification Script", Colors.BLUE)
    colored_print("=" * 50, Colors.BLUE)

    # Check dependencies
    if not PIL_AVAILABLE:
        colored_print(
            "⚠️ Warning: PIL/Pillow not available. Image dimension verification will be limited.",
            Colors.YELLOW,
        )
        colored_print(" Install with: pip install Pillow", Colors.YELLOW)

    # Collect the selected deployments in a fixed order: dev, prod, custom
    targets = []
    if args.dev:
        targets.append((DEV_URL, "Dev"))
    if args.prod:
        targets.append((PROD_URL, "Production"))
    if args.endpoint:
        targets.append((args.endpoint, "Custom"))

    results = []
    for url, name in targets:
        colored_print("", Colors.NC)
        passed = test_deployment(url, name, args.max_retries, args.retry_delay)
        results.append((name, passed))

    # Summary
    colored_print("", Colors.NC)
    colored_print("=" * 50, Colors.BLUE)
    colored_print("Deployment Verification Summary:", Colors.BLUE)
    colored_print("=" * 50, Colors.BLUE)

    all_passed = True
    for name, result in results:
        if result:
            colored_print(f"✅ {name} deployment: PASSED", Colors.GREEN)
        else:
            colored_print(f"❌ {name} deployment: FAILED", Colors.RED)
            all_passed = False

    if all_passed:
        colored_print("🎉 All deployment verifications passed!", Colors.GREEN)
        sys.exit(0)
    else:
        colored_print("❌ Some deployment verifications failed!", Colors.RED)
        sys.exit(1)


if __name__ == "__main__":
    main()