#!/usr/bin/env python3
"""
Performance testing script for Libravatar CI/CD pipeline

This script runs automated performance tests to catch regressions
and ensure the application meets performance requirements.
"""

import os
import sys
import time
import statistics
import hashlib
import random
import string
from typing import Dict, List, Any, Optional, Tuple

# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from libravatar import libravatar_url  # noqa: E402
from urllib.parse import urlsplit  # noqa: E402
from prettytable import PrettyTable  # noqa: E402


def random_string(length=10):
    """Return some random string with default length 10"""
    # One SystemRandom instance is enough; no need to build one per character.
    rng = random.SystemRandom()
    alphabet = string.ascii_lowercase + string.digits
    return "".join(rng.choice(alphabet) for _ in range(length))


# Try to import Django utilities for local testing, fallback to local implementation
try:
    from ivatar.utils import generate_random_email
except ImportError:
    # Use local version for external testing
    def generate_random_email():
        """Generate a random email address using the same pattern as test_views.py"""
        username = random_string()
        domain = random_string()
        tld = random_string(2)
        return f"{username}@{domain}.{tld}"


def _parse_age(age: Any) -> int:
    """Return the ``Age`` header value as an int, or 0 if it is missing/non-numeric."""
    try:
        return int(age)
    except (TypeError, ValueError):
        return 0


def _cache_status_from_headers(cache_detail: str, age: str) -> str:
    """Classify a response as ``"hit"``, ``"miss"`` or ``"unknown"``.

    ``cache_detail`` is the lower-cased ``x-cache-detail`` header and ``age``
    the raw ``age`` header string.
    """
    if "cache hit" in cache_detail or _parse_age(age) > 0:
        return "hit"
    if "cache miss" in cache_detail or age == "0":
        return "miss"
    return "unknown"


def _avatar_request(email: str, size: int, default: str) -> Tuple[str, str]:
    """Return ``(full_url, url_path)`` for an avatar request.

    ``full_url`` is what the libravatar library produces; ``url_path`` is its
    path plus query string, suitable for appending to a base URL or passing
    to the Django test client.
    """
    full_url = libravatar_url(email=email, size=size, default=default)
    urlobj = urlsplit(full_url)
    return full_url, f"{urlobj.path}?{urlobj.query}"


# Django setup - only for local testing
def setup_django() -> None:
    """Setup Django for local testing"""
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings")
    import django

    django.setup()


class PerformanceTestRunner:
    """Main performance test runner"""

    # Define all avatar styles and sizes to test
    AVATAR_STYLES: List[str] = [
        "identicon",
        "monsterid",
        "robohash",
        "pagan",
        "retro",
        "wavatar",
        "mm",
        "mmng",
    ]
    AVATAR_SIZES: List[int] = [80, 256]

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        concurrent_users: int = 10,
        test_cache: bool = True,
        remote_testing: bool = False,
    ) -> None:
        """Configure the runner.

        Remote mode is used when ``remote_testing`` is true or ``base_url``
        does not start with ``http://localhost``. In local mode Django is
        set up and a Django test client is created.
        """
        self.base_url: str = base_url
        self.concurrent_users: int = concurrent_users
        self.test_cache: bool = test_cache
        self.remote_testing: bool = remote_testing
        self.client: Optional[Any] = None  # Django test client
        self.results: Dict[str, Any] = {}

        # Determine if we're testing locally or remotely
        if remote_testing or not base_url.startswith("http://localhost"):
            self.remote_testing = True
            print(f"🌐 Remote testing mode: {base_url}")
        else:
            print(f"🏠 Local testing mode: {base_url}")
            # Only setup Django and create client for local testing
            setup_django()
            from django.test import Client

            self.client = Client()

    def setup_test_data(self) -> None:
        """Create test data for performance tests"""
        print("Setting up test data...")

        # Import Django models only when needed
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail

        # Create test users and emails
        test_emails = [f"perftest{i}@example.com" for i in range(100)]

        for i, email in enumerate(test_emails):
            username = f"perftest{i}"
            if User.objects.filter(username=username).exists():
                continue

            user = User.objects.create_user(
                username=username, email=email, password="testpass123"
            )
            # Create the confirmed email only for the user created just above,
            # so it can never be attached to an unbound or stale ``user``.
            ConfirmedEmail.objects.create(
                user=user, email=email, ip_address="127.0.0.1"
            )

        print(f"Created {len(test_emails)} test users and emails")

    def _generate_test_cases(self) -> List[Dict[str, Any]]:
        """Generate test cases for all avatar styles and sizes"""
        return [
            {"default": style, "size": size}
            for style in self.AVATAR_STYLES
            for size in self.AVATAR_SIZES
        ]

    def _test_single_avatar_request(
        self, case: Dict[str, Any], email: str, use_requests: bool = False
    ) -> Dict[str, Any]:
        """Test a single avatar request - shared logic for local and remote testing

        Args:
            case: dict with ``"default"`` (avatar style) and ``"size"`` keys.
            email: address to request the avatar for.
            use_requests: if true, issue a real HTTP request against
                ``self.base_url``; otherwise use the Django test client.

        Returns:
            A result dict with at least ``test``, ``duration_ms``,
            ``status_code``, ``success``, ``cache_status``, ``full_url``
            and ``email``.

        Raises:
            RuntimeError: in local mode when no Django test client exists.
        """
        full_url, url_path = _avatar_request(email, case["size"], case["default"])
        test_name = f"{case['default']}_{case['size']}px"

        start_time = time.perf_counter()

        if use_requests:
            # Remote testing with requests
            import requests

            url = f"{self.base_url}{url_path}"
            try:
                response = requests.get(url, timeout=10)
                duration = (time.perf_counter() - start_time) * 1000

                # Determine cache status from response headers
                cache_detail = response.headers.get("x-cache-detail", "").lower()
                age = response.headers.get("age", "0")

                return {
                    "test": test_name,
                    "duration_ms": duration,
                    "status_code": response.status_code,
                    "content_length": len(response.content) if response.content else 0,
                    "success": response.status_code == 200,
                    "cache_status": _cache_status_from_headers(cache_detail, age),
                    "cache_detail": cache_detail,
                    "age": age,
                    "full_url": full_url,
                    "email": email,
                }
            except Exception as e:
                # Any failure of this one request is recorded, not raised, so
                # the remaining test cases still run.
                duration = (time.perf_counter() - start_time) * 1000
                return {
                    "test": test_name,
                    "duration_ms": duration,
                    "status_code": 0,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                    "full_url": full_url,
                    "email": email,
                }

        # Local testing with Django test client
        if self.client is None:
            raise RuntimeError("Django test client not initialized")

        response = self.client.get(url_path, follow=True)
        duration = (time.perf_counter() - start_time) * 1000

        # Check for cache information in response headers
        cache_status = "unknown"
        if callable(getattr(response, "get", None)):
            age = response.get("Age", "0")
            # Without a positive Age the response is assumed to be freshly
            # generated, i.e. a cache miss.
            cache_status = "hit" if _parse_age(age) > 0 else "miss"

        # Handle content length for different response types
        content_length = 0
        if hasattr(response, "content"):
            content_length = len(response.content) if response.content else 0
        elif hasattr(response, "streaming_content"):
            # For FileResponse, we can't easily get content length without
            # consuming the stream
            content_length = 1  # Just indicate there's content

        return {
            "test": test_name,
            "duration_ms": duration,
            "status_code": response.status_code,
            "content_length": content_length,
            "cache_status": cache_status,
            "success": response.status_code == 200,
            "full_url": full_url,
            "email": email,
        }

    def _display_avatar_results(self, results: List[Dict[str, Any]]) -> None:
        """Display avatar test results using prettytable for perfect alignment"""
        # Group results by avatar style (the part of the test name before "_")
        style_results: Dict[str, List[Dict[str, Any]]] = {}
        for result in results:
            style = result["test"].split("_")[0]
            style_results.setdefault(style, []).append(result)

        # Create table
        table = PrettyTable()
        table.field_names = ["Avatar Style", "Size", "Time (ms)", "Status", "Cache"]
        table.align["Avatar Style"] = "l"
        table.align["Size"] = "r"
        table.align["Time (ms)"] = "r"
        table.align["Status"] = "c"
        table.align["Cache"] = "c"

        # Add data to table, in the canonical AVATAR_STYLES order
        styles_with_data = [
            style for style in self.AVATAR_STYLES if style in style_results
        ]

        for i, style in enumerate(styles_with_data):
            style_data = style_results[style]
            successful_results = [r for r in style_data if r.get("success", True)]
            failed_results = [r for r in style_data if not r.get("success", True)]

            if successful_results:
                # Calculate average
                avg_duration = statistics.mean(
                    [r["duration_ms"] for r in successful_results]
                )

                # Determine overall cache status
                cache_statuses = [
                    r["cache_status"]
                    for r in successful_results
                    if r["cache_status"] != "unknown"
                ]
                if not cache_statuses:
                    cache_summary = "unknown"
                elif all(status == "hit" for status in cache_statuses):
                    cache_summary = "hit"
                elif all(status == "miss" for status in cache_statuses):
                    cache_summary = "miss"
                else:
                    cache_summary = "mixed"

                # At least one request succeeded here, so the average line is
                # either all-successful or mixed.
                avg_status_icon = "⚠️" if failed_results else "✅"

                # Add average row
                table.add_row(
                    [
                        f"{style} (avg)",
                        "",
                        f"{avg_duration:.2f}",
                        avg_status_icon,
                        cache_summary,
                    ]
                )

                # Add individual size rows
                for result in style_data:
                    size = result["test"].split("_")[1]  # Extract size from test name
                    succeeded = result.get("success", True)
                    status_icon = "✅" if succeeded else "❌"
                    cache_status = result["cache_status"]

                    if succeeded:
                        time_cell = f"{result['duration_ms']:.2f}"
                    else:
                        # Failed rows show the error text in the time column
                        time_cell = result.get("error", "Failed")
                    table.add_row(["", size, time_cell, status_icon, cache_status])
            else:
                # All requests failed
                table.add_row([f"{style} (avg)", "", "Failed", "❌", "error"])
                for result in style_data:
                    size = result["test"].split("_")[1]
                    error_msg = result.get("error", "Failed")
                    table.add_row(["", size, error_msg, "❌", "error"])

            # Add divider line between styles (except after the last style)
            if i < len(styles_with_data) - 1:
                table.add_row(["-" * 15, "-" * 5, "-" * 9, "-" * 6, "-" * 5])

        print(table)

    def test_avatar_generation_performance(self) -> None:
        """Test avatar generation performance

        Requests every style/size combination through the Django test client
        and stores the summary under ``self.results["avatar_generation"]``.
        """
        print("\n=== Avatar Generation Performance Test ===")

        # Generate test cases for all avatar styles and sizes
        test_cases = self._generate_test_cases()

        # Generate random email for testing
        test_email = generate_random_email()
        print(f" Testing with email: {test_email}")

        results = [
            self._test_single_avatar_request(case, test_email, use_requests=False)
            for case in test_cases
        ]

        # Show example URL from first result
        if results:
            print(f" Example URL: {results[0]['full_url']}")

        # Display results grouped by style
        self._display_avatar_results(results)

        # Calculate statistics
        successful_results = [r for r in results if r.get("success", True)]
        if successful_results:
            durations = [r["duration_ms"] for r in successful_results]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)
        else:
            avg_duration = 0
            max_duration = 0

        print(f"\n Average: {avg_duration:.2f}ms")
        print(f" Maximum: {max_duration:.2f}ms")

        # Performance thresholds
        if avg_duration > 1000:  # 1 second
            print(" ⚠️ WARNING: Average avatar generation time exceeds 1s")
        elif avg_duration > 500:  # 500ms
            print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms")
        else:
            print(" ✅ Avatar generation performance is good")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
        }

    def test_concurrent_load(
        self,
        response_threshold: int = 1000,
        p95_threshold: int = 2000,
        num_requests: int = 20,
    ) -> None:
        """Test concurrent load handling

        Args:
            response_threshold: average response time (ms) above which a
                caution is printed.
            p95_threshold: 95th percentile (ms) above which a warning is
                printed.
            num_requests: number of requests/operations to run.

        The summary is stored under ``self.results["concurrent_load"]``.
        """
        print("\n=== Concurrent Load Test ===")

        if self.remote_testing:
            print(f" Running {num_requests} HTTP requests to {self.base_url}...")
            results = self._test_remote_concurrent_load(num_requests)
        else:
            print(f" Running {num_requests} local avatar generations...")
            results = self._test_local_concurrent_load(num_requests)

        # Analyze results
        successful_requests = [r for r in results if r["success"]]
        failed_requests = [r for r in results if not r["success"]]

        # Analyze cache performance
        cache_hits = [r for r in results if r.get("cache_status") == "hit"]
        cache_misses = [r for r in results if r.get("cache_status") == "miss"]
        cache_errors = [r for r in results if r.get("cache_status") == "error"]

        # NOTE: this is the sum of the individual request durations, not the
        # wall-clock time of the whole batch.
        total_duration = (
            sum(r["duration_ms"] for r in results) / 1000
        )  # Convert to seconds

        # Computed once and guarded, so an all-zero duration cannot raise
        # ZeroDivisionError in the report below.
        requests_per_second = (
            len(successful_requests) / total_duration if total_duration > 0 else 0
        )
        cache_lookups = len(cache_hits) + len(cache_misses)
        cache_hit_rate = len(cache_hits) / cache_lookups * 100 if cache_lookups else 0

        print(f" Total time: {total_duration:.2f}s")
        print(f" Successful requests: {len(successful_requests)}/{num_requests}")
        print(f" Failed requests: {len(failed_requests)}")

        # Show cache statistics if available
        if cache_lookups:
            print(f" Cache hits: {len(cache_hits)}")
            print(f" Cache misses: {len(cache_misses)}")
            if cache_errors:
                print(f" Cache errors: {len(cache_errors)}")
            print(f" Cache hit rate: {cache_hit_rate:.1f}%")

        if successful_requests:
            durations = [r["duration_ms"] for r in successful_requests]
            avg_duration = statistics.mean(durations)

            # Calculate p95 safely: quantiles() needs at least two data points
            if len(durations) >= 2:
                try:
                    p95_duration = statistics.quantiles(durations, n=20)[
                        18
                    ]  # 95th percentile
                except (ValueError, IndexError):
                    p95_duration = max(durations)
            else:
                p95_duration = max(durations)

            print(f" Average response time: {avg_duration:.2f}ms")
            print(f" 95th percentile: {p95_duration:.2f}ms")
            print(f" Operations per second: {requests_per_second:.2f}")

            # Performance evaluation
            if failed_requests:
                print(" ⚠️ WARNING: Some operations failed under load")
            elif p95_duration > p95_threshold:
                print(
                    f" ⚠️ WARNING: 95th percentile response time exceeds {p95_threshold}ms"
                )
            elif avg_duration > response_threshold:
                print(
                    f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms under load"
                )
            else:
                print(" ✅ Load handling is good")
        else:
            avg_duration = 0
            p95_duration = 0
            print(" ❌ All operations failed")

        self.results["concurrent_load"] = {
            "total_duration_s": total_duration,
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "average_ms": avg_duration,
            "p95_ms": p95_duration,
            "requests_per_second": requests_per_second,
            "cache_hits": len(cache_hits),
            "cache_misses": len(cache_misses),
            "cache_errors": len(cache_errors),
            "cache_hit_rate": cache_hit_rate,
        }

    def _test_remote_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
        """Test concurrent load against remote server

        Issues ``num_requests`` identicon requests (size 80, each for a fresh
        random email) through a thread pool of ``self.concurrent_users``
        workers and returns one result dict per completed request.
        """
        import requests  # noqa: F401
        from concurrent.futures import ThreadPoolExecutor, as_completed

        def make_remote_request(thread_id):
            test_email = generate_random_email()

            # Use libravatar library to generate the URL
            _, url_path = _avatar_request(test_email, 80, "identicon")
            url = f"{self.base_url}{url_path}"

            start_time = time.perf_counter()
            try:
                response = requests.get(url, timeout=10)
                duration = (time.perf_counter() - start_time) * 1000

                # Determine cache status
                cache_detail = response.headers.get("x-cache-detail", "").lower()
                age = response.headers.get("age", "0")

                return {
                    "thread_id": thread_id,
                    "duration_ms": duration,
                    "status_code": response.status_code,
                    "success": response.status_code == 200,
                    "cache_status": _cache_status_from_headers(cache_detail, age),
                }
            except Exception as e:
                return {
                    "thread_id": thread_id,
                    "duration_ms": (time.perf_counter() - start_time) * 1000,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                }

        results = []
        with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor:
            futures = [
                executor.submit(make_remote_request, i) for i in range(num_requests)
            ]

            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f" Request failed: {e}")

        return results

    def _test_local_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
        """Test concurrent load locally using avatar generation functions

        Runs ``num_requests`` identicon renderings sequentially; if the
        ``Identicon`` module cannot be imported, times ``User.objects.count()``
        instead.
        """
        results: List[Dict[str, Any]] = []

        # Import avatar generation functions; only the import itself decides
        # whether we take the fallback path.
        try:
            import Identicon
        except ImportError:
            # Fallback: just test database queries
            print(
                " Avatar generators not available, testing database queries instead..."
            )

            for i in range(num_requests):
                request_start = time.perf_counter()
                try:
                    from django.contrib.auth.models import User

                    User.objects.count()
                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (time.perf_counter() - request_start)
                            * 1000,
                            "success": True,
                            "cache_status": "n/a",  # Database queries don't use image cache
                        }
                    )
                except Exception as e:
                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (time.perf_counter() - request_start)
                            * 1000,
                            "success": False,
                            "error": str(e),
                            "cache_status": "error",
                        }
                    )
            return results

        for i in range(num_requests):
            test_email = generate_random_email()
            email_hash = hashlib.md5(test_email.encode()).hexdigest()

            request_start = time.perf_counter()
            try:
                # Test identicon generation directly
                identicon_data = Identicon.render(email_hash)
                results.append(
                    {
                        "thread_id": i,
                        "duration_ms": (time.perf_counter() - request_start) * 1000,
                        "success": len(identicon_data) > 0,
                        "cache_status": "miss",  # Direct generation is always a cache miss
                    }
                )
            except Exception as e:
                results.append(
                    {
                        "thread_id": i,
                        "duration_ms": (time.perf_counter() - request_start) * 1000,
                        "success": False,
                        "error": str(e),
                        "cache_status": "error",
                    }
                )

        return results

    def test_database_performance(self) -> None:
        """Test database query performance

        Times a few representative ORM queries and reports the number of
        queries recorded on the connection afterwards.
        """
        print("\n=== Database Performance Test ===")

        from django.db import connection
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail, Photo

        # Reset query log
        connection.queries_log.clear()

        test_queries = [
            {"name": "User count", "query": lambda: User.objects.count()},
            {
                "name": "Email lookup by digest",
                "query": lambda: ConfirmedEmail.objects.filter(
                    digest="5d41402abc4b2a76b9719d911017c592"
                ).first(),
            },
            {
                "name": "Top 10 photos by access count",
                "query": lambda: list(Photo.objects.order_by("-access_count")[:10]),
            },
        ]

        for test in test_queries:
            start_time = time.perf_counter()
            try:
                test["query"]()
                duration = (time.perf_counter() - start_time) * 1000

                print(f" {test['name']}: {duration:.2f}ms")

                if duration > 100:  # 100ms threshold
                    print(" ⚠️ WARNING: Query exceeds 100ms threshold")
            except Exception as e:
                print(f" {test['name']}: ERROR - {e}")

        # Check for N+1 queries
        query_count = len(connection.queries)
        if query_count > 10:
            print(
                f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)"
            )
        else:
            print(f" ✅ Database query count is reasonable ({query_count} queries)")

    def test_cache_performance(self) -> None:
        """Test caching effectiveness

        Requests the same avatar twice and judges caching from the response
        headers (remote mode) or from the timing ratio (local mode). The
        summary is stored under ``self.results["cache_performance"]``.
        """
        if not self.test_cache:
            print("\n=== Cache Performance Test ===")
            print(" ⏭️ Cache testing disabled")
            return

        print("\n=== Cache Performance Test ===")

        # Generate a random email address for cache testing
        test_email = generate_random_email()
        print(f" Testing with: {test_email}")

        if self.remote_testing:
            first_duration, second_duration = self._test_remote_cache_performance(
                test_email
            )
        else:
            first_duration, second_duration = self._test_local_cache_performance(
                test_email
            )

        print(f" First request: {first_duration:.2f}ms")
        print(f" Second request: {second_duration:.2f}ms")

        improvement_ratio = (
            first_duration / second_duration if second_duration > 0 else 0
        )

        # Analyze cache effectiveness based on headers AND timing
        cache_working = False
        cache_status = "unknown"

        if self.remote_testing and hasattr(self, "cache_info"):
            # For remote testing, check actual cache headers
            first_cached = self.cache_info["first_request"]["is_cached"]
            second_cached = self.cache_info["second_request"]["is_cached"]

            if not first_cached and second_cached:
                cache_status = "✅ Cache working correctly (miss → hit)"
                cache_working = True
            elif first_cached and second_cached:
                cache_status = "✅ Cache working (both requests cached)"
                cache_working = True
            elif not first_cached and not second_cached:
                cache_status = "⚠️ No cache hits detected"
                cache_working = False
            else:
                cache_status = "⚠️ Unexpected cache behavior"
                cache_working = False
        else:
            # For local testing, fall back to timing-based analysis
            if improvement_ratio >= 1.5:
                cache_status = "✅ Caching appears to be working (timing-based)"
                cache_working = True
            else:
                cache_status = (
                    "⚠️ Caching may not be working as expected (timing-based)"
                )
                cache_working = False

        print(f" {cache_status}")

        if improvement_ratio > 1:
            print(f" Performance improvement: {improvement_ratio:.1f}x faster")

        self.results["cache_performance"] = {
            "first_request_ms": first_duration,
            "second_request_ms": second_duration,
            "improvement_ratio": improvement_ratio,
            "cache_working": cache_working,
            "cache_status": cache_status,
            "cache_headers": getattr(self, "cache_info", {}),
        }

    def _test_remote_cache_performance(self, email: str) -> Tuple[float, float]:
        """Test cache performance against remote server

        Returns the durations (ms) of two consecutive requests for the same
        avatar and records the observed cache headers in ``self.cache_info``.
        """
        import requests

        # Use libravatar library to generate the URL
        _, url_path = _avatar_request(email, 80, "identicon")
        url = f"{self.base_url}{url_path}"

        # First request (should be cache miss or fresh)
        start_time = time.perf_counter()
        response1 = requests.get(url, timeout=10)
        first_duration = (time.perf_counter() - start_time) * 1000

        # Check first request headers
        first_cache_detail = response1.headers.get("x-cache-detail", "unknown")
        first_age = response1.headers.get("age", "0")
        first_cache_control = response1.headers.get("cache-control", "none")

        print(" First request headers:")
        print(f" x-cache-detail: {first_cache_detail}")
        print(f" age: {first_age}")
        print(f" cache-control: {first_cache_control}")

        # Small delay to ensure any processing is complete
        time.sleep(0.1)

        # Second request (should be cache hit)
        start_time = time.perf_counter()
        response2 = requests.get(url, timeout=10)
        second_duration = (time.perf_counter() - start_time) * 1000

        # Check second request headers
        second_cache_detail = response2.headers.get("x-cache-detail", "unknown")
        second_age = response2.headers.get("age", "0")
        second_cache_control = response2.headers.get("cache-control", "none")

        print(" Second request headers:")
        print(f" x-cache-detail: {second_cache_detail}")
        print(f" age: {second_age}")
        print(f" cache-control: {second_cache_control}")

        # Determine if we actually got cache hits. A malformed Age header is
        # treated as 0 instead of aborting the whole test run.
        first_is_cached = (
            "cache hit" in first_cache_detail.lower() or _parse_age(first_age) > 0
        )
        second_is_cached = (
            "cache hit" in second_cache_detail.lower() or _parse_age(second_age) > 0
        )

        print(" Cache analysis:")
        print(
            f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}"
        )
        print(
            f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}"
        )

        # Store cache information for analysis
        self.cache_info = {
            "first_request": {
                "cache_detail": first_cache_detail,
                "age": first_age,
                "is_cached": first_is_cached,
            },
            "second_request": {
                "cache_detail": second_cache_detail,
                "age": second_age,
                "is_cached": second_is_cached,
            },
        }

        return first_duration, second_duration

    def _test_local_cache_performance(self, email: str) -> Tuple[float, float]:
        """Test cache performance locally

        Returns the durations (ms) of two consecutive Django test client
        requests for the same avatar.

        Raises:
            RuntimeError: when no Django test client exists; timing nothing
                would otherwise report meaningless near-zero durations.
        """
        if self.client is None:
            raise RuntimeError("Django test client not initialized")

        # Use libravatar library to generate the URL
        _, url_path = _avatar_request(email, 80, "identicon")

        # First request (cache miss)
        start_time = time.perf_counter()
        self.client.get(url_path)
        first_duration = (time.perf_counter() - start_time) * 1000

        # Second request (should be cache hit)
        start_time = time.perf_counter()
        self.client.get(url_path)
        second_duration = (time.perf_counter() - start_time) * 1000

        return first_duration, second_duration

    def run_all_tests(
        self,
        avatar_threshold: int = 1000,
        response_threshold: int = 1000,
        p95_threshold: int = 2000,
        ignore_cache_warnings: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """Run all performance tests

        Returns:
            ``self.results`` on completion, or ``None`` if any test raised.
        """
        print("Starting Libravatar Performance Tests")
        print("=" * 50)

        start_time = time.perf_counter()

        try:
            # Only setup test data for local testing
            if not self.remote_testing:
                self.setup_test_data()

            # Run tests based on mode
            if self.remote_testing:
                print("🌐 Running remote server tests...")
                self.test_remote_avatar_performance(response_threshold)
            else:
                print("🏠 Running local tests...")
                self.test_avatar_generation_performance()
                self.test_database_performance()

            # Always test concurrent load
            self.test_concurrent_load(response_threshold, p95_threshold)

            # Test cache performance if enabled
            self.test_cache_performance()

            total_duration = time.perf_counter() - start_time

            print("\n" + "=" * 50)
            print(f"Performance tests completed in {total_duration:.2f}s")

            # Overall assessment
            self.assess_overall_performance(
                avatar_threshold,
                response_threshold,
                p95_threshold,
                ignore_cache_warnings,
            )

            return self.results

        except Exception as e:
            print(f"Performance test failed: {e}")
            return None

    def test_remote_avatar_performance(self, response_threshold: int = 1000) -> None:
        """Test avatar generation performance on remote server

        Requests every style/size combination over HTTP and stores the
        summary under ``self.results["avatar_generation"]``.
        """
        print("\n=== Remote Avatar Performance Test ===")

        # Generate test cases for all avatar styles and sizes
        test_cases = self._generate_test_cases()

        # Generate random email for testing
        test_email = generate_random_email()
        print(f" Testing with email: {test_email}")

        results = [
            self._test_single_avatar_request(case, test_email, use_requests=True)
            for case in test_cases
        ]

        # Show example URL from first result
        if results:
            print(f" Example URL: {results[0]['full_url']}")

        # Display results grouped by style
        self._display_avatar_results(results)

        # Calculate statistics for successful requests
        successful_results = [r for r in results if r["success"]]
        if successful_results:
            durations = [r["duration_ms"] for r in successful_results]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)

            print(f"\n Average: {avg_duration:.2f}ms")
            print(f" Maximum: {max_duration:.2f}ms")
            print(f" Success rate: {len(successful_results)}/{len(results)}")

            # Performance thresholds for remote testing
            if avg_duration > (response_threshold * 2):  # 2x threshold for warning
                print(
                    f" ⚠️ WARNING: Average response time exceeds {response_threshold * 2}ms"
                )
            elif avg_duration > response_threshold:
                print(
                    f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms"
                )
            else:
                print(" ✅ Remote avatar performance is good")
        else:
            avg_duration = 0
            max_duration = 0
            print(" ❌ All remote requests failed")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
            "success_rate": len(successful_results) / len(results) if results else 0,
        }

    def assess_overall_performance(
        self,
        avatar_threshold: int = 1000,
        response_threshold: int = 1000,
        p95_threshold: int = 2000,
        ignore_cache_warnings: bool = False,
    ) -> bool:
        """Provide overall performance assessment

        Args:
            avatar_threshold: average avatar generation time (ms) above which
                a warning is recorded.
            response_threshold: accepted for signature parity with the other
                test methods; not consulted here.
            p95_threshold: accepted for signature parity; not consulted here.
            ignore_cache_warnings: skip the cache effectiveness check.

        Returns:
            ``True`` if at least one warning was recorded. The warnings are
            also stored under ``self.results["warnings"]``.
        """
        print("\n=== OVERALL PERFORMANCE ASSESSMENT ===")

        warnings = []

        # Check avatar generation
        if "avatar_generation" in self.results:
            avg_gen = self.results["avatar_generation"]["average_ms"]
            if avg_gen > avatar_threshold:
                warnings.append(
                    f"Avatar generation is slow ({avg_gen:.0f}ms average, threshold: {avatar_threshold}ms)"
                )

        # Check concurrent load
        if "concurrent_load" in self.results:
            failed = self.results["concurrent_load"]["failed_requests"]
            if failed > 0:
                warnings.append(f"{failed} requests failed under concurrent load")

        # Check cache performance
        if "cache_performance" in self.results and not ignore_cache_warnings:
            cache_working = self.results["cache_performance"].get(
                "cache_working", False
            )
            if not cache_working:
                warnings.append("Caching may not be working effectively")

        if warnings:
            print("⚠️ Performance Issues Found:")
            for warning in warnings:
                print(f" - {warning}")  # noqa: E221
            print("\nRecommendations:")
            print(" - Review database indexes and query optimization")
            print(" - Check caching configuration")
            print(" - Consider async processing for heavy operations")
        else:
            print("✅ Overall performance is good!")
            print(" - Avatar generation is responsive")
            print(" - Application handles concurrent load well")
            print(" - Caching is working effectively")

        # Store warnings in results for main function to check
        self.results["warnings"] = warnings
        return len(warnings) > 0


def main() -> Optional[Dict[str, Any]]:
    """Main entry point

    Parses the command line, runs all tests and optionally writes the
    results as JSON. Exits with status 1 when the test run itself failed or
    when performance warnings were recorded.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Run Libravatar performance tests")
    parser.add_argument(
        "--base-url",
        default="http://localhost:8000",
        help="Base URL for testing (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--concurrent-users",
        type=int,
        default=10,
        help="Number of concurrent users to simulate (default: 10)",
    )
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument(
        "--no-cache-test",
        action="store_true",
        help="Disable cache performance testing (useful for local development)",
    )
    parser.add_argument(
        "--remote",
        action="store_true",
        help="Force remote testing mode (auto-detected for non-localhost URLs)",
    )
    parser.add_argument(
        "--avatar-threshold",
        type=int,
        default=1000,
        help="Avatar generation threshold in ms (default: 1000ms, use 2500 for dev environments)",
    )
    parser.add_argument(
        "--response-threshold",
        type=int,
        default=1000,
        help="Response time threshold in ms (default: 1000ms, use 2500 for dev environments)",
    )
    parser.add_argument(
        "--p95-threshold",
        type=int,
        default=2000,
        help="95th percentile threshold in ms (default: 2000ms, use 5000 for dev environments)",
    )
    parser.add_argument(
        "--ignore-cache-warnings",
        action="store_true",
        help="Don't fail on cache performance warnings (useful for dev environments)",
    )

    args = parser.parse_args()

    # Determine if we should test cache performance
    test_cache = not args.no_cache_test

    # Determine if we're doing remote testing
    remote_testing = args.remote or not args.base_url.startswith("http://localhost")

    runner = PerformanceTestRunner(
        base_url=args.base_url,
        concurrent_users=args.concurrent_users,
        test_cache=test_cache,
        remote_testing=remote_testing,
    )

    results = runner.run_all_tests(
        args.avatar_threshold,
        args.response_threshold,
        args.p95_threshold,
        args.ignore_cache_warnings,
    )

    if results is None:
        # run_all_tests() already reported the exception. A run that could
        # not complete must not look like a pass to the CI pipeline.
        sys.exit(1)

    if args.output and results:
        import json

        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {args.output}")

    # Exit with error code if there were performance issues
    if results and "warnings" in results and len(results["warnings"]) > 0:
        sys.exit(1)

    return results


if __name__ == "__main__":
    main()