#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Performance testing script for Libravatar CI/CD pipeline This script runs automated performance tests to catch regressions and ensure the application meets performance requirements. """ import os import sys import time import statistics import hashlib # Add project root to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Django setup - only for local testing def setup_django(): """Setup Django for local testing""" os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings") import django django.setup() class PerformanceTestRunner: """Main performance test runner""" def __init__( self, base_url="http://localhost:8000", concurrent_users=10, test_cache=True, remote_testing=False, ): self.base_url = base_url self.concurrent_users = concurrent_users self.test_cache = test_cache self.remote_testing = remote_testing self.client = None self.results = {} # Determine if we're testing locally or remotely if remote_testing or not base_url.startswith("http://localhost"): self.remote_testing = True print(f"🌐 Remote testing mode: {base_url}") else: print(f"🏠 Local testing mode: {base_url}") # Only setup Django and create client for local testing setup_django() from django.test import Client self.client = Client() def setup_test_data(self): """Create test data for performance tests""" print("Setting up test data...") # Import Django models only when needed from django.contrib.auth.models import User from ivatar.ivataraccount.models import ConfirmedEmail # Create test users and emails test_emails = [f"perftest{i}@example.com" for i in range(100)] for i, email in enumerate(test_emails): if not User.objects.filter(username=f"perftest{i}").exists(): user = User.objects.create_user( username=f"perftest{i}", email=email, password="testpass123" ) # Create confirmed email ConfirmedEmail.objects.create( user=user, email=email, ip_address="127.0.0.1" ) print(f"Created {len(test_emails)} test users and emails") def test_avatar_generation_performance(self): """Test avatar generation performance""" print("\n=== Avatar Generation Performance Test ===") # Test different avatar types and sizes test_cases = [ {"default": "identicon", "size": 80}, {"default": "monsterid", "size": 80}, {"default": "robohash", "size": 80}, {"default": "identicon", "size": 256}, {"default": "monsterid", "size": 256}, ] results = [] for case in test_cases: # Generate test hash test_email = "perftest@example.com" email_hash = hashlib.md5(test_email.encode()).hexdigest() # Build URL url = f"/avatar/{email_hash}" params = {"d": case["default"], "s": case["size"]} # Time the request start_time = time.time() response = self.client.get(url, params) end_time = time.time() duration = (end_time - start_time) * 1000 # Convert to ms results.append( { "test": f"{case['default']}_{case['size']}px", "duration_ms": duration, "status_code": response.status_code, "content_length": len(response.content) if response.content else 0, } ) print(f" {case['default']} ({case['size']}px): {duration:.2f}ms") # Calculate statistics durations = [r["duration_ms"] for r in results] avg_duration = statistics.mean(durations) max_duration = max(durations) print(f"\n Average: {avg_duration:.2f}ms") print(f" Maximum: {max_duration:.2f}ms") # Performance thresholds if avg_duration > 1000: # 1 second print(" ⚠️ WARNING: Average avatar generation time exceeds 1s") elif avg_duration > 500: # 500ms print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms") else: print(" ✅ Avatar generation performance is good") self.results["avatar_generation"] = { "average_ms": avg_duration, "maximum_ms": max_duration, "results": results, } def test_concurrent_load(self): """Test concurrent load handling""" print("\n=== Concurrent Load Test ===") num_requests = 20 if self.remote_testing: print(f" Running {num_requests} HTTP requests to {self.base_url}...") results = self._test_remote_concurrent_load(num_requests) else: print(f" Running {num_requests} local avatar generations...") results = self._test_local_concurrent_load(num_requests) # Analyze results successful_requests = [r for r in results if r["success"]] failed_requests = [r for r in results if not r["success"]] total_duration = ( sum(r["duration_ms"] for r in results) / 1000 ) # Convert to seconds print(f" Total time: {total_duration:.2f}s") print(f" Successful requests: {len(successful_requests)}/{num_requests}") print(f" Failed requests: {len(failed_requests)}") if successful_requests: durations = [r["duration_ms"] for r in successful_requests] avg_duration = statistics.mean(durations) # Calculate p95 safely if len(durations) >= 2: try: p95_duration = statistics.quantiles(durations, n=20)[ 18 ] # 95th percentile except (ValueError, IndexError): p95_duration = max(durations) else: p95_duration = max(durations) print(f" Average response time: {avg_duration:.2f}ms") print(f" 95th percentile: {p95_duration:.2f}ms") print( f" Operations per second: {len(successful_requests) / total_duration:.2f}" ) # Performance evaluation if len(failed_requests) > 0: print(" ⚠️ WARNING: Some operations failed under load") elif p95_duration > 2000: # 2 seconds print(" ⚠️ WARNING: 95th percentile response time exceeds 2s") elif avg_duration > 1000: # 1 second print(" ⚠️ CAUTION: Average response time exceeds 1s under load") else: print(" ✅ Load handling is good") else: avg_duration = 0 p95_duration = 0 print(" ❌ All operations failed") self.results["concurrent_load"] = { "total_duration_s": total_duration, "successful_requests": len(successful_requests), "failed_requests": len(failed_requests), "average_ms": avg_duration, "p95_ms": p95_duration, "requests_per_second": ( len(successful_requests) / total_duration if total_duration > 0 else 0 ), } def _test_remote_concurrent_load(self, num_requests): """Test concurrent load against remote server""" import requests # noqa: F401 from concurrent.futures import ThreadPoolExecutor, as_completed def make_remote_request(thread_id): test_email = f"perftest{thread_id % 10}@example.com" email_hash = hashlib.md5(test_email.encode()).hexdigest() url = f"{self.base_url}/avatar/{email_hash}" params = {"d": "identicon", "s": 80} start_time = time.time() try: response = requests.get(url, params=params, timeout=10) end_time = time.time() return { "thread_id": thread_id, "duration_ms": (end_time - start_time) * 1000, "status_code": response.status_code, "success": response.status_code == 200, } except Exception as e: end_time = time.time() return { "thread_id": thread_id, "duration_ms": (end_time - start_time) * 1000, "success": False, "error": str(e), } results = [] with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor: futures = [ executor.submit(make_remote_request, i) for i in range(num_requests) ] for future in as_completed(futures): try: result = future.result() results.append(result) except Exception as e: print(f" Request failed: {e}") return results def _test_local_concurrent_load(self, num_requests): """Test concurrent load locally using avatar generation functions""" results = [] # Import avatar generation functions try: import Identicon for i in range(num_requests): test_email = f"perftest{i % 10}@example.com" email_hash = hashlib.md5(test_email.encode()).hexdigest() request_start = time.time() try: # Test identicon generation directly identicon_data = Identicon.render(email_hash) request_end = time.time() results.append( { "thread_id": i, "duration_ms": (request_end - request_start) * 1000, "success": len(identicon_data) > 0, } ) except Exception as e: request_end = time.time() results.append( { "thread_id": i, "duration_ms": (request_end - request_start) * 1000, "success": False, "error": str(e), } ) except ImportError: # Fallback: just test database queries print( " Avatar generators not available, testing database queries instead..." ) for i in range(num_requests): request_start = time.time() try: from django.contrib.auth.models import User User.objects.count() request_end = time.time() results.append( { "thread_id": i, "duration_ms": (request_end - request_start) * 1000, "success": True, } ) except Exception as e: request_end = time.time() results.append( { "thread_id": i, "duration_ms": (request_end - request_start) * 1000, "success": False, "error": str(e), } ) return results def test_database_performance(self): """Test database query performance""" print("\n=== Database Performance Test ===") from django.db import connection from django.contrib.auth.models import User from ivatar.ivataraccount.models import ConfirmedEmail, Photo # Reset query log connection.queries_log.clear() test_queries = [ {"name": "User count", "query": lambda: User.objects.count()}, { "name": "Email lookup by digest", "query": lambda: ConfirmedEmail.objects.filter( digest="5d41402abc4b2a76b9719d911017c592" ).first(), }, { "name": "Top 10 photos by access count", "query": lambda: list(Photo.objects.order_by("-access_count")[:10]), }, ] for test in test_queries: start_time = time.time() try: test["query"]() end_time = time.time() duration = (end_time - start_time) * 1000 print(f" {test['name']}: {duration:.2f}ms") if duration > 100: # 100ms threshold print(" ⚠️ WARNING: Query exceeds 100ms threshold") except Exception as e: print(f" {test['name']}: ERROR - {e}") # Check for N+1 queries query_count = len(connection.queries) if query_count > 10: print( f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)" ) else: print(f" ✅ Database query count is reasonable ({query_count} queries)") def test_cache_performance(self): """Test caching effectiveness""" if not self.test_cache: print("\n=== Cache Performance Test ===") print(" ⏭️ Cache testing disabled") return print("\n=== Cache Performance Test ===") # Use an actual email address that exists in the system test_email = "dev@libravatar.org" email_hash = hashlib.md5(test_email.encode()).hexdigest() print(f" Testing with: {test_email}") if self.remote_testing: first_duration, second_duration = self._test_remote_cache_performance( email_hash ) else: first_duration, second_duration = self._test_local_cache_performance( email_hash ) print(f" First request: {first_duration:.2f}ms") print(f" Second request: {second_duration:.2f}ms") improvement_ratio = ( first_duration / second_duration if second_duration > 0 else 0 ) # Analyze cache effectiveness based on headers AND timing cache_working = False cache_status = "unknown" if self.remote_testing and hasattr(self, "cache_info"): # For remote testing, check actual cache headers first_cached = self.cache_info["first_request"]["is_cached"] second_cached = self.cache_info["second_request"]["is_cached"] if not first_cached and second_cached: cache_status = "✅ Cache working correctly (miss → hit)" cache_working = True elif first_cached and second_cached: cache_status = "✅ Cache working (both requests cached)" cache_working = True elif not first_cached and not second_cached: cache_status = "⚠️ No cache hits detected" cache_working = False else: cache_status = "⚠️ Unexpected cache behavior" cache_working = False else: # For local testing, fall back to timing-based analysis if improvement_ratio >= 1.5: cache_status = "✅ Caching appears to be working (timing-based)" cache_working = True else: cache_status = ( "⚠️ Caching may not be working as expected (timing-based)" ) cache_working = False print(f" {cache_status}") if improvement_ratio > 1: print(f" Performance improvement: {improvement_ratio:.1f}x faster") self.results["cache_performance"] = { "first_request_ms": first_duration, "second_request_ms": second_duration, "improvement_ratio": improvement_ratio, "cache_working": cache_working, "cache_status": cache_status, "cache_headers": getattr(self, "cache_info", {}), } def _test_remote_cache_performance(self, email_hash): """Test cache performance against remote server""" import requests url = f"{self.base_url}/avatar/{email_hash}" params = {"d": "identicon", "s": 80} # First request (should be cache miss or fresh) start_time = time.time() response1 = requests.get(url, params=params, timeout=10) first_duration = (time.time() - start_time) * 1000 # Check first request headers first_cache_detail = response1.headers.get("x-cache-detail", "unknown") first_age = response1.headers.get("age", "0") first_cache_control = response1.headers.get("cache-control", "none") print(" First request headers:") print(f" x-cache-detail: {first_cache_detail}") print(f" age: {first_age}") print(f" cache-control: {first_cache_control}") # Small delay to ensure any processing is complete time.sleep(0.1) # Second request (should be cache hit) start_time = time.time() response2 = requests.get(url, params=params, timeout=10) second_duration = (time.time() - start_time) * 1000 # Check second request headers second_cache_detail = response2.headers.get("x-cache-detail", "unknown") second_age = response2.headers.get("age", "0") second_cache_control = response2.headers.get("cache-control", "none") print(" Second request headers:") print(f" x-cache-detail: {second_cache_detail}") print(f" age: {second_age}") print(f" cache-control: {second_cache_control}") # Determine if we actually got cache hits first_is_cached = ( "cache hit" in first_cache_detail.lower() or int(first_age) > 0 ) second_is_cached = ( "cache hit" in second_cache_detail.lower() or int(second_age) > 0 ) print(" Cache analysis:") print( f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}" ) print( f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}" ) # Store cache information for analysis self.cache_info = { "first_request": { "cache_detail": first_cache_detail, "age": first_age, "is_cached": first_is_cached, }, "second_request": { "cache_detail": second_cache_detail, "age": second_age, "is_cached": second_is_cached, }, } return first_duration, second_duration def _test_local_cache_performance(self, email_hash): """Test cache performance locally""" url = f"/avatar/{email_hash}" params = {"d": "identicon", "s": 80} # First request (cache miss) start_time = time.time() self.client.get(url, params) first_duration = (time.time() - start_time) * 1000 # Second request (should be cache hit) start_time = time.time() self.client.get(url, params) second_duration = (time.time() - start_time) * 1000 return first_duration, second_duration def run_all_tests(self): """Run all performance tests""" print("Starting Libravatar Performance Tests") print("=" * 50) start_time = time.time() try: # Only setup test data for local testing if not self.remote_testing: self.setup_test_data() # Run tests based on mode if self.remote_testing: print("🌐 Running remote server tests...") self.test_remote_avatar_performance() else: print("🏠 Running local tests...") self.test_avatar_generation_performance() self.test_database_performance() # Always test concurrent load self.test_concurrent_load() # Test cache performance if enabled self.test_cache_performance() end_time = time.time() total_duration = end_time - start_time print("\n" + "=" * 50) print(f"Performance tests completed in {total_duration:.2f}s") # Overall assessment self.assess_overall_performance() return self.results except Exception as e: print(f"Performance test failed: {e}") return None def test_remote_avatar_performance(self): """Test avatar generation performance on remote server""" print("\n=== Remote Avatar Performance Test ===") import requests # Test different avatar types and sizes test_cases = [ {"default": "identicon", "size": 80}, {"default": "monsterid", "size": 80}, {"default": "robohash", "size": 80}, {"default": "identicon", "size": 256}, {"default": "monsterid", "size": 256}, ] results = [] for case in test_cases: # Generate test hash test_email = "perftest@example.com" email_hash = hashlib.md5(test_email.encode()).hexdigest() # Build URL url = f"{self.base_url}/avatar/{email_hash}" params = {"d": case["default"], "s": case["size"]} # Time the request start_time = time.time() try: response = requests.get(url, params=params, timeout=10) end_time = time.time() duration = (end_time - start_time) * 1000 # Convert to ms results.append( { "test": f"{case['default']}_{case['size']}px", "duration_ms": duration, "status_code": response.status_code, "content_length": ( len(response.content) if response.content else 0 ), "success": response.status_code == 200, } ) status = "✅" if response.status_code == 200 else "❌" print( f" {case['default']} ({case['size']}px): {duration:.2f}ms {status}" ) except Exception as e: print(f" {case['default']} ({case['size']}px): ❌ Failed - {e}") results.append( { "test": f"{case['default']}_{case['size']}px", "duration_ms": 0, "status_code": 0, "success": False, "error": str(e), } ) # Calculate statistics for successful requests successful_results = [r for r in results if r["success"]] if successful_results: durations = [r["duration_ms"] for r in successful_results] avg_duration = statistics.mean(durations) max_duration = max(durations) print(f"\n Average: {avg_duration:.2f}ms") print(f" Maximum: {max_duration:.2f}ms") print(f" Success rate: {len(successful_results)}/{len(results)}") # Performance thresholds for remote testing if avg_duration > 2000: # 2 seconds print(" ⚠️ WARNING: Average response time exceeds 2s") elif avg_duration > 1000: # 1 second print(" ⚠️ CAUTION: Average response time exceeds 1s") else: print(" ✅ Remote avatar performance is good") else: avg_duration = 0 max_duration = 0 print(" ❌ All remote requests failed") self.results["avatar_generation"] = { "average_ms": avg_duration, "maximum_ms": max_duration, "results": results, "success_rate": len(successful_results) / len(results) if results else 0, } def assess_overall_performance(self): """Provide overall performance assessment""" print("\n=== OVERALL PERFORMANCE ASSESSMENT ===") warnings = [] # Check avatar generation if "avatar_generation" in self.results: avg_gen = self.results["avatar_generation"]["average_ms"] if avg_gen > 1000: warnings.append(f"Avatar generation is slow ({avg_gen:.0f}ms average)") # Check concurrent load if "concurrent_load" in self.results: failed = self.results["concurrent_load"]["failed_requests"] if failed > 0: warnings.append(f"{failed} requests failed under concurrent load") # Check cache performance if "cache_performance" in self.results: cache_working = self.results["cache_performance"].get( "cache_working", False ) if not cache_working: warnings.append("Caching may not be working effectively") if warnings: print("⚠️ Performance Issues Found:") for warning in warnings: print(f" - {warning}") # noqa: E221 print("\nRecommendations:") print(" - Review database indexes and query optimization") print(" - Check caching configuration") print(" - Consider async processing for heavy operations") else: print("✅ Overall performance is good!") print(" - Avatar generation is responsive") print(" - Application handles concurrent load well") print(" - Caching is working effectively") # Store warnings in results for main function to check self.results["warnings"] = warnings return len(warnings) > 0 def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser(description="Run Libravatar performance tests") parser.add_argument( "--base-url", default="http://localhost:8000", help="Base URL for testing (default: http://localhost:8000)", ) parser.add_argument( "--concurrent-users", type=int, default=10, help="Number of concurrent users to simulate (default: 10)", ) parser.add_argument("--output", help="Output file for results (JSON)") parser.add_argument( "--no-cache-test", action="store_true", help="Disable cache performance testing (useful for local development)", ) parser.add_argument( "--remote", action="store_true", help="Force remote testing mode (auto-detected for non-localhost URLs)", ) args = parser.parse_args() # Determine if we should test cache performance test_cache = not args.no_cache_test # Determine if we're doing remote testing remote_testing = args.remote or not args.base_url.startswith("http://localhost") runner = PerformanceTestRunner( base_url=args.base_url, concurrent_users=args.concurrent_users, test_cache=test_cache, remote_testing=remote_testing, ) results = runner.run_all_tests() if args.output and results: import json with open(args.output, "w") as f: json.dump(results, f, indent=2) print(f"\nResults saved to {args.output}") # Exit with error code if there were performance issues if results and "warnings" in results and len(results["warnings"]) > 0: sys.exit(1) return results if __name__ == "__main__": main()