Files
ivatar/scripts/performance_tests.py
2025-10-24 13:51:45 +02:00

1060 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Performance testing script for Libravatar CI/CD pipeline
This script runs automated performance tests to catch regressions
and ensure the application meets performance requirements.
"""
import os
import sys
import time
import statistics
import hashlib
import random
import string
from typing import Dict, List, Any, Optional, Tuple
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from libravatar import libravatar_url
from urllib.parse import urlsplit
from prettytable import PrettyTable
def random_string(length=10):
"""Return some random string with default length 10"""
return "".join(
random.SystemRandom().choice(string.ascii_lowercase + string.digits)
for _ in range(length)
)
# Try to import Django utilities for local testing, fallback to local implementation
try:
from ivatar.utils import generate_random_email
except ImportError:
# Use local version for external testing
def generate_random_email():
"""Generate a random email address using the same pattern as test_views.py"""
username = random_string()
domain = random_string()
tld = random_string(2)
return f"{username}@{domain}.{tld}"
# Django setup - only for local testing
def setup_django() -> None:
"""Setup Django for local testing"""
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings")
import django
django.setup()
class PerformanceTestRunner:
"""Main performance test runner"""
# Define all avatar styles and sizes to test
AVATAR_STYLES: List[str] = [
"identicon",
"monsterid",
"robohash",
"pagan",
"retro",
"wavatar",
"mm",
"mmng",
]
AVATAR_SIZES: List[int] = [80, 256]
def __init__(
self,
base_url: str = "http://localhost:8000",
concurrent_users: int = 10,
test_cache: bool = True,
remote_testing: bool = False,
) -> None:
self.base_url: str = base_url
self.concurrent_users: int = concurrent_users
self.test_cache: bool = test_cache
self.remote_testing: bool = remote_testing
self.client: Optional[Any] = None # Django test client
self.results: Dict[str, Any] = {}
# Determine if we're testing locally or remotely
if remote_testing or not base_url.startswith("http://localhost"):
self.remote_testing = True
print(f"🌐 Remote testing mode: {base_url}")
else:
print(f"🏠 Local testing mode: {base_url}")
# Only setup Django and create client for local testing
setup_django()
from django.test import Client
self.client = Client()
def setup_test_data(self) -> None:
"""Create test data for performance tests"""
print("Setting up test data...")
# Import Django models only when needed
from django.contrib.auth.models import User
from ivatar.ivataraccount.models import ConfirmedEmail
# Create test users and emails
test_emails = [f"perftest{i}@example.com" for i in range(100)]
for i, email in enumerate(test_emails):
if not User.objects.filter(username=f"perftest{i}").exists():
user = User.objects.create_user(
username=f"perftest{i}", email=email, password="testpass123"
)
# Create confirmed email
ConfirmedEmail.objects.create(
user=user, email=email, ip_address="127.0.0.1"
)
print(f"Created {len(test_emails)} test users and emails")
def _generate_test_cases(self) -> List[Dict[str, Any]]:
"""Generate test cases for all avatar styles and sizes"""
test_cases = []
for style in self.AVATAR_STYLES:
for size in self.AVATAR_SIZES:
test_cases.append({"default": style, "size": size})
return test_cases
def _test_single_avatar_request(
self, case: Dict[str, Any], email: str, use_requests: bool = False
) -> Dict[str, Any]:
"""Test a single avatar request - shared logic for local and remote testing"""
# Use libravatar library to generate the URL
full_url = libravatar_url(
email=email, size=case["size"], default=case["default"]
)
# Extract path and query from the full URL
urlobj = urlsplit(full_url)
url_path = f"{urlobj.path}?{urlobj.query}"
start_time = time.time()
if use_requests:
# Remote testing with requests
import requests
url = f"{self.base_url}{url_path}"
try:
response = requests.get(url, timeout=10)
end_time = time.time()
duration = (end_time - start_time) * 1000
# Determine cache status from response headers
cache_detail = response.headers.get("x-cache-detail", "").lower()
age = response.headers.get("age", "0")
cache_status = "unknown"
if "cache hit" in cache_detail or int(age) > 0:
cache_status = "hit"
elif "cache miss" in cache_detail or age == "0":
cache_status = "miss"
return {
"test": f"{case['default']}_{case['size']}px",
"duration_ms": duration,
"status_code": response.status_code,
"content_length": len(response.content) if response.content else 0,
"success": response.status_code == 200,
"cache_status": cache_status,
"cache_detail": cache_detail,
"age": age,
"full_url": full_url,
"email": email,
}
except Exception as e:
end_time = time.time()
duration = (end_time - start_time) * 1000
return {
"test": f"{case['default']}_{case['size']}px",
"duration_ms": duration,
"status_code": 0,
"success": False,
"error": str(e),
"cache_status": "error",
"full_url": full_url,
"email": email,
}
else:
# Local testing with Django test client
if self.client is None:
raise RuntimeError("Django test client not initialized")
response = self.client.get(url_path, follow=True)
end_time = time.time()
duration = (end_time - start_time) * 1000
# Check for cache information in response headers
cache_status = "unknown"
if hasattr(response, "get") and callable(getattr(response, "get", None)):
cache_control = response.get("Cache-Control", "")
age = response.get("Age", "0")
if age and int(age) > 0:
cache_status = "hit"
elif "no-cache" in cache_control:
cache_status = "miss"
else:
cache_status = "miss" # Default assumption for first generation
# Handle content length for different response types
content_length = 0
if hasattr(response, "content"):
content_length = len(response.content) if response.content else 0
elif hasattr(response, "streaming_content"):
# For FileResponse, we can't easily get content length without consuming the stream
content_length = 1 # Just indicate there's content
return {
"test": f"{case['default']}_{case['size']}px",
"duration_ms": duration,
"status_code": response.status_code,
"content_length": content_length,
"cache_status": cache_status,
"success": response.status_code == 200,
"full_url": full_url,
"email": email,
}
def _display_avatar_results(self, results: List[Dict[str, Any]]) -> None:
"""Display avatar test results using prettytable for perfect alignment"""
# Group results by avatar style
style_results: Dict[str, List[Dict[str, Any]]] = {}
for result in results:
style = result["test"].split("_")[0] # Extract style from test name
if style not in style_results:
style_results[style] = []
style_results[style].append(result)
# Create table
table = PrettyTable()
table.field_names = ["Avatar Style", "Size", "Time (ms)", "Status", "Cache"]
table.align["Avatar Style"] = "l"
table.align["Size"] = "r"
table.align["Time (ms)"] = "r"
table.align["Status"] = "c"
table.align["Cache"] = "c"
# Add data to table
styles_with_data = [
style for style in self.AVATAR_STYLES if style in style_results
]
for i, style in enumerate(styles_with_data):
style_data = style_results[style]
successful_results = [r for r in style_data if r.get("success", True)]
failed_results = [r for r in style_data if not r.get("success", True)]
if successful_results:
# Calculate average
avg_duration = statistics.mean(
[r["duration_ms"] for r in successful_results]
)
# Determine overall cache status
cache_statuses = [
r["cache_status"]
for r in successful_results
if r["cache_status"] != "unknown"
]
if not cache_statuses:
cache_summary = "unknown"
elif all(status == "hit" for status in cache_statuses):
cache_summary = "hit"
elif all(status == "miss" for status in cache_statuses):
cache_summary = "miss"
else:
cache_summary = "mixed"
# Determine status icon for average line
if len(failed_results) == 0:
avg_status_icon = "" # All successful
elif len(successful_results) == 0:
avg_status_icon = "" # All failed
else:
avg_status_icon = "⚠️" # Mixed results
# Add average row
table.add_row(
[
f"{style} (avg)",
"",
f"{avg_duration:.2f}",
avg_status_icon,
cache_summary,
]
)
# Add individual size rows
for result in style_data:
size = result["test"].split("_")[1] # Extract size from test name
status_icon = "" if result.get("success", True) else ""
cache_status = result["cache_status"]
if result.get("success", True):
table.add_row(
[
"",
size,
f"{result['duration_ms']:.2f}",
status_icon,
cache_status,
]
)
else:
error_msg = result.get("error", "Failed")
table.add_row(["", size, error_msg, status_icon, cache_status])
else:
# All requests failed
table.add_row([f"{style} (avg)", "", "Failed", "", "error"])
for result in style_data:
size = result["test"].split("_")[1]
error_msg = result.get("error", "Failed")
table.add_row(["", size, error_msg, "", "error"])
# Add divider line between styles (except after the last style)
if i < len(styles_with_data) - 1:
table.add_row(["-" * 15, "-" * 5, "-" * 9, "-" * 6, "-" * 5])
print(table)
def test_avatar_generation_performance(self) -> None:
"""Test avatar generation performance"""
print("\n=== Avatar Generation Performance Test ===")
# Generate test cases for all avatar styles and sizes
test_cases = self._generate_test_cases()
results = []
# Generate random email for testing
test_email = generate_random_email()
print(f" Testing with email: {test_email}")
for case in test_cases:
result = self._test_single_avatar_request(
case, test_email, use_requests=False
)
results.append(result)
# Show example URL from first result
if results:
print(f" Example URL: {results[0]['full_url']}")
# Display results grouped by style
self._display_avatar_results(results)
# Calculate statistics
successful_results = [r for r in results if r.get("success", True)]
if successful_results:
durations = [r["duration_ms"] for r in successful_results]
avg_duration = statistics.mean(durations)
max_duration = max(durations)
else:
avg_duration = 0
max_duration = 0
print(f"\n Average: {avg_duration:.2f}ms")
print(f" Maximum: {max_duration:.2f}ms")
# Performance thresholds
if avg_duration > 1000: # 1 second
print(" ⚠️ WARNING: Average avatar generation time exceeds 1s")
elif avg_duration > 500: # 500ms
print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms")
else:
print(" ✅ Avatar generation performance is good")
self.results["avatar_generation"] = {
"average_ms": avg_duration,
"maximum_ms": max_duration,
"results": results,
}
def test_concurrent_load(self, response_threshold: int = 1000, p95_threshold: int = 2000) -> None:
"""Test concurrent load handling"""
print("\n=== Concurrent Load Test ===")
num_requests = 20
if self.remote_testing:
print(f" Running {num_requests} HTTP requests to {self.base_url}...")
results = self._test_remote_concurrent_load(num_requests)
else:
print(f" Running {num_requests} local avatar generations...")
results = self._test_local_concurrent_load(num_requests)
# Analyze results
successful_requests = [r for r in results if r["success"]]
failed_requests = [r for r in results if not r["success"]]
# Analyze cache performance
cache_hits = [r for r in results if r.get("cache_status") == "hit"]
cache_misses = [r for r in results if r.get("cache_status") == "miss"]
cache_errors = [r for r in results if r.get("cache_status") == "error"]
total_duration = (
sum(r["duration_ms"] for r in results) / 1000
) # Convert to seconds
print(f" Total time: {total_duration:.2f}s")
print(f" Successful requests: {len(successful_requests)}/{num_requests}")
print(f" Failed requests: {len(failed_requests)}")
# Show cache statistics if available
if cache_hits or cache_misses:
print(f" Cache hits: {len(cache_hits)}")
print(f" Cache misses: {len(cache_misses)}")
if cache_errors:
print(f" Cache errors: {len(cache_errors)}")
cache_hit_rate = (
len(cache_hits) / (len(cache_hits) + len(cache_misses)) * 100
if (cache_hits or cache_misses)
else 0
)
print(f" Cache hit rate: {cache_hit_rate:.1f}%")
if successful_requests:
durations = [r["duration_ms"] for r in successful_requests]
avg_duration = statistics.mean(durations)
# Calculate p95 safely
if len(durations) >= 2:
try:
p95_duration = statistics.quantiles(durations, n=20)[
18
] # 95th percentile
except (ValueError, IndexError):
p95_duration = max(durations)
else:
p95_duration = max(durations)
print(f" Average response time: {avg_duration:.2f}ms")
print(f" 95th percentile: {p95_duration:.2f}ms")
print(
f" Operations per second: {len(successful_requests) / total_duration:.2f}"
)
# Performance evaluation
if len(failed_requests) > 0:
print(" ⚠️ WARNING: Some operations failed under load")
elif p95_duration > p95_threshold:
print(f" ⚠️ WARNING: 95th percentile response time exceeds {p95_threshold}ms")
elif avg_duration > response_threshold:
print(f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms under load")
else:
print(" ✅ Load handling is good")
else:
avg_duration = 0
p95_duration = 0
print(" ❌ All operations failed")
self.results["concurrent_load"] = {
"total_duration_s": total_duration,
"successful_requests": len(successful_requests),
"failed_requests": len(failed_requests),
"average_ms": avg_duration,
"p95_ms": p95_duration,
"requests_per_second": (
len(successful_requests) / total_duration if total_duration > 0 else 0
),
"cache_hits": len(cache_hits),
"cache_misses": len(cache_misses),
"cache_errors": len(cache_errors),
"cache_hit_rate": (
len(cache_hits) / (len(cache_hits) + len(cache_misses)) * 100
if (cache_hits or cache_misses)
else 0
),
}
def _test_remote_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
"""Test concurrent load against remote server"""
import requests # noqa: F401
from concurrent.futures import ThreadPoolExecutor, as_completed
def make_remote_request(thread_id):
test_email = generate_random_email()
# Use libravatar library to generate the URL
full_url = libravatar_url(email=test_email, size=80, default="identicon")
urlobj = urlsplit(full_url)
url_path = f"{urlobj.path}?{urlobj.query}"
url = f"{self.base_url}{url_path}"
start_time = time.time()
try:
response = requests.get(url, timeout=10)
end_time = time.time()
# Determine cache status
cache_detail = response.headers.get("x-cache-detail", "").lower()
age = response.headers.get("age", "0")
cache_status = "unknown"
if "cache hit" in cache_detail or int(age) > 0:
cache_status = "hit"
elif "cache miss" in cache_detail or age == "0":
cache_status = "miss"
return {
"thread_id": thread_id,
"duration_ms": (end_time - start_time) * 1000,
"status_code": response.status_code,
"success": response.status_code == 200,
"cache_status": cache_status,
}
except Exception as e:
end_time = time.time()
return {
"thread_id": thread_id,
"duration_ms": (end_time - start_time) * 1000,
"success": False,
"error": str(e),
"cache_status": "error",
}
results = []
with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor:
futures = [
executor.submit(make_remote_request, i) for i in range(num_requests)
]
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
print(f" Request failed: {e}")
return results
def _test_local_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
"""Test concurrent load locally using avatar generation functions"""
results = []
# Import avatar generation functions
try:
import Identicon
for i in range(num_requests):
test_email = generate_random_email()
email_hash = hashlib.md5(test_email.encode()).hexdigest()
request_start = time.time()
try:
# Test identicon generation directly
identicon_data = Identicon.render(email_hash)
request_end = time.time()
results.append(
{
"thread_id": i,
"duration_ms": (request_end - request_start) * 1000,
"success": len(identicon_data) > 0,
"cache_status": "miss", # Direct generation is always a cache miss
}
)
except Exception as e:
request_end = time.time()
results.append(
{
"thread_id": i,
"duration_ms": (request_end - request_start) * 1000,
"success": False,
"error": str(e),
"cache_status": "error",
}
)
except ImportError:
# Fallback: just test database queries
print(
" Avatar generators not available, testing database queries instead..."
)
for i in range(num_requests):
request_start = time.time()
try:
from django.contrib.auth.models import User
User.objects.count()
request_end = time.time()
results.append(
{
"thread_id": i,
"duration_ms": (request_end - request_start) * 1000,
"success": True,
"cache_status": "n/a", # Database queries don't use image cache
}
)
except Exception as e:
request_end = time.time()
results.append(
{
"thread_id": i,
"duration_ms": (request_end - request_start) * 1000,
"success": False,
"error": str(e),
"cache_status": "error",
}
)
return results
def test_database_performance(self) -> None:
"""Test database query performance"""
print("\n=== Database Performance Test ===")
from django.db import connection
from django.contrib.auth.models import User
from ivatar.ivataraccount.models import ConfirmedEmail, Photo
# Reset query log
connection.queries_log.clear()
test_queries = [
{"name": "User count", "query": lambda: User.objects.count()},
{
"name": "Email lookup by digest",
"query": lambda: ConfirmedEmail.objects.filter(
digest="5d41402abc4b2a76b9719d911017c592"
).first(),
},
{
"name": "Top 10 photos by access count",
"query": lambda: list(Photo.objects.order_by("-access_count")[:10]),
},
]
for test in test_queries:
start_time = time.time()
try:
test["query"]()
end_time = time.time()
duration = (end_time - start_time) * 1000
print(f" {test['name']}: {duration:.2f}ms")
if duration > 100: # 100ms threshold
print(" ⚠️ WARNING: Query exceeds 100ms threshold")
except Exception as e:
print(f" {test['name']}: ERROR - {e}")
# Check for N+1 queries
query_count = len(connection.queries)
if query_count > 10:
print(
f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)"
)
else:
print(f" ✅ Database query count is reasonable ({query_count} queries)")
def test_cache_performance(self) -> None:
"""Test caching effectiveness"""
if not self.test_cache:
print("\n=== Cache Performance Test ===")
print(" ⏭️ Cache testing disabled")
return
print("\n=== Cache Performance Test ===")
# Generate a random email address for cache testing
test_email = generate_random_email()
print(f" Testing with: {test_email}")
if self.remote_testing:
first_duration, second_duration = self._test_remote_cache_performance(
test_email
)
else:
first_duration, second_duration = self._test_local_cache_performance(
test_email
)
print(f" First request: {first_duration:.2f}ms")
print(f" Second request: {second_duration:.2f}ms")
improvement_ratio = (
first_duration / second_duration if second_duration > 0 else 0
)
# Analyze cache effectiveness based on headers AND timing
cache_working = False
cache_status = "unknown"
if self.remote_testing and hasattr(self, "cache_info"):
# For remote testing, check actual cache headers
first_cached = self.cache_info["first_request"]["is_cached"]
second_cached = self.cache_info["second_request"]["is_cached"]
if not first_cached and second_cached:
cache_status = "✅ Cache working correctly (miss → hit)"
cache_working = True
elif first_cached and second_cached:
cache_status = "✅ Cache working (both requests cached)"
cache_working = True
elif not first_cached and not second_cached:
cache_status = "⚠️ No cache hits detected"
cache_working = False
else:
cache_status = "⚠️ Unexpected cache behavior"
cache_working = False
else:
# For local testing, fall back to timing-based analysis
if improvement_ratio >= 1.5:
cache_status = "✅ Caching appears to be working (timing-based)"
cache_working = True
else:
cache_status = (
"⚠️ Caching may not be working as expected (timing-based)"
)
cache_working = False
print(f" {cache_status}")
if improvement_ratio > 1:
print(f" Performance improvement: {improvement_ratio:.1f}x faster")
self.results["cache_performance"] = {
"first_request_ms": first_duration,
"second_request_ms": second_duration,
"improvement_ratio": improvement_ratio,
"cache_working": cache_working,
"cache_status": cache_status,
"cache_headers": getattr(self, "cache_info", {}),
}
def _test_remote_cache_performance(self, email: str) -> Tuple[float, float]:
"""Test cache performance against remote server"""
import requests
# Use libravatar library to generate the URL
full_url = libravatar_url(email=email, size=80, default="identicon")
urlobj = urlsplit(full_url)
url_path = f"{urlobj.path}?{urlobj.query}"
url = f"{self.base_url}{url_path}"
# First request (should be cache miss or fresh)
start_time = time.time()
response1 = requests.get(url, timeout=10)
first_duration = (time.time() - start_time) * 1000
# Check first request headers
first_cache_detail = response1.headers.get("x-cache-detail", "unknown")
first_age = response1.headers.get("age", "0")
first_cache_control = response1.headers.get("cache-control", "none")
print(" First request headers:")
print(f" x-cache-detail: {first_cache_detail}")
print(f" age: {first_age}")
print(f" cache-control: {first_cache_control}")
# Small delay to ensure any processing is complete
time.sleep(0.1)
# Second request (should be cache hit)
start_time = time.time()
response2 = requests.get(url, timeout=10)
second_duration = (time.time() - start_time) * 1000
# Check second request headers
second_cache_detail = response2.headers.get("x-cache-detail", "unknown")
second_age = response2.headers.get("age", "0")
second_cache_control = response2.headers.get("cache-control", "none")
print(" Second request headers:")
print(f" x-cache-detail: {second_cache_detail}")
print(f" age: {second_age}")
print(f" cache-control: {second_cache_control}")
# Determine if we actually got cache hits
first_is_cached = (
"cache hit" in first_cache_detail.lower() or int(first_age) > 0
)
second_is_cached = (
"cache hit" in second_cache_detail.lower() or int(second_age) > 0
)
print(" Cache analysis:")
print(
f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}"
)
print(
f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}"
)
# Store cache information for analysis
self.cache_info = {
"first_request": {
"cache_detail": first_cache_detail,
"age": first_age,
"is_cached": first_is_cached,
},
"second_request": {
"cache_detail": second_cache_detail,
"age": second_age,
"is_cached": second_is_cached,
},
}
return first_duration, second_duration
def _test_local_cache_performance(self, email: str) -> Tuple[float, float]:
"""Test cache performance locally"""
# Use libravatar library to generate the URL
full_url = libravatar_url(email=email, size=80, default="identicon")
urlobj = urlsplit(full_url)
url_path = f"{urlobj.path}?{urlobj.query}"
# First request (cache miss)
start_time = time.time()
if self.client:
self.client.get(url_path)
first_duration = (time.time() - start_time) * 1000
# Second request (should be cache hit)
start_time = time.time()
if self.client:
self.client.get(url_path)
second_duration = (time.time() - start_time) * 1000
return first_duration, second_duration
def run_all_tests(self, avatar_threshold: int = 1000, response_threshold: int = 1000, p95_threshold: int = 2000, ignore_cache_warnings: bool = False) -> Optional[Dict[str, Any]]:
"""Run all performance tests"""
print("Starting Libravatar Performance Tests")
print("=" * 50)
start_time = time.time()
try:
# Only setup test data for local testing
if not self.remote_testing:
self.setup_test_data()
# Run tests based on mode
if self.remote_testing:
print("🌐 Running remote server tests...")
self.test_remote_avatar_performance(response_threshold)
else:
print("🏠 Running local tests...")
self.test_avatar_generation_performance()
self.test_database_performance()
# Always test concurrent load
self.test_concurrent_load(response_threshold, p95_threshold)
# Test cache performance if enabled
self.test_cache_performance()
end_time = time.time()
total_duration = end_time - start_time
print("\n" + "=" * 50)
print(f"Performance tests completed in {total_duration:.2f}s")
# Overall assessment
self.assess_overall_performance(avatar_threshold, response_threshold, p95_threshold, ignore_cache_warnings)
return self.results
except Exception as e:
print(f"Performance test failed: {e}")
return None
def test_remote_avatar_performance(self, response_threshold: int = 1000) -> None:
"""Test avatar generation performance on remote server"""
print("\n=== Remote Avatar Performance Test ===")
# Generate test cases for all avatar styles and sizes
test_cases = self._generate_test_cases()
results = []
# Generate random email for testing
test_email = generate_random_email()
print(f" Testing with email: {test_email}")
for case in test_cases:
result = self._test_single_avatar_request(
case, test_email, use_requests=True
)
results.append(result)
# Show example URL from first result
if results:
print(f" Example URL: {results[0]['full_url']}")
# Display results grouped by style
self._display_avatar_results(results)
# Calculate statistics for successful requests
successful_results = [r for r in results if r["success"]]
if successful_results:
durations = [r["duration_ms"] for r in successful_results]
avg_duration = statistics.mean(durations)
max_duration = max(durations)
print(f"\n Average: {avg_duration:.2f}ms")
print(f" Maximum: {max_duration:.2f}ms")
print(f" Success rate: {len(successful_results)}/{len(results)}")
# Performance thresholds for remote testing
if avg_duration > (response_threshold * 2): # 2x threshold for warning
print(f" ⚠️ WARNING: Average response time exceeds {response_threshold * 2}ms")
elif avg_duration > response_threshold:
print(f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms")
else:
print(" ✅ Remote avatar performance is good")
else:
avg_duration = 0
max_duration = 0
print(" ❌ All remote requests failed")
self.results["avatar_generation"] = {
"average_ms": avg_duration,
"maximum_ms": max_duration,
"results": results,
"success_rate": len(successful_results) / len(results) if results else 0,
}
def assess_overall_performance(self, avatar_threshold: int = 1000, response_threshold: int = 1000, p95_threshold: int = 2000, ignore_cache_warnings: bool = False) -> bool:
"""Provide overall performance assessment"""
print("\n=== OVERALL PERFORMANCE ASSESSMENT ===")
warnings = []
# Check avatar generation
if "avatar_generation" in self.results:
avg_gen = self.results["avatar_generation"]["average_ms"]
if avg_gen > avatar_threshold:
warnings.append(f"Avatar generation is slow ({avg_gen:.0f}ms average, threshold: {avatar_threshold}ms)")
# Check concurrent load
if "concurrent_load" in self.results:
failed = self.results["concurrent_load"]["failed_requests"]
if failed > 0:
warnings.append(f"{failed} requests failed under concurrent load")
# Check cache performance
if "cache_performance" in self.results and not ignore_cache_warnings:
cache_working = self.results["cache_performance"].get(
"cache_working", False
)
if not cache_working:
warnings.append("Caching may not be working effectively")
if warnings:
print("⚠️ Performance Issues Found:")
for warning in warnings:
print(f" - {warning}") # noqa: E221
print("\nRecommendations:")
print(" - Review database indexes and query optimization")
print(" - Check caching configuration")
print(" - Consider async processing for heavy operations")
else:
print("✅ Overall performance is good!")
print(" - Avatar generation is responsive")
print(" - Application handles concurrent load well")
print(" - Caching is working effectively")
# Store warnings in results for main function to check
self.results["warnings"] = warnings
return len(warnings) > 0
def main() -> Optional[Dict[str, Any]]:
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="Run Libravatar performance tests")
parser.add_argument(
"--base-url",
default="http://localhost:8000",
help="Base URL for testing (default: http://localhost:8000)",
)
parser.add_argument(
"--concurrent-users",
type=int,
default=10,
help="Number of concurrent users to simulate (default: 10)",
)
parser.add_argument("--output", help="Output file for results (JSON)")
parser.add_argument(
"--no-cache-test",
action="store_true",
help="Disable cache performance testing (useful for local development)",
)
parser.add_argument(
"--remote",
action="store_true",
help="Force remote testing mode (auto-detected for non-localhost URLs)",
)
parser.add_argument(
"--avatar-threshold",
type=int,
default=1000,
help="Avatar generation threshold in ms (default: 1000ms, use 2500 for dev environments)",
)
parser.add_argument(
"--response-threshold",
type=int,
default=1000,
help="Response time threshold in ms (default: 1000ms, use 2500 for dev environments)",
)
parser.add_argument(
"--p95-threshold",
type=int,
default=2000,
help="95th percentile threshold in ms (default: 2000ms, use 5000 for dev environments)",
)
parser.add_argument(
"--ignore-cache-warnings",
action="store_true",
help="Don't fail on cache performance warnings (useful for dev environments)",
)
args = parser.parse_args()
# Determine if we should test cache performance
test_cache = not args.no_cache_test
# Determine if we're doing remote testing
remote_testing = args.remote or not args.base_url.startswith("http://localhost")
runner = PerformanceTestRunner(
base_url=args.base_url,
concurrent_users=args.concurrent_users,
test_cache=test_cache,
remote_testing=remote_testing,
)
results = runner.run_all_tests(args.avatar_threshold, args.response_threshold, args.p95_threshold, args.ignore_cache_warnings)
if args.output and results:
import json
with open(args.output, "w") as f:
json.dump(results, f, indent=2)
print(f"\nResults saved to {args.output}")
# Exit with error code if there were performance issues
if results and "warnings" in results and len(results["warnings"]) > 0:
sys.exit(1)
return results
if __name__ == "__main__":
main()