From 13165579e8bb32b641912c77df17af0d1dc3b13a Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Wed, 22 Oct 2025 12:52:29 +0200 Subject: [PATCH] Add performance tests for produciton (merge latest development efforts) --- .gitlab-ci.yml | 85 ++++ ivatar/file_security.py | 12 +- ivatar/ivataraccount/forms.py | 8 +- scripts/check_deployment.py | 2 +- scripts/performance_tests.py | 785 ++++++++++++++++++++++++++++++++++ 5 files changed, 885 insertions(+), 7 deletions(-) create mode 100644 scripts/performance_tests.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f7dc390..47fa8ae 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -112,6 +112,91 @@ pages: # - docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" . # - docker push "$CI_REGISTRY_IMAGE${tag}" +# Local performance testing job (runs in CI environment) +performance_tests_local: + stage: test + services: + - postgres:latest + variables: + POSTGRES_DB: django_db + POSTGRES_USER: django_user + POSTGRES_PASSWORD: django_password + POSTGRES_HOST: postgres + DATABASE_URL: "postgres://django_user:django_password@postgres/django_db" + PYTHONUNBUFFERED: 1 + # OpenTelemetry configuration for performance testing + OTEL_EXPORT_ENABLED: "false" + OTEL_SERVICE_NAME: "ivatar-perf-test-local" + OTEL_ENVIRONMENT: "ci-performance" + before_script: + - virtualenv -p python3 /tmp/.virtualenv + - source /tmp/.virtualenv/bin/activate + - pip install -U pip + - pip install Pillow + - pip install -r requirements.txt + - pip install requests # Additional dependency for performance tests + script: + - source /tmp/.virtualenv/bin/activate + - echo 'from ivatar.settings import TEMPLATES' > config_local.py + - echo 'TEMPLATES[0]["OPTIONS"]["debug"] = True' >> config_local.py + - echo "DEBUG = True" >> config_local.py + - echo "from config import CACHES" >> config_local.py + - echo "CACHES['default'] = CACHES['filesystem']" >> config_local.py + - python manage.py migrate + - python manage.py collectstatic --noinput + - echo "Running local performance tests (no cache testing)..." + - python3 scripts/performance_tests.py --no-cache-test --output performance_local.json + artifacts: + paths: + - performance_local.json + expire_in: 7 days + allow_failure: true # Don't fail the pipeline on performance issues, but report them + +# Performance testing against dev server (devel branch only) +performance_tests_dev: + stage: deploy + image: python:3.11-alpine + only: + - devel + variables: + DEV_URL: "https://dev.libravatar.org" + before_script: + - apk add --no-cache curl + - pip install requests + script: + - echo "Running performance tests against dev.libravatar.org..." + - python3 scripts/performance_tests.py --base-url $DEV_URL --concurrent-users 5 --output performance_dev.json + artifacts: + paths: + - performance_dev.json + expire_in: 7 days + allow_failure: true # Don't fail deployment on performance issues + needs: + - verify_dev_deployment # Run after deployment verification + +# Performance testing against production server (master branch only) +performance_tests_prod: + stage: deploy + image: python:3.11-alpine + only: + - master + when: manual # Manual trigger to avoid impacting production unnecessarily + variables: + PROD_URL: "https://libravatar.org" + before_script: + - apk add --no-cache curl + - pip install requests + script: + - echo "Running performance tests against libravatar.org..." + - python3 scripts/performance_tests.py --base-url $PROD_URL --concurrent-users 3 --output performance_prod.json + artifacts: + paths: + - performance_prod.json + expire_in: 30 days # Keep production results longer + allow_failure: true # Don't fail deployment on performance issues + needs: + - verify_prod_deployment # Run after deployment verification + # Deployment verification jobs verify_dev_deployment: stage: deploy diff --git a/ivatar/file_security.py b/ivatar/file_security.py index 3684d00..0a486b5 100644 --- a/ivatar/file_security.py +++ b/ivatar/file_security.py @@ -262,7 +262,9 @@ class FileValidator: if not magic_results["valid"]: results["valid"] = False results["errors"].extend(magic_results["errors"]) - results["security_score"] -= 10 # Reduced from 25 - basic format issue, not security threat + results[ + "security_score" + ] -= 10 # Reduced from 25 - basic format issue, not security threat results["file_info"]["detected_type"] = magic_results["detected_type"] @@ -271,7 +273,9 @@ class FileValidator: if not mime_results["valid"]: results["valid"] = False results["errors"].extend(mime_results["errors"]) - results["security_score"] -= 10 # Reduced from 20 - basic format issue, not security threat + results[ + "security_score" + ] -= 10 # Reduced from 20 - basic format issue, not security threat results["file_info"]["detected_mime"] = mime_results["detected_mime"] results["warnings"].extend(mime_results.get("warnings", [])) @@ -281,7 +285,9 @@ class FileValidator: if not pil_results["valid"]: results["valid"] = False results["errors"].extend(pil_results["errors"]) - results["security_score"] -= 10 # Reduced from 15 - basic format issue, not security threat + results[ + "security_score" + ] -= 10 # Reduced from 15 - basic format issue, not security threat results["file_info"]["image_info"] = pil_results["image_info"] results["warnings"].extend(pil_results.get("warnings", [])) diff --git a/ivatar/ivataraccount/forms.py b/ivatar/ivataraccount/forms.py index 7a31408..b5f686a 100644 --- a/ivatar/ivataraccount/forms.py +++ b/ivatar/ivataraccount/forms.py @@ -126,9 +126,9 @@ class UploadPhotoForm(forms.Form): # Read file data try: # Handle different file types - if hasattr(photo, 'read'): + if hasattr(photo, "read"): file_data = photo.read() - elif hasattr(photo, 'file'): + elif hasattr(photo, "file"): file_data = photo.file.read() else: file_data = bytes(photo) @@ -159,7 +159,9 @@ class UploadPhotoForm(forms.Form): else: # For format issues, don't raise ValidationError - let Photo.save() handle it # This preserves the original error handling behavior - logger.info(f"File format issue detected, allowing Photo.save() to handle: {validation_results['errors']}") + logger.info( + f"File format issue detected, allowing Photo.save() to handle: {validation_results['errors']}" + ) # Store the validation results for potential use, but don't reject the form self.validation_results = validation_results self.file_data = file_data diff --git a/scripts/check_deployment.py b/scripts/check_deployment.py index 45c09d8..d632a55 100755 --- a/scripts/check_deployment.py +++ b/scripts/check_deployment.py @@ -93,7 +93,7 @@ def is_commit_newer_or_equal(commit1: str, commit2: str) -> Optional[bool]: except subprocess.CalledProcessError: # If the above fails, try the reverse - check if commit2 is newer try: - result = subprocess.run( + subprocess.run( ["git", "merge-base", "--is-ancestor", commit1, commit2], capture_output=True, check=True, diff --git a/scripts/performance_tests.py b/scripts/performance_tests.py new file mode 100644 index 0000000..485543e --- /dev/null +++ b/scripts/performance_tests.py @@ -0,0 +1,785 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Performance testing script for Libravatar CI/CD pipeline + +This script runs automated performance tests to catch regressions +and ensure the application meets performance requirements. +""" + +import os +import sys +import time +import statistics +import hashlib + +# Add project root to path +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +# Django setup - only for local testing +def setup_django(): + """Setup Django for local testing""" + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings") + import django + + django.setup() + + +class PerformanceTestRunner: + """Main performance test runner""" + + def __init__( + self, + base_url="http://localhost:8000", + concurrent_users=10, + test_cache=True, + remote_testing=False, + ): + self.base_url = base_url + self.concurrent_users = concurrent_users + self.test_cache = test_cache + self.remote_testing = remote_testing + self.client = None + self.results = {} + + # Determine if we're testing locally or remotely + if remote_testing or not base_url.startswith("http://localhost"): + self.remote_testing = True + print(f"🌐 Remote testing mode: {base_url}") + else: + print(f"🏠 Local testing mode: {base_url}") + # Only setup Django and create client for local testing + setup_django() + from django.test import Client + + self.client = Client() + + def setup_test_data(self): + """Create test data for performance tests""" + print("Setting up test data...") + + # Import Django models only when needed + from django.contrib.auth.models import User + from ivatar.ivataraccount.models import ConfirmedEmail + + # Create test users and emails + test_emails = [f"perftest{i}@example.com" for i in range(100)] + + for i, email in enumerate(test_emails): + if not User.objects.filter(username=f"perftest{i}").exists(): + user = User.objects.create_user( + username=f"perftest{i}", email=email, password="testpass123" + ) + + # Create confirmed email + ConfirmedEmail.objects.create( + user=user, email=email, ip_address="127.0.0.1" + ) + + print(f"Created {len(test_emails)} test users and emails") + + def test_avatar_generation_performance(self): + """Test avatar generation performance""" + print("\n=== Avatar Generation Performance Test ===") + + # Test different avatar types and sizes + test_cases = [ + {"default": "identicon", "size": 80}, + {"default": "monsterid", "size": 80}, + {"default": "robohash", "size": 80}, + {"default": "identicon", "size": 256}, + {"default": "monsterid", "size": 256}, + ] + + results = [] + + for case in test_cases: + # Generate test hash + test_email = "perftest@example.com" + email_hash = hashlib.md5(test_email.encode()).hexdigest() + + # Build URL + url = f"/avatar/{email_hash}" + params = {"d": case["default"], "s": case["size"]} + + # Time the request + start_time = time.time() + response = self.client.get(url, params) + end_time = time.time() + + duration = (end_time - start_time) * 1000 # Convert to ms + + results.append( + { + "test": f"{case['default']}_{case['size']}px", + "duration_ms": duration, + "status_code": response.status_code, + "content_length": len(response.content) if response.content else 0, + } + ) + + print(f" {case['default']} ({case['size']}px): {duration:.2f}ms") + + # Calculate statistics + durations = [r["duration_ms"] for r in results] + avg_duration = statistics.mean(durations) + max_duration = max(durations) + + print(f"\n Average: {avg_duration:.2f}ms") + print(f" Maximum: {max_duration:.2f}ms") + + # Performance thresholds + if avg_duration > 1000: # 1 second + print(" ⚠️ WARNING: Average avatar generation time exceeds 1s") + elif avg_duration > 500: # 500ms + print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms") + else: + print(" ✅ Avatar generation performance is good") + + self.results["avatar_generation"] = { + "average_ms": avg_duration, + "maximum_ms": max_duration, + "results": results, + } + + def test_concurrent_load(self): + """Test concurrent load handling""" + print("\n=== Concurrent Load Test ===") + + num_requests = 20 + + if self.remote_testing: + print(f" Running {num_requests} HTTP requests to {self.base_url}...") + results = self._test_remote_concurrent_load(num_requests) + else: + print(f" Running {num_requests} local avatar generations...") + results = self._test_local_concurrent_load(num_requests) + + # Analyze results + successful_requests = [r for r in results if r["success"]] + failed_requests = [r for r in results if not r["success"]] + + total_duration = ( + sum(r["duration_ms"] for r in results) / 1000 + ) # Convert to seconds + + print(f" Total time: {total_duration:.2f}s") + print(f" Successful requests: {len(successful_requests)}/{num_requests}") + print(f" Failed requests: {len(failed_requests)}") + + if successful_requests: + durations = [r["duration_ms"] for r in successful_requests] + avg_duration = statistics.mean(durations) + + # Calculate p95 safely + if len(durations) >= 2: + try: + p95_duration = statistics.quantiles(durations, n=20)[ + 18 + ] # 95th percentile + except (ValueError, IndexError): + p95_duration = max(durations) + else: + p95_duration = max(durations) + + print(f" Average response time: {avg_duration:.2f}ms") + print(f" 95th percentile: {p95_duration:.2f}ms") + print( + f" Operations per second: {len(successful_requests) / total_duration:.2f}" + ) + + # Performance evaluation + if len(failed_requests) > 0: + print(" ⚠️ WARNING: Some operations failed under load") + elif p95_duration > 2000: # 2 seconds + print(" ⚠️ WARNING: 95th percentile response time exceeds 2s") + elif avg_duration > 1000: # 1 second + print(" ⚠️ CAUTION: Average response time exceeds 1s under load") + else: + print(" ✅ Load handling is good") + else: + avg_duration = 0 + p95_duration = 0 + print(" ❌ All operations failed") + + self.results["concurrent_load"] = { + "total_duration_s": total_duration, + "successful_requests": len(successful_requests), + "failed_requests": len(failed_requests), + "average_ms": avg_duration, + "p95_ms": p95_duration, + "requests_per_second": ( + len(successful_requests) / total_duration if total_duration > 0 else 0 + ), + } + + def _test_remote_concurrent_load(self, num_requests): + """Test concurrent load against remote server""" + import requests # noqa: F401 + from concurrent.futures import ThreadPoolExecutor, as_completed + + def make_remote_request(thread_id): + test_email = f"perftest{thread_id % 10}@example.com" + email_hash = hashlib.md5(test_email.encode()).hexdigest() + url = f"{self.base_url}/avatar/{email_hash}" + params = {"d": "identicon", "s": 80} + + start_time = time.time() + try: + response = requests.get(url, params=params, timeout=10) + end_time = time.time() + + return { + "thread_id": thread_id, + "duration_ms": (end_time - start_time) * 1000, + "status_code": response.status_code, + "success": response.status_code == 200, + } + except Exception as e: + end_time = time.time() + return { + "thread_id": thread_id, + "duration_ms": (end_time - start_time) * 1000, + "success": False, + "error": str(e), + } + + results = [] + with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor: + futures = [ + executor.submit(make_remote_request, i) for i in range(num_requests) + ] + + for future in as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as e: + print(f" Request failed: {e}") + + return results + + def _test_local_concurrent_load(self, num_requests): + """Test concurrent load locally using avatar generation functions""" + results = [] + + # Import avatar generation functions + try: + import Identicon + + for i in range(num_requests): + test_email = f"perftest{i % 10}@example.com" + email_hash = hashlib.md5(test_email.encode()).hexdigest() + + request_start = time.time() + try: + # Test identicon generation directly + identicon_data = Identicon.render(email_hash) + request_end = time.time() + + results.append( + { + "thread_id": i, + "duration_ms": (request_end - request_start) * 1000, + "success": len(identicon_data) > 0, + } + ) + except Exception as e: + request_end = time.time() + results.append( + { + "thread_id": i, + "duration_ms": (request_end - request_start) * 1000, + "success": False, + "error": str(e), + } + ) + + except ImportError: + # Fallback: just test database queries + print( + " Avatar generators not available, testing database queries instead..." + ) + for i in range(num_requests): + request_start = time.time() + try: + from django.contrib.auth.models import User + + User.objects.count() + request_end = time.time() + + results.append( + { + "thread_id": i, + "duration_ms": (request_end - request_start) * 1000, + "success": True, + } + ) + except Exception as e: + request_end = time.time() + results.append( + { + "thread_id": i, + "duration_ms": (request_end - request_start) * 1000, + "success": False, + "error": str(e), + } + ) + + return results + + def test_database_performance(self): + """Test database query performance""" + print("\n=== Database Performance Test ===") + + from django.db import connection + from django.contrib.auth.models import User + from ivatar.ivataraccount.models import ConfirmedEmail, Photo + + # Reset query log + connection.queries_log.clear() + + test_queries = [ + {"name": "User count", "query": lambda: User.objects.count()}, + { + "name": "Email lookup by digest", + "query": lambda: ConfirmedEmail.objects.filter( + digest="5d41402abc4b2a76b9719d911017c592" + ).first(), + }, + { + "name": "Top 10 photos by access count", + "query": lambda: list(Photo.objects.order_by("-access_count")[:10]), + }, + ] + + for test in test_queries: + start_time = time.time() + try: + test["query"]() + end_time = time.time() + duration = (end_time - start_time) * 1000 + + print(f" {test['name']}: {duration:.2f}ms") + + if duration > 100: # 100ms threshold + print(" ⚠️ WARNING: Query exceeds 100ms threshold") + + except Exception as e: + print(f" {test['name']}: ERROR - {e}") + + # Check for N+1 queries + query_count = len(connection.queries) + if query_count > 10: + print( + f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)" + ) + else: + print(f" ✅ Database query count is reasonable ({query_count} queries)") + + def test_cache_performance(self): + """Test caching effectiveness""" + if not self.test_cache: + print("\n=== Cache Performance Test ===") + print(" ⏭️ Cache testing disabled") + return + + print("\n=== Cache Performance Test ===") + + # Use an actual email address that exists in the system + test_email = "dev@libravatar.org" + email_hash = hashlib.md5(test_email.encode()).hexdigest() + print(f" Testing with: {test_email}") + + if self.remote_testing: + first_duration, second_duration = self._test_remote_cache_performance( + email_hash + ) + else: + first_duration, second_duration = self._test_local_cache_performance( + email_hash + ) + + print(f" First request: {first_duration:.2f}ms") + print(f" Second request: {second_duration:.2f}ms") + + improvement_ratio = ( + first_duration / second_duration if second_duration > 0 else 0 + ) + + # Analyze cache effectiveness based on headers AND timing + cache_working = False + cache_status = "unknown" + + if self.remote_testing and hasattr(self, "cache_info"): + # For remote testing, check actual cache headers + first_cached = self.cache_info["first_request"]["is_cached"] + second_cached = self.cache_info["second_request"]["is_cached"] + + if not first_cached and second_cached: + cache_status = "✅ Cache working correctly (miss → hit)" + cache_working = True + elif first_cached and second_cached: + cache_status = "✅ Cache working (both requests cached)" + cache_working = True + elif not first_cached and not second_cached: + cache_status = "⚠️ No cache hits detected" + cache_working = False + else: + cache_status = "⚠️ Unexpected cache behavior" + cache_working = False + else: + # For local testing, fall back to timing-based analysis + if improvement_ratio >= 1.5: + cache_status = "✅ Caching appears to be working (timing-based)" + cache_working = True + else: + cache_status = ( + "⚠️ Caching may not be working as expected (timing-based)" + ) + cache_working = False + + print(f" {cache_status}") + if improvement_ratio > 1: + print(f" Performance improvement: {improvement_ratio:.1f}x faster") + + self.results["cache_performance"] = { + "first_request_ms": first_duration, + "second_request_ms": second_duration, + "improvement_ratio": improvement_ratio, + "cache_working": cache_working, + "cache_status": cache_status, + "cache_headers": getattr(self, "cache_info", {}), + } + + def _test_remote_cache_performance(self, email_hash): + """Test cache performance against remote server""" + import requests + + url = f"{self.base_url}/avatar/{email_hash}" + params = {"d": "identicon", "s": 80} + + # First request (should be cache miss or fresh) + start_time = time.time() + response1 = requests.get(url, params=params, timeout=10) + first_duration = (time.time() - start_time) * 1000 + + # Check first request headers + first_cache_detail = response1.headers.get("x-cache-detail", "unknown") + first_age = response1.headers.get("age", "0") + first_cache_control = response1.headers.get("cache-control", "none") + + print(" First request headers:") + print(f" x-cache-detail: {first_cache_detail}") + print(f" age: {first_age}") + print(f" cache-control: {first_cache_control}") + + # Small delay to ensure any processing is complete + time.sleep(0.1) + + # Second request (should be cache hit) + start_time = time.time() + response2 = requests.get(url, params=params, timeout=10) + second_duration = (time.time() - start_time) * 1000 + + # Check second request headers + second_cache_detail = response2.headers.get("x-cache-detail", "unknown") + second_age = response2.headers.get("age", "0") + second_cache_control = response2.headers.get("cache-control", "none") + + print(" Second request headers:") + print(f" x-cache-detail: {second_cache_detail}") + print(f" age: {second_age}") + print(f" cache-control: {second_cache_control}") + + # Determine if we actually got cache hits + first_is_cached = ( + "cache hit" in first_cache_detail.lower() or int(first_age) > 0 + ) + second_is_cached = ( + "cache hit" in second_cache_detail.lower() or int(second_age) > 0 + ) + + print(" Cache analysis:") + print( + f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}" + ) + print( + f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}" + ) + + # Store cache information for analysis + self.cache_info = { + "first_request": { + "cache_detail": first_cache_detail, + "age": first_age, + "is_cached": first_is_cached, + }, + "second_request": { + "cache_detail": second_cache_detail, + "age": second_age, + "is_cached": second_is_cached, + }, + } + + return first_duration, second_duration + + def _test_local_cache_performance(self, email_hash): + """Test cache performance locally""" + url = f"/avatar/{email_hash}" + params = {"d": "identicon", "s": 80} + + # First request (cache miss) + start_time = time.time() + self.client.get(url, params) + first_duration = (time.time() - start_time) * 1000 + + # Second request (should be cache hit) + start_time = time.time() + self.client.get(url, params) + second_duration = (time.time() - start_time) * 1000 + + return first_duration, second_duration + + def run_all_tests(self): + """Run all performance tests""" + print("Starting Libravatar Performance Tests") + print("=" * 50) + + start_time = time.time() + + try: + # Only setup test data for local testing + if not self.remote_testing: + self.setup_test_data() + + # Run tests based on mode + if self.remote_testing: + print("🌐 Running remote server tests...") + self.test_remote_avatar_performance() + else: + print("🏠 Running local tests...") + self.test_avatar_generation_performance() + self.test_database_performance() + + # Always test concurrent load + self.test_concurrent_load() + + # Test cache performance if enabled + self.test_cache_performance() + + end_time = time.time() + total_duration = end_time - start_time + + print("\n" + "=" * 50) + print(f"Performance tests completed in {total_duration:.2f}s") + + # Overall assessment + self.assess_overall_performance() + + return self.results + + except Exception as e: + print(f"Performance test failed: {e}") + return None + + def test_remote_avatar_performance(self): + """Test avatar generation performance on remote server""" + print("\n=== Remote Avatar Performance Test ===") + + import requests + + # Test different avatar types and sizes + test_cases = [ + {"default": "identicon", "size": 80}, + {"default": "monsterid", "size": 80}, + {"default": "robohash", "size": 80}, + {"default": "identicon", "size": 256}, + {"default": "monsterid", "size": 256}, + ] + + results = [] + + for case in test_cases: + # Generate test hash + test_email = "perftest@example.com" + email_hash = hashlib.md5(test_email.encode()).hexdigest() + + # Build URL + url = f"{self.base_url}/avatar/{email_hash}" + params = {"d": case["default"], "s": case["size"]} + + # Time the request + start_time = time.time() + try: + response = requests.get(url, params=params, timeout=10) + end_time = time.time() + + duration = (end_time - start_time) * 1000 # Convert to ms + + results.append( + { + "test": f"{case['default']}_{case['size']}px", + "duration_ms": duration, + "status_code": response.status_code, + "content_length": ( + len(response.content) if response.content else 0 + ), + "success": response.status_code == 200, + } + ) + + status = "✅" if response.status_code == 200 else "❌" + print( + f" {case['default']} ({case['size']}px): {duration:.2f}ms {status}" + ) + + except Exception as e: + print(f" {case['default']} ({case['size']}px): ❌ Failed - {e}") + results.append( + { + "test": f"{case['default']}_{case['size']}px", + "duration_ms": 0, + "status_code": 0, + "success": False, + "error": str(e), + } + ) + + # Calculate statistics for successful requests + successful_results = [r for r in results if r["success"]] + if successful_results: + durations = [r["duration_ms"] for r in successful_results] + avg_duration = statistics.mean(durations) + max_duration = max(durations) + + print(f"\n Average: {avg_duration:.2f}ms") + print(f" Maximum: {max_duration:.2f}ms") + print(f" Success rate: {len(successful_results)}/{len(results)}") + + # Performance thresholds for remote testing + if avg_duration > 2000: # 2 seconds + print(" ⚠️ WARNING: Average response time exceeds 2s") + elif avg_duration > 1000: # 1 second + print(" ⚠️ CAUTION: Average response time exceeds 1s") + else: + print(" ✅ Remote avatar performance is good") + else: + avg_duration = 0 + max_duration = 0 + print(" ❌ All remote requests failed") + + self.results["avatar_generation"] = { + "average_ms": avg_duration, + "maximum_ms": max_duration, + "results": results, + "success_rate": len(successful_results) / len(results) if results else 0, + } + + def assess_overall_performance(self): + """Provide overall performance assessment""" + print("\n=== OVERALL PERFORMANCE ASSESSMENT ===") + + warnings = [] + + # Check avatar generation + if "avatar_generation" in self.results: + avg_gen = self.results["avatar_generation"]["average_ms"] + if avg_gen > 1000: + warnings.append(f"Avatar generation is slow ({avg_gen:.0f}ms average)") + + # Check concurrent load + if "concurrent_load" in self.results: + failed = self.results["concurrent_load"]["failed_requests"] + if failed > 0: + warnings.append(f"{failed} requests failed under concurrent load") + + # Check cache performance + if "cache_performance" in self.results: + cache_working = self.results["cache_performance"].get( + "cache_working", False + ) + if not cache_working: + warnings.append("Caching may not be working effectively") + + if warnings: + print("⚠️ Performance Issues Found:") + for warning in warnings: + print(f" - {warning}") # noqa: E221 + print("\nRecommendations:") + print(" - Review database indexes and query optimization") + print(" - Check caching configuration") + print(" - Consider async processing for heavy operations") + else: + print("✅ Overall performance is good!") + print(" - Avatar generation is responsive") + print(" - Application handles concurrent load well") + print(" - Caching is working effectively") + + # Store warnings in results for main function to check + self.results["warnings"] = warnings + return len(warnings) > 0 + + +def main(): + """Main entry point""" + import argparse + + parser = argparse.ArgumentParser(description="Run Libravatar performance tests") + parser.add_argument( + "--base-url", + default="http://localhost:8000", + help="Base URL for testing (default: http://localhost:8000)", + ) + parser.add_argument( + "--concurrent-users", + type=int, + default=10, + help="Number of concurrent users to simulate (default: 10)", + ) + parser.add_argument("--output", help="Output file for results (JSON)") + parser.add_argument( + "--no-cache-test", + action="store_true", + help="Disable cache performance testing (useful for local development)", + ) + parser.add_argument( + "--remote", + action="store_true", + help="Force remote testing mode (auto-detected for non-localhost URLs)", + ) + + args = parser.parse_args() + + # Determine if we should test cache performance + test_cache = not args.no_cache_test + + # Determine if we're doing remote testing + remote_testing = args.remote or not args.base_url.startswith("http://localhost") + + runner = PerformanceTestRunner( + base_url=args.base_url, + concurrent_users=args.concurrent_users, + test_cache=test_cache, + remote_testing=remote_testing, + ) + + results = runner.run_all_tests() + + if args.output and results: + import json + + with open(args.output, "w") as f: + json.dump(results, f, indent=2) + print(f"\nResults saved to {args.output}") + + # Exit with error code if there were performance issues + if results and "warnings" in results and len(results["warnings"]) > 0: + sys.exit(1) + + return results + + +if __name__ == "__main__": + main()