#!/usr/bin/env python3
"""
Performance testing script for Libravatar CI/CD pipeline

This script runs automated performance tests to catch regressions
and ensure the application meets performance requirements.
"""

import os
import sys
import time
import statistics
import hashlib
import random
import string
from typing import Dict, List, Any, Optional, Tuple

# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from libravatar import libravatar_url
from urllib.parse import urlsplit
from prettytable import PrettyTable

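
# Example invocations (a sketch; the script path, host, and output filename are
# illustrative, the flags are the ones defined in main() below):
#
#   python3 performance_tests.py
#   python3 performance_tests.py --base-url https://avatars.example.org --remote \
#       --concurrent-users 20 --output perf-results.json
#   python3 performance_tests.py --no-cache-test --ignore-cache-warnings \
#       --avatar-threshold 2500 --response-threshold 2500 --p95-threshold 5000
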
def random_string(length=10):
    """Return a random string of lowercase letters and digits (default length 10)"""
    return "".join(
        random.SystemRandom().choice(string.ascii_lowercase + string.digits)
        for _ in range(length)
    )

# Try to import the Django utility for local testing, fall back to a local implementation
try:
    from ivatar.utils import generate_random_email
except ImportError:
    # Use a local version for external testing
    def generate_random_email():
        """Generate a random email address using the same pattern as test_views.py"""
        username = random_string()
        domain = random_string()
        tld = random_string(2)
        return f"{username}@{domain}.{tld}"

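
# For illustration only (an assumed example, not taken from a real run):
# generate_random_email() yields addresses such as "k3v9q2x0ab@z8d1mfh4tw.qr",
# i.e. a random lowercase/digit local part and domain with a two-letter TLD.
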
# Django setup - only for local testing
def setup_django() -> None:
    """Set up Django for local testing"""
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings")
    import django

    django.setup()

class PerformanceTestRunner:
    """Main performance test runner"""

    # Define all avatar styles and sizes to test
    AVATAR_STYLES: List[str] = [
        "identicon",
        "monsterid",
        "robohash",
        "pagan",
        "retro",
        "wavatar",
        "mm",
        "mmng",
    ]
    AVATAR_SIZES: List[int] = [80, 256]

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        concurrent_users: int = 10,
        test_cache: bool = True,
        remote_testing: bool = False,
    ) -> None:
        self.base_url: str = base_url
        self.concurrent_users: int = concurrent_users
        self.test_cache: bool = test_cache
        self.remote_testing: bool = remote_testing
        self.client: Optional[Any] = None  # Django test client
        self.results: Dict[str, Any] = {}

        # Determine if we're testing locally or remotely
        if remote_testing or not base_url.startswith("http://localhost"):
            self.remote_testing = True
            print(f"🌐 Remote testing mode: {base_url}")
        else:
            print(f"🏠 Local testing mode: {base_url}")
            # Only setup Django and create client for local testing
            setup_django()
            from django.test import Client

            self.client = Client()

    def setup_test_data(self) -> None:
        """Create test data for performance tests"""
        print("Setting up test data...")

        # Import Django models only when needed
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail

        # Create test users and emails
        test_emails = [f"perftest{i}@example.com" for i in range(100)]

        for i, email in enumerate(test_emails):
            if not User.objects.filter(username=f"perftest{i}").exists():
                user = User.objects.create_user(
                    username=f"perftest{i}", email=email, password="testpass123"
                )

                # Create confirmed email
                ConfirmedEmail.objects.create(
                    user=user, email=email, ip_address="127.0.0.1"
                )

        print(f"Created {len(test_emails)} test users and emails")

    def _generate_test_cases(self) -> List[Dict[str, Any]]:
        """Generate test cases for all avatar styles and sizes"""
        test_cases = []
        for style in self.AVATAR_STYLES:
            for size in self.AVATAR_SIZES:
                test_cases.append({"default": style, "size": size})
        return test_cases

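
    # Each generated case is a dict like {"default": "identicon", "size": 80};
    # _test_single_avatar_request() turns it into a request such as
    # /avatar/<email-hash>?s=80&d=identicon (the exact hash and query string come
    # from the libravatar library, so treat this path as an illustrative sketch).
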
    def _test_single_avatar_request(
        self, case: Dict[str, Any], email: str, use_requests: bool = False
    ) -> Dict[str, Any]:
        """Test a single avatar request - shared logic for local and remote testing"""
        # Use libravatar library to generate the URL
        full_url = libravatar_url(
            email=email, size=case["size"], default=case["default"]
        )

        # Extract path and query from the full URL
        urlobj = urlsplit(full_url)
        url_path = f"{urlobj.path}?{urlobj.query}"

        start_time = time.time()

        if use_requests:
            # Remote testing with requests
            import requests

            url = f"{self.base_url}{url_path}"
            try:
                response = requests.get(url, timeout=10)
                end_time = time.time()
                duration = (end_time - start_time) * 1000

                # Determine cache status from response headers
                cache_detail = response.headers.get("x-cache-detail", "").lower()
                age = response.headers.get("age", "0")
                cache_status = "unknown"

                if "cache hit" in cache_detail or int(age) > 0:
                    cache_status = "hit"
                elif "cache miss" in cache_detail or age == "0":
                    cache_status = "miss"

                return {
                    "test": f"{case['default']}_{case['size']}px",
                    "duration_ms": duration,
                    "status_code": response.status_code,
                    "content_length": len(response.content) if response.content else 0,
                    "success": response.status_code == 200,
                    "cache_status": cache_status,
                    "cache_detail": cache_detail,
                    "age": age,
                    "full_url": full_url,
                    "email": email,
                }
            except Exception as e:
                end_time = time.time()
                duration = (end_time - start_time) * 1000
                return {
                    "test": f"{case['default']}_{case['size']}px",
                    "duration_ms": duration,
                    "status_code": 0,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                    "full_url": full_url,
                    "email": email,
                }
        else:
            # Local testing with Django test client
            if self.client is None:
                raise RuntimeError("Django test client not initialized")
            response = self.client.get(url_path, follow=True)
            end_time = time.time()
            duration = (end_time - start_time) * 1000

            # Check for cache information in response headers
            cache_status = "unknown"
            if hasattr(response, "get") and callable(getattr(response, "get", None)):
                cache_control = response.get("Cache-Control", "")
                age = response.get("Age", "0")
                if age and int(age) > 0:
                    cache_status = "hit"
                elif "no-cache" in cache_control:
                    cache_status = "miss"
                else:
                    cache_status = "miss"  # Default assumption for first generation

            # Handle content length for different response types
            content_length = 0
            if hasattr(response, "content"):
                content_length = len(response.content) if response.content else 0
            elif hasattr(response, "streaming_content"):
                # For FileResponse, we can't easily get content length without consuming the stream
                content_length = 1  # Just indicate there's content

            return {
                "test": f"{case['default']}_{case['size']}px",
                "duration_ms": duration,
                "status_code": response.status_code,
                "content_length": content_length,
                "cache_status": cache_status,
                "success": response.status_code == 200,
                "full_url": full_url,
                "email": email,
            }

    def _display_avatar_results(self, results: List[Dict[str, Any]]) -> None:
        """Display avatar test results using prettytable for perfect alignment"""
        # Group results by avatar style
        style_results: Dict[str, List[Dict[str, Any]]] = {}
        for result in results:
            style = result["test"].split("_")[0]  # Extract style from test name
            if style not in style_results:
                style_results[style] = []
            style_results[style].append(result)

        # Create table
        table = PrettyTable()
        table.field_names = ["Avatar Style", "Size", "Time (ms)", "Status", "Cache"]
        table.align["Avatar Style"] = "l"
        table.align["Size"] = "r"
        table.align["Time (ms)"] = "r"
        table.align["Status"] = "c"
        table.align["Cache"] = "c"

        # Add data to table
        styles_with_data = [
            style for style in self.AVATAR_STYLES if style in style_results
        ]

        for i, style in enumerate(styles_with_data):
            style_data = style_results[style]
            successful_results = [r for r in style_data if r.get("success", True)]
            failed_results = [r for r in style_data if not r.get("success", True)]

            if successful_results:
                # Calculate average
                avg_duration = statistics.mean(
                    [r["duration_ms"] for r in successful_results]
                )

                # Determine overall cache status
                cache_statuses = [
                    r["cache_status"]
                    for r in successful_results
                    if r["cache_status"] != "unknown"
                ]
                if not cache_statuses:
                    cache_summary = "unknown"
                elif all(status == "hit" for status in cache_statuses):
                    cache_summary = "hit"
                elif all(status == "miss" for status in cache_statuses):
                    cache_summary = "miss"
                else:
                    cache_summary = "mixed"

                # Determine status icon for average line
                if len(failed_results) == 0:
                    avg_status_icon = "✅"  # All successful
                elif len(successful_results) == 0:
                    avg_status_icon = "❌"  # All failed
                else:
                    avg_status_icon = "⚠️"  # Mixed results

                # Add average row
                table.add_row(
                    [
                        f"{style} (avg)",
                        "",
                        f"{avg_duration:.2f}",
                        avg_status_icon,
                        cache_summary,
                    ]
                )

                # Add individual size rows
                for result in style_data:
                    size = result["test"].split("_")[1]  # Extract size from test name
                    status_icon = "✅" if result.get("success", True) else "❌"
                    cache_status = result["cache_status"]

                    if result.get("success", True):
                        table.add_row(
                            [
                                "",
                                size,
                                f"{result['duration_ms']:.2f}",
                                status_icon,
                                cache_status,
                            ]
                        )
                    else:
                        error_msg = result.get("error", "Failed")
                        table.add_row(["", size, error_msg, status_icon, cache_status])
            else:
                # All requests failed
                table.add_row([f"{style} (avg)", "", "Failed", "❌", "error"])
                for result in style_data:
                    size = result["test"].split("_")[1]
                    error_msg = result.get("error", "Failed")
                    table.add_row(["", size, error_msg, "❌", "error"])

            # Add divider line between styles (except after the last style)
            if i < len(styles_with_data) - 1:
                table.add_row(["-" * 15, "-" * 5, "-" * 9, "-" * 6, "-" * 5])

        print(table)

    def test_avatar_generation_performance(self) -> None:
        """Test avatar generation performance"""
        print("\n=== Avatar Generation Performance Test ===")

        # Generate test cases for all avatar styles and sizes
        test_cases = self._generate_test_cases()
        results = []

        # Generate random email for testing
        test_email = generate_random_email()
        print(f" Testing with email: {test_email}")

        for case in test_cases:
            result = self._test_single_avatar_request(
                case, test_email, use_requests=False
            )
            results.append(result)

        # Show example URL from first result
        if results:
            print(f" Example URL: {results[0]['full_url']}")

        # Display results grouped by style
        self._display_avatar_results(results)

        # Calculate statistics
        successful_results = [r for r in results if r.get("success", True)]
        if successful_results:
            durations = [r["duration_ms"] for r in successful_results]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)
        else:
            avg_duration = 0
            max_duration = 0

        print(f"\n Average: {avg_duration:.2f}ms")
        print(f" Maximum: {max_duration:.2f}ms")

        # Performance thresholds
        if avg_duration > 1000:  # 1 second
            print(" ⚠️ WARNING: Average avatar generation time exceeds 1s")
        elif avg_duration > 500:  # 500ms
            print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms")
        else:
            print(" ✅ Avatar generation performance is good")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
        }

    def test_concurrent_load(
        self, response_threshold: int = 1000, p95_threshold: int = 2000
    ) -> None:
        """Test concurrent load handling"""
        print("\n=== Concurrent Load Test ===")

        num_requests = 20

        if self.remote_testing:
            print(f" Running {num_requests} HTTP requests to {self.base_url}...")
            results = self._test_remote_concurrent_load(num_requests)
        else:
            print(f" Running {num_requests} local avatar generations...")
            results = self._test_local_concurrent_load(num_requests)

        # Analyze results
        successful_requests = [r for r in results if r["success"]]
        failed_requests = [r for r in results if not r["success"]]

        # Analyze cache performance
        cache_hits = [r for r in results if r.get("cache_status") == "hit"]
        cache_misses = [r for r in results if r.get("cache_status") == "miss"]
        cache_errors = [r for r in results if r.get("cache_status") == "error"]

        total_duration = (
            sum(r["duration_ms"] for r in results) / 1000
        )  # Convert to seconds

        print(f" Total time: {total_duration:.2f}s")
        print(f" Successful requests: {len(successful_requests)}/{num_requests}")
        print(f" Failed requests: {len(failed_requests)}")

        # Show cache statistics if available
        if cache_hits or cache_misses:
            print(f" Cache hits: {len(cache_hits)}")
            print(f" Cache misses: {len(cache_misses)}")
            if cache_errors:
                print(f" Cache errors: {len(cache_errors)}")

            cache_hit_rate = (
                len(cache_hits) / (len(cache_hits) + len(cache_misses)) * 100
                if (cache_hits or cache_misses)
                else 0
            )
            print(f" Cache hit rate: {cache_hit_rate:.1f}%")

        if successful_requests:
            durations = [r["duration_ms"] for r in successful_requests]
            avg_duration = statistics.mean(durations)

            # Calculate p95 safely
            if len(durations) >= 2:
                try:
                    p95_duration = statistics.quantiles(durations, n=20)[
                        18
                    ]  # 95th percentile
                except (ValueError, IndexError):
                    p95_duration = max(durations)
            else:
                p95_duration = max(durations)

            print(f" Average response time: {avg_duration:.2f}ms")
            print(f" 95th percentile: {p95_duration:.2f}ms")
            print(
                f" Operations per second: {len(successful_requests) / total_duration:.2f}"
            )

            # Performance evaluation
            if len(failed_requests) > 0:
                print(" ⚠️ WARNING: Some operations failed under load")
            elif p95_duration > p95_threshold:
                print(f" ⚠️ WARNING: 95th percentile response time exceeds {p95_threshold}ms")
            elif avg_duration > response_threshold:
                print(f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms under load")
            else:
                print(" ✅ Load handling is good")
        else:
            avg_duration = 0
            p95_duration = 0
            print(" ❌ All operations failed")

        self.results["concurrent_load"] = {
            "total_duration_s": total_duration,
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "average_ms": avg_duration,
            "p95_ms": p95_duration,
            "requests_per_second": (
                len(successful_requests) / total_duration if total_duration > 0 else 0
            ),
            "cache_hits": len(cache_hits),
            "cache_misses": len(cache_misses),
            "cache_errors": len(cache_errors),
            "cache_hit_rate": (
                len(cache_hits) / (len(cache_hits) + len(cache_misses)) * 100
                if (cache_hits or cache_misses)
                else 0
            ),
        }

    def _test_remote_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
        """Test concurrent load against a remote server"""
        import requests
        from concurrent.futures import ThreadPoolExecutor, as_completed

        def make_remote_request(thread_id):
            test_email = generate_random_email()

            # Use libravatar library to generate the URL
            full_url = libravatar_url(email=test_email, size=80, default="identicon")
            urlobj = urlsplit(full_url)
            url_path = f"{urlobj.path}?{urlobj.query}"
            url = f"{self.base_url}{url_path}"

            start_time = time.time()
            try:
                response = requests.get(url, timeout=10)
                end_time = time.time()

                # Determine cache status
                cache_detail = response.headers.get("x-cache-detail", "").lower()
                age = response.headers.get("age", "0")
                cache_status = "unknown"

                if "cache hit" in cache_detail or int(age) > 0:
                    cache_status = "hit"
                elif "cache miss" in cache_detail or age == "0":
                    cache_status = "miss"

                return {
                    "thread_id": thread_id,
                    "duration_ms": (end_time - start_time) * 1000,
                    "status_code": response.status_code,
                    "success": response.status_code == 200,
                    "cache_status": cache_status,
                }
            except Exception as e:
                end_time = time.time()
                return {
                    "thread_id": thread_id,
                    "duration_ms": (end_time - start_time) * 1000,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                }

        results = []
        with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor:
            futures = [
                executor.submit(make_remote_request, i) for i in range(num_requests)
            ]

            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    print(f" Request failed: {e}")

        return results

    def _test_local_concurrent_load(self, num_requests: int) -> List[Dict[str, Any]]:
        """Test concurrent load locally using avatar generation functions"""
        results = []

        # Import avatar generation functions
        try:
            import Identicon

            for i in range(num_requests):
                test_email = generate_random_email()
                email_hash = hashlib.md5(test_email.encode()).hexdigest()

                request_start = time.time()
                try:
                    # Test identicon generation directly
                    identicon_data = Identicon.render(email_hash)
                    request_end = time.time()

                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (request_end - request_start) * 1000,
                            "success": len(identicon_data) > 0,
                            "cache_status": "miss",  # Direct generation is always a cache miss
                        }
                    )
                except Exception as e:
                    request_end = time.time()
                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (request_end - request_start) * 1000,
                            "success": False,
                            "error": str(e),
                            "cache_status": "error",
                        }
                    )

        except ImportError:
            # Fallback: just test database queries
            print(
                " Avatar generators not available, testing database queries instead..."
            )
            for i in range(num_requests):
                request_start = time.time()
                try:
                    from django.contrib.auth.models import User

                    User.objects.count()
                    request_end = time.time()

                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (request_end - request_start) * 1000,
                            "success": True,
                            "cache_status": "n/a",  # Database queries don't use image cache
                        }
                    )
                except Exception as e:
                    request_end = time.time()
                    results.append(
                        {
                            "thread_id": i,
                            "duration_ms": (request_end - request_start) * 1000,
                            "success": False,
                            "error": str(e),
                            "cache_status": "error",
                        }
                    )

        return results

    def test_database_performance(self) -> None:
        """Test database query performance"""
        print("\n=== Database Performance Test ===")

        from django.db import connection
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail, Photo

        # Reset query log
        connection.queries_log.clear()

        test_queries = [
            {"name": "User count", "query": lambda: User.objects.count()},
            {
                "name": "Email lookup by digest",
                "query": lambda: ConfirmedEmail.objects.filter(
                    digest="5d41402abc4b2a76b9719d911017c592"
                ).first(),
            },
            {
                "name": "Top 10 photos by access count",
                "query": lambda: list(Photo.objects.order_by("-access_count")[:10]),
            },
        ]

        for test in test_queries:
            start_time = time.time()
            try:
                test["query"]()
                end_time = time.time()
                duration = (end_time - start_time) * 1000

                print(f" {test['name']}: {duration:.2f}ms")

                if duration > 100:  # 100ms threshold
                    print(" ⚠️ WARNING: Query exceeds 100ms threshold")

            except Exception as e:
                print(f" {test['name']}: ERROR - {e}")

        # Check for N+1 queries
        query_count = len(connection.queries)
        if query_count > 10:
            print(
                f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)"
            )
        else:
            print(f" ✅ Database query count is reasonable ({query_count} queries)")

    def test_cache_performance(self) -> None:
        """Test caching effectiveness"""
        if not self.test_cache:
            print("\n=== Cache Performance Test ===")
            print(" ⏭️ Cache testing disabled")
            return

        print("\n=== Cache Performance Test ===")

        # Generate a random email address for cache testing
        test_email = generate_random_email()
        print(f" Testing with: {test_email}")

        if self.remote_testing:
            first_duration, second_duration = self._test_remote_cache_performance(
                test_email
            )
        else:
            first_duration, second_duration = self._test_local_cache_performance(
                test_email
            )

        print(f" First request: {first_duration:.2f}ms")
        print(f" Second request: {second_duration:.2f}ms")

        improvement_ratio = (
            first_duration / second_duration if second_duration > 0 else 0
        )

        # Analyze cache effectiveness based on headers AND timing
        cache_working = False
        cache_status = "unknown"

        if self.remote_testing and hasattr(self, "cache_info"):
            # For remote testing, check actual cache headers
            first_cached = self.cache_info["first_request"]["is_cached"]
            second_cached = self.cache_info["second_request"]["is_cached"]

            if not first_cached and second_cached:
                cache_status = "✅ Cache working correctly (miss → hit)"
                cache_working = True
            elif first_cached and second_cached:
                cache_status = "✅ Cache working (both requests cached)"
                cache_working = True
            elif not first_cached and not second_cached:
                cache_status = "⚠️ No cache hits detected"
                cache_working = False
            else:
                cache_status = "⚠️ Unexpected cache behavior"
                cache_working = False
        else:
            # For local testing, fall back to timing-based analysis
            if improvement_ratio >= 1.5:
                cache_status = "✅ Caching appears to be working (timing-based)"
                cache_working = True
            else:
                cache_status = (
                    "⚠️ Caching may not be working as expected (timing-based)"
                )
                cache_working = False

        print(f" {cache_status}")
        if improvement_ratio > 1:
            print(f" Performance improvement: {improvement_ratio:.1f}x faster")

        self.results["cache_performance"] = {
            "first_request_ms": first_duration,
            "second_request_ms": second_duration,
            "improvement_ratio": improvement_ratio,
            "cache_working": cache_working,
            "cache_status": cache_status,
            "cache_headers": getattr(self, "cache_info", {}),
        }

    def _test_remote_cache_performance(self, email: str) -> Tuple[float, float]:
        """Test cache performance against remote server"""
        import requests

        # Use libravatar library to generate the URL
        full_url = libravatar_url(email=email, size=80, default="identicon")
        urlobj = urlsplit(full_url)
        url_path = f"{urlobj.path}?{urlobj.query}"
        url = f"{self.base_url}{url_path}"

        # First request (should be cache miss or fresh)
        start_time = time.time()
        response1 = requests.get(url, timeout=10)
        first_duration = (time.time() - start_time) * 1000

        # Check first request headers
        first_cache_detail = response1.headers.get("x-cache-detail", "unknown")
        first_age = response1.headers.get("age", "0")
        first_cache_control = response1.headers.get("cache-control", "none")

        print(" First request headers:")
        print(f" x-cache-detail: {first_cache_detail}")
        print(f" age: {first_age}")
        print(f" cache-control: {first_cache_control}")

        # Small delay to ensure any processing is complete
        time.sleep(0.1)

        # Second request (should be cache hit)
        start_time = time.time()
        response2 = requests.get(url, timeout=10)
        second_duration = (time.time() - start_time) * 1000

        # Check second request headers
        second_cache_detail = response2.headers.get("x-cache-detail", "unknown")
        second_age = response2.headers.get("age", "0")
        second_cache_control = response2.headers.get("cache-control", "none")

        print(" Second request headers:")
        print(f" x-cache-detail: {second_cache_detail}")
        print(f" age: {second_age}")
        print(f" cache-control: {second_cache_control}")

        # Determine if we actually got cache hits
        first_is_cached = (
            "cache hit" in first_cache_detail.lower() or int(first_age) > 0
        )
        second_is_cached = (
            "cache hit" in second_cache_detail.lower() or int(second_age) > 0
        )

        print(" Cache analysis:")
        print(
            f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}"
        )
        print(
            f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}"
        )

        # Store cache information for analysis
        self.cache_info = {
            "first_request": {
                "cache_detail": first_cache_detail,
                "age": first_age,
                "is_cached": first_is_cached,
            },
            "second_request": {
                "cache_detail": second_cache_detail,
                "age": second_age,
                "is_cached": second_is_cached,
            },
        }

        return first_duration, second_duration

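
    # Illustrative header values the checks above look for (whether they appear at
    # all depends on the cache/proxy in front of the server, e.g. Apache mod_cache
    # or a CDN, so treat these as assumptions rather than guaranteed output):
    #   x-cache-detail: "cache hit" / "cache miss"
    #   age: "0" on a miss, a positive number of seconds on a hit
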
    def _test_local_cache_performance(self, email: str) -> Tuple[float, float]:
        """Test cache performance locally"""
        # Use libravatar library to generate the URL
        full_url = libravatar_url(email=email, size=80, default="identicon")
        urlobj = urlsplit(full_url)
        url_path = f"{urlobj.path}?{urlobj.query}"

        # First request (cache miss)
        start_time = time.time()
        if self.client:
            self.client.get(url_path)
        first_duration = (time.time() - start_time) * 1000

        # Second request (should be cache hit)
        start_time = time.time()
        if self.client:
            self.client.get(url_path)
        second_duration = (time.time() - start_time) * 1000

        return first_duration, second_duration

    def run_all_tests(
        self,
        avatar_threshold: int = 1000,
        response_threshold: int = 1000,
        p95_threshold: int = 2000,
        ignore_cache_warnings: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """Run all performance tests"""
        print("Starting Libravatar Performance Tests")
        print("=" * 50)

        start_time = time.time()

        try:
            # Only setup test data for local testing
            if not self.remote_testing:
                self.setup_test_data()

            # Run tests based on mode
            if self.remote_testing:
                print("🌐 Running remote server tests...")
                self.test_remote_avatar_performance(response_threshold)
            else:
                print("🏠 Running local tests...")
                self.test_avatar_generation_performance()
                self.test_database_performance()

            # Always test concurrent load
            self.test_concurrent_load(response_threshold, p95_threshold)

            # Test cache performance if enabled
            self.test_cache_performance()

            end_time = time.time()
            total_duration = end_time - start_time

            print("\n" + "=" * 50)
            print(f"Performance tests completed in {total_duration:.2f}s")

            # Overall assessment
            self.assess_overall_performance(
                avatar_threshold,
                response_threshold,
                p95_threshold,
                ignore_cache_warnings,
            )

            return self.results

        except Exception as e:
            print(f"Performance test failed: {e}")
            return None

    def test_remote_avatar_performance(self, response_threshold: int = 1000) -> None:
        """Test avatar generation performance on remote server"""
        print("\n=== Remote Avatar Performance Test ===")

        # Generate test cases for all avatar styles and sizes
        test_cases = self._generate_test_cases()
        results = []

        # Generate random email for testing
        test_email = generate_random_email()
        print(f" Testing with email: {test_email}")

        for case in test_cases:
            result = self._test_single_avatar_request(
                case, test_email, use_requests=True
            )
            results.append(result)

        # Show example URL from first result
        if results:
            print(f" Example URL: {results[0]['full_url']}")

        # Display results grouped by style
        self._display_avatar_results(results)

        # Calculate statistics for successful requests
        successful_results = [r for r in results if r["success"]]
        if successful_results:
            durations = [r["duration_ms"] for r in successful_results]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)

            print(f"\n Average: {avg_duration:.2f}ms")
            print(f" Maximum: {max_duration:.2f}ms")
            print(f" Success rate: {len(successful_results)}/{len(results)}")

            # Performance thresholds for remote testing
            if avg_duration > (response_threshold * 2):  # 2x threshold for warning
                print(
                    f" ⚠️ WARNING: Average response time exceeds {response_threshold * 2}ms"
                )
            elif avg_duration > response_threshold:
                print(
                    f" ⚠️ CAUTION: Average response time exceeds {response_threshold}ms"
                )
            else:
                print(" ✅ Remote avatar performance is good")
        else:
            avg_duration = 0
            max_duration = 0
            print(" ❌ All remote requests failed")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
            "success_rate": len(successful_results) / len(results) if results else 0,
        }

    def assess_overall_performance(
        self,
        avatar_threshold: int = 1000,
        response_threshold: int = 1000,
        p95_threshold: int = 2000,
        ignore_cache_warnings: bool = False,
    ) -> bool:
        """Provide an overall performance assessment; return True if any warnings were found"""
        print("\n=== OVERALL PERFORMANCE ASSESSMENT ===")

        warnings = []

        # Check avatar generation
        if "avatar_generation" in self.results:
            avg_gen = self.results["avatar_generation"]["average_ms"]
            if avg_gen > avatar_threshold:
                warnings.append(
                    f"Avatar generation is slow ({avg_gen:.0f}ms average, threshold: {avatar_threshold}ms)"
                )

        # Check concurrent load
        if "concurrent_load" in self.results:
            failed = self.results["concurrent_load"]["failed_requests"]
            if failed > 0:
                warnings.append(f"{failed} requests failed under concurrent load")

        # Check cache performance
        if "cache_performance" in self.results and not ignore_cache_warnings:
            cache_working = self.results["cache_performance"].get(
                "cache_working", False
            )
            if not cache_working:
                warnings.append("Caching may not be working effectively")

        if warnings:
            print("⚠️ Performance Issues Found:")
            for warning in warnings:
                print(f" - {warning}")
            print("\nRecommendations:")
            print(" - Review database indexes and query optimization")
            print(" - Check caching configuration")
            print(" - Consider async processing for heavy operations")
        else:
            print("✅ Overall performance is good!")
            print(" - Avatar generation is responsive")
            print(" - Application handles concurrent load well")
            print(" - Caching is working effectively")

        # Store warnings in results for main function to check
        self.results["warnings"] = warnings
        return len(warnings) > 0


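# Shape of the results dictionary written via --output (a sketch; the keys match
# what the runner stores above, the values are purely illustrative):
#
#   {
#     "avatar_generation": {"average_ms": 42.0, "maximum_ms": 120.0, "results": [...]},
#     "concurrent_load": {"successful_requests": 20, "failed_requests": 0,
#                         "average_ms": 55.0, "p95_ms": 90.0, "cache_hit_rate": 50.0, ...},
#     "cache_performance": {"first_request_ms": 80.0, "second_request_ms": 5.0,
#                           "improvement_ratio": 16.0, "cache_working": true, ...},
#     "warnings": []
#   }

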
def main() -> Optional[Dict[str, Any]]:
    """Main entry point"""
    import argparse

    parser = argparse.ArgumentParser(description="Run Libravatar performance tests")
    parser.add_argument(
        "--base-url",
        default="http://localhost:8000",
        help="Base URL for testing (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--concurrent-users",
        type=int,
        default=10,
        help="Number of concurrent users to simulate (default: 10)",
    )
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument(
        "--no-cache-test",
        action="store_true",
        help="Disable cache performance testing (useful for local development)",
    )
    parser.add_argument(
        "--remote",
        action="store_true",
        help="Force remote testing mode (auto-detected for non-localhost URLs)",
    )
    parser.add_argument(
        "--avatar-threshold",
        type=int,
        default=1000,
        help="Avatar generation threshold in ms (default: 1000ms, use 2500 for dev environments)",
    )
    parser.add_argument(
        "--response-threshold",
        type=int,
        default=1000,
        help="Response time threshold in ms (default: 1000ms, use 2500 for dev environments)",
    )
    parser.add_argument(
        "--p95-threshold",
        type=int,
        default=2000,
        help="95th percentile threshold in ms (default: 2000ms, use 5000 for dev environments)",
    )
    parser.add_argument(
        "--ignore-cache-warnings",
        action="store_true",
        help="Don't fail on cache performance warnings (useful for dev environments)",
    )

    args = parser.parse_args()

    # Determine if we should test cache performance
    test_cache = not args.no_cache_test

    # Determine if we're doing remote testing
    remote_testing = args.remote or not args.base_url.startswith("http://localhost")

    runner = PerformanceTestRunner(
        base_url=args.base_url,
        concurrent_users=args.concurrent_users,
        test_cache=test_cache,
        remote_testing=remote_testing,
    )

    results = runner.run_all_tests(
        args.avatar_threshold,
        args.response_threshold,
        args.p95_threshold,
        args.ignore_cache_warnings,
    )

    if args.output and results:
        import json

        with open(args.output, "w") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {args.output}")

    # Exit with error code if there were performance issues
    if results and "warnings" in results and len(results["warnings"]) > 0:
        sys.exit(1)

    return results


if __name__ == "__main__":
    main()