mirror of
https://git.linux-kernel.at/oliver/ivatar.git
synced 2025-11-17 05:28:03 +00:00
Major enhancements to scripts/performance_tests.py:

🚀 Features Added:
- Complete avatar style coverage (identicon, monsterid, robohash, pagan, retro, wavatar, mm, mmng)
- All sizes tested (80px, 256px) for each style
- Cache hit/miss tracking and display
- Random email generation for realistic testing
- Full libravatar URL generation using official library
- Professional table output with PrettyTable

📊 Display Improvements:
- Perfect alignment with PrettyTable library
- Visual dividers between avatar styles
- Status icons (✅ success, ⚠️ mixed, ❌ failed)
- Cache status indicators (hit/miss/mixed/error)
- Email address and example URL display
- Grouped results by avatar style with averages

🔧 Technical Improvements:
- Integrated libravatar library for URL generation
- Replaced manual URL construction with proper library calls
- Enhanced error handling and reporting
- Added prettytable dependency to requirements.txt
- Improved code organization and maintainability

🎯 Testing Coverage:
- 8 avatar styles × 2 sizes = 16 test combinations
- Cache performance testing with hit/miss analysis
- Concurrent load testing with cache statistics
- Both local and remote testing modes supported

The performance tests now provide comprehensive, professional output that's easy to read and analyze, with complete coverage of all avatar generation functionality.
1001 lines
36 KiB
Python
1001 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Performance testing script for Libravatar CI/CD pipeline
|
|
|
|
This script runs automated performance tests to catch regressions
|
|
and ensure the application meets performance requirements.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import statistics
|
|
import hashlib
|
|
|
|
# Add project root to path
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
# Import utilities
|
|
from ivatar.utils import generate_random_email
|
|
from libravatar import libravatar_url
|
|
from urllib.parse import urlsplit
|
|
from prettytable import PrettyTable
|
|
|
|
|
|
# Django setup - only for local testing
|
|
def setup_django():
    """Initialise Django so the test client and ORM can be used in-process.

    ``DJANGO_SETTINGS_MODULE`` is only filled in when the environment does
    not already define it, so an externally exported settings module wins.
    """
    settings_key = "DJANGO_SETTINGS_MODULE"
    if settings_key not in os.environ:
        os.environ[settings_key] = "ivatar.settings"

    # Imported lazily: remote-only runs never call this function and therefore
    # never need Django to be importable.
    from django import setup as initialise_django

    initialise_django()
|
|
|
|
|
|
class PerformanceTestRunner:
    """Main performance test runner.

    Runs avatar, database, concurrent-load and cache checks either in-process
    through the Django test client (local mode) or over HTTP with ``requests``
    against ``base_url`` (remote mode).  Each test stores a summary dict in
    ``self.results`` under its own key; ``assess_overall_performance`` adds a
    ``"warnings"`` list that the command-line entry point uses for its exit
    status.
    """

    # Every avatar style (the ``default`` URL parameter) exercised by the tests.
    AVATAR_STYLES = [
        "identicon",
        "monsterid",
        "robohash",
        "pagan",
        "retro",
        "wavatar",
        "mm",
        "mmng",
    ]
    # Pixel sizes requested for each style.
    AVATAR_SIZES = [80, 256]

    def __init__(
        self,
        base_url="http://localhost:8000",
        concurrent_users=10,
        test_cache=True,
        remote_testing=False,
    ):
        """Configure the runner and, in local mode, bootstrap Django.

        Args:
            base_url: Root URL of the server under test.
            concurrent_users: Worker-thread count for the remote load test.
            test_cache: When false, the cache test is skipped.
            remote_testing: Force remote mode.  Remote mode is also selected
                automatically when ``base_url`` does not start with
                ``http://localhost``.
        """
        self.base_url = base_url
        self.concurrent_users = concurrent_users
        self.test_cache = test_cache
        self.remote_testing = remote_testing
        self.client = None
        self.results = {}

        if remote_testing or not base_url.startswith("http://localhost"):
            self.remote_testing = True
            print(f"🌐 Remote testing mode: {base_url}")
        else:
            print(f"🏠 Local testing mode: {base_url}")
            # Django and its test client are only needed (and only set up)
            # when requests are served in-process.
            setup_django()
            from django.test import Client

            self.client = Client()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_age(age):
        """Return an ``Age`` header value as int seconds, or None if unusable."""
        try:
            return int(age)
        except (TypeError, ValueError):
            return None

    @classmethod
    def _classify_cache_status(cls, cache_detail, age):
        """Map cache headers to ``"hit"``, ``"miss"`` or ``"unknown"``.

        A response counts as a hit when ``cache_detail`` mentions "cache hit"
        or the age is positive, and as a miss when it mentions "cache miss" or
        the age is exactly zero.  A malformed age never raises; it simply
        contributes no information.
        """
        detail = (cache_detail or "").lower()
        age_seconds = cls._parse_age(age)
        if "cache hit" in detail or (age_seconds is not None and age_seconds > 0):
            return "hit"
        if "cache miss" in detail or age_seconds == 0:
            return "miss"
        return "unknown"

    @staticmethod
    def _avatar_url(email, size, default):
        """Return ``(full_url, path_with_query)`` for an avatar request.

        The URL is produced by ``libravatar_url``; only its path and query are
        kept for the second element so it can be replayed against another host.
        """
        full_url = libravatar_url(email=email, size=size, default=default)
        parts = urlsplit(full_url)
        return full_url, f"{parts.path}?{parts.query}"

    @staticmethod
    def _wall_clock_seconds(results):
        """Return the elapsed seconds covered by a batch of load-test results.

        Uses the span between the earliest ``started_at`` and the latest
        ``finished_at`` so overlapping (concurrent) requests are not counted
        several times.  Falls back to the summed ``duration_ms`` when the
        timestamps are missing or ``results`` is empty.
        """
        try:
            first_start = min(r["started_at"] for r in results)
            last_finish = max(r["finished_at"] for r in results)
        except (KeyError, ValueError):
            return sum(r["duration_ms"] for r in results) / 1000
        return last_finish - first_start

    # ------------------------------------------------------------------
    # Test data
    # ------------------------------------------------------------------

    def setup_test_data(self):
        """Create the ``perftest<N>`` users and confirmed emails if missing."""
        print("Setting up test data...")

        # Django models can only be imported after setup_django() has run.
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail

        test_emails = [f"perftest{i}@example.com" for i in range(100)]

        created = 0
        for i, email in enumerate(test_emails):
            username = f"perftest{i}"
            if User.objects.filter(username=username).exists():
                continue  # already present from an earlier run

            user = User.objects.create_user(
                username=username, email=email, password="testpass123"
            )
            ConfirmedEmail.objects.create(
                user=user, email=email, ip_address="127.0.0.1"
            )
            created += 1

        # Report what was actually created rather than the list length.
        print(f"Created {created} test users and emails")

    # ------------------------------------------------------------------
    # Avatar requests
    # ------------------------------------------------------------------

    def _generate_test_cases(self):
        """Return one ``{"default": style, "size": size}`` dict per combination."""
        return [
            {"default": style, "size": size}
            for style in self.AVATAR_STYLES
            for size in self.AVATAR_SIZES
        ]

    def _test_single_avatar_request(self, case, email, use_requests=False):
        """Time one avatar request and describe the outcome.

        Args:
            case: Dict with ``"default"`` (style) and ``"size"`` keys.
            email: Address the avatar URL is generated for.
            use_requests: Fetch over HTTP from ``self.base_url`` when true,
                otherwise use the Django test client.

        Returns:
            A result dict containing at least ``test``, ``duration_ms``,
            ``status_code``, ``success``, ``cache_status``, ``full_url`` and
            ``email``.  A failed remote request additionally carries ``error``.
        """
        full_url, url_path = self._avatar_url(email, case["size"], case["default"])
        test_name = f"{case['default']}_{case['size']}px"

        if use_requests:
            # Import before starting the clock so the first measurement does
            # not include the cost of loading the requests package.
            import requests

            url = f"{self.base_url}{url_path}"
            started = time.perf_counter()
            try:
                response = requests.get(url, timeout=10)
            except Exception as e:  # recorded as a failed sample, not fatal
                return {
                    "test": test_name,
                    "duration_ms": (time.perf_counter() - started) * 1000,
                    "status_code": 0,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                    "full_url": full_url,
                    "email": email,
                }
            duration = (time.perf_counter() - started) * 1000

            cache_detail = response.headers.get("x-cache-detail", "").lower()
            age = response.headers.get("age", "0")
            return {
                "test": test_name,
                "duration_ms": duration,
                "status_code": response.status_code,
                "content_length": len(response.content) if response.content else 0,
                "success": response.status_code == 200,
                "cache_status": self._classify_cache_status(cache_detail, age),
                "cache_detail": cache_detail,
                "age": age,
                "full_url": full_url,
                "email": email,
            }

        # Local testing with the Django test client.
        started = time.perf_counter()
        response = self.client.get(url_path)
        duration = (time.perf_counter() - started) * 1000

        cache_status = "unknown"
        if callable(getattr(response, "get", None)):
            # Only a positive Age marks a hit; anything else is assumed to be
            # a freshly generated image.
            age_seconds = self._parse_age(response.get("Age", "0"))
            cache_status = "hit" if (age_seconds or 0) > 0 else "miss"

        return {
            "test": test_name,
            "duration_ms": duration,
            "status_code": response.status_code,
            "content_length": len(response.content) if response.content else 0,
            "cache_status": cache_status,
            "success": response.status_code == 200,
            "full_url": full_url,
            "email": email,
        }

    def _display_avatar_results(self, results):
        """Print the avatar results as a table grouped by avatar style.

        Each style gets an average row followed by one row per tested size;
        styles are separated by a dashed divider row.
        """
        by_style = {}
        for result in results:
            # Test names look like "<style>_<size>px".
            style = result["test"].rpartition("_")[0]
            by_style.setdefault(style, []).append(result)

        table = PrettyTable()
        table.field_names = ["Avatar Style", "Size", "Time (ms)", "Status", "Cache"]
        for column, alignment in (
            ("Avatar Style", "l"),
            ("Size", "r"),
            ("Time (ms)", "r"),
            ("Status", "c"),
            ("Cache", "c"),
        ):
            table.align[column] = alignment

        # Keep the canonical style order rather than the order of arrival.
        styles_with_data = [style for style in self.AVATAR_STYLES if style in by_style]
        last_index = len(styles_with_data) - 1

        for index, style in enumerate(styles_with_data):
            style_data = by_style[style]
            succeeded = [r for r in style_data if r.get("success", True)]

            if succeeded:
                avg_duration = statistics.mean(r["duration_ms"] for r in succeeded)

                known_statuses = {
                    r["cache_status"]
                    for r in succeeded
                    if r["cache_status"] != "unknown"
                }
                if not known_statuses:
                    cache_summary = "unknown"
                elif known_statuses == {"hit"}:
                    cache_summary = "hit"
                elif known_statuses == {"miss"}:
                    cache_summary = "miss"
                else:
                    cache_summary = "mixed"

                all_succeeded = len(succeeded) == len(style_data)
                table.add_row(
                    [
                        f"{style} (avg)",
                        "",
                        f"{avg_duration:.2f}",
                        "✅" if all_succeeded else "⚠️",
                        cache_summary,
                    ]
                )
            else:
                # Every request for this style failed.
                table.add_row([f"{style} (avg)", "", "Failed", "❌", "error"])

            for result in style_data:
                size = result["test"].rpartition("_")[2]
                if result.get("success", True):
                    row = [
                        "",
                        size,
                        f"{result['duration_ms']:.2f}",
                        "✅",
                        result["cache_status"],
                    ]
                else:
                    # When the whole style failed the cache column is "error".
                    cache_column = result["cache_status"] if succeeded else "error"
                    row = ["", size, result.get("error", "Failed"), "❌", cache_column]
                table.add_row(row)

            if index < last_index:
                table.add_row(["-" * 15, "-" * 5, "-" * 9, "-" * 6, "-" * 5])

        print(table)

    def _run_avatar_cases(self, use_requests):
        """Run every style/size case for one random email and print the table."""
        test_email = generate_random_email()
        print(f" Testing with email: {test_email}")

        results = [
            self._test_single_avatar_request(
                case, test_email, use_requests=use_requests
            )
            for case in self._generate_test_cases()
        ]

        if results:
            print(f" Example URL: {results[0]['full_url']}")

        self._display_avatar_results(results)
        return results

    def test_avatar_generation_performance(self):
        """Measure avatar generation through the Django test client.

        Stores ``average_ms``, ``maximum_ms`` and the raw ``results`` under
        ``self.results["avatar_generation"]``.
        """
        print("\n=== Avatar Generation Performance Test ===")

        results = self._run_avatar_cases(use_requests=False)

        durations = [r["duration_ms"] for r in results if r.get("success", True)]
        avg_duration = statistics.mean(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        print(f"\n Average: {avg_duration:.2f}ms")
        print(f" Maximum: {max_duration:.2f}ms")

        if not durations:
            # A zero average here means "nothing succeeded", not "fast".
            print(" ❌ All avatar requests failed")
        elif avg_duration > 1000:  # 1 second
            print(" ⚠️ WARNING: Average avatar generation time exceeds 1s")
        elif avg_duration > 500:  # 500ms
            print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms")
        else:
            print(" ✅ Avatar generation performance is good")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
        }

    # ------------------------------------------------------------------
    # Concurrent load
    # ------------------------------------------------------------------

    def test_concurrent_load(self, num_requests=20):
        """Run the load test and summarise latency, throughput and cache use.

        Args:
            num_requests: Number of requests/operations to issue.

        Stores the summary under ``self.results["concurrent_load"]``.
        """
        print("\n=== Concurrent Load Test ===")

        if self.remote_testing:
            print(f" Running {num_requests} HTTP requests to {self.base_url}...")
            results = self._test_remote_concurrent_load(num_requests)
        else:
            print(f" Running {num_requests} local avatar generations...")
            results = self._test_local_concurrent_load(num_requests)

        successful_requests = [r for r in results if r["success"]]
        failed_requests = [r for r in results if not r["success"]]

        cache_hits = [r for r in results if r.get("cache_status") == "hit"]
        cache_misses = [r for r in results if r.get("cache_status") == "miss"]
        cache_errors = [r for r in results if r.get("cache_status") == "error"]

        classified = len(cache_hits) + len(cache_misses)
        cache_hit_rate = len(cache_hits) / classified * 100 if classified else 0

        # Wall-clock span of the batch, so throughput reflects concurrency.
        total_duration = self._wall_clock_seconds(results)
        requests_per_second = (
            len(successful_requests) / total_duration if total_duration > 0 else 0
        )

        print(f" Total time: {total_duration:.2f}s")
        print(f" Successful requests: {len(successful_requests)}/{num_requests}")
        print(f" Failed requests: {len(failed_requests)}")

        if classified:
            print(f" Cache hits: {len(cache_hits)}")
            print(f" Cache misses: {len(cache_misses)}")
            if cache_errors:
                print(f" Cache errors: {len(cache_errors)}")
            print(f" Cache hit rate: {cache_hit_rate:.1f}%")

        if successful_requests:
            durations = [r["duration_ms"] for r in successful_requests]
            avg_duration = statistics.mean(durations)

            # quantiles() needs at least two samples on older Pythons; fall
            # back to the slowest sample when it cannot be computed.
            try:
                p95_duration = statistics.quantiles(durations, n=20)[18]
            except (ValueError, IndexError):
                p95_duration = max(durations)

            print(f" Average response time: {avg_duration:.2f}ms")
            print(f" 95th percentile: {p95_duration:.2f}ms")
            print(f" Operations per second: {requests_per_second:.2f}")

            if failed_requests:
                print(" ⚠️ WARNING: Some operations failed under load")
            elif p95_duration > 2000:  # 2 seconds
                print(" ⚠️ WARNING: 95th percentile response time exceeds 2s")
            elif avg_duration > 1000:  # 1 second
                print(" ⚠️ CAUTION: Average response time exceeds 1s under load")
            else:
                print(" ✅ Load handling is good")
        else:
            avg_duration = 0
            p95_duration = 0
            print(" ❌ All operations failed")

        self.results["concurrent_load"] = {
            "total_duration_s": total_duration,
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "average_ms": avg_duration,
            "p95_ms": p95_duration,
            "requests_per_second": requests_per_second,
            "cache_hits": len(cache_hits),
            "cache_misses": len(cache_misses),
            "cache_errors": len(cache_errors),
            "cache_hit_rate": cache_hit_rate,
        }

    def _test_remote_concurrent_load(self, num_requests):
        """Issue ``num_requests`` identicon requests from a thread pool.

        Returns:
            One result dict per completed request, in completion order.  A
            worker that raises outside the HTTP call is reported on stdout
            and omitted from the list.
        """
        import requests
        from concurrent.futures import ThreadPoolExecutor, as_completed

        def make_remote_request(thread_id):
            _, url_path = self._avatar_url(generate_random_email(), 80, "identicon")
            url = f"{self.base_url}{url_path}"

            started = time.perf_counter()
            try:
                response = requests.get(url, timeout=10)
            except Exception as e:  # recorded as a failed sample, not fatal
                finished = time.perf_counter()
                return {
                    "thread_id": thread_id,
                    "duration_ms": (finished - started) * 1000,
                    "success": False,
                    "error": str(e),
                    "cache_status": "error",
                    "started_at": started,
                    "finished_at": finished,
                }
            finished = time.perf_counter()

            return {
                "thread_id": thread_id,
                "duration_ms": (finished - started) * 1000,
                "status_code": response.status_code,
                "success": response.status_code == 200,
                "cache_status": self._classify_cache_status(
                    response.headers.get("x-cache-detail", ""),
                    response.headers.get("age", "0"),
                ),
                "started_at": started,
                "finished_at": finished,
            }

        results = []
        # ThreadPoolExecutor rejects a non-positive worker count.
        workers = max(1, self.concurrent_users)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(make_remote_request, i) for i in range(num_requests)
            ]
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f" Request failed: {e}")

        return results

    def _test_local_concurrent_load(self, num_requests):
        """Time ``num_requests`` local operations, one after another.

        Renders identicons directly when the ``Identicon`` module can be
        imported; otherwise times ``User.objects.count()`` instead.  Despite
        the method name the loop below is sequential.
        """
        try:
            import Identicon as identicon_module
        except ImportError:
            identicon_module = None
            print(
                " Avatar generators not available, testing database queries instead..."
            )

        results = []
        for index in range(num_requests):
            if identicon_module is not None:
                # Input preparation is deliberately kept outside the timer.
                email_hash = hashlib.md5(generate_random_email().encode()).hexdigest()

            started = time.perf_counter()
            try:
                if identicon_module is not None:
                    identicon_data = identicon_module.render(email_hash)
                    finished = time.perf_counter()
                    outcome = {
                        "success": len(identicon_data) > 0,
                        # Direct generation never goes through a cache.
                        "cache_status": "miss",
                    }
                else:
                    from django.contrib.auth.models import User

                    User.objects.count()
                    finished = time.perf_counter()
                    # Database queries don't use the image cache.
                    outcome = {"success": True, "cache_status": "n/a"}
            except Exception as e:  # recorded as a failed sample, not fatal
                finished = time.perf_counter()
                outcome = {"success": False, "error": str(e), "cache_status": "error"}

            results.append(
                {
                    "thread_id": index,
                    "duration_ms": (finished - started) * 1000,
                    **outcome,
                    "started_at": started,
                    "finished_at": finished,
                }
            )

        return results

    # ------------------------------------------------------------------
    # Database
    # ------------------------------------------------------------------

    def test_database_performance(self):
        """Time a few representative ORM queries and report slow ones."""
        print("\n=== Database Performance Test ===")

        from django.db import connection
        from django.contrib.auth.models import User
        from ivatar.ivataraccount.models import ConfirmedEmail, Photo

        # Start from an empty query log so the count below covers this test.
        # NOTE(review): Django presumably only records queries when DEBUG is
        # enabled - confirm, otherwise the count is always zero.
        connection.queries_log.clear()

        test_queries = [
            ("User count", lambda: User.objects.count()),
            (
                "Email lookup by digest",
                lambda: ConfirmedEmail.objects.filter(
                    digest="5d41402abc4b2a76b9719d911017c592"
                ).first(),
            ),
            (
                "Top 10 photos by access count",
                lambda: list(Photo.objects.order_by("-access_count")[:10]),
            ),
        ]

        for name, run_query in test_queries:
            started = time.perf_counter()
            try:
                run_query()
            except Exception as e:  # report and continue with the next query
                print(f" {name}: ERROR - {e}")
                continue
            duration = (time.perf_counter() - started) * 1000

            print(f" {name}: {duration:.2f}ms")
            if duration > 100:  # 100ms threshold
                print(" ⚠️ WARNING: Query exceeds 100ms threshold")

        # Far more statements than test queries hints at an N+1 pattern.
        query_count = len(connection.queries)
        if query_count > 10:
            print(
                f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)"
            )
        else:
            print(f" ✅ Database query count is reasonable ({query_count} queries)")

    # ------------------------------------------------------------------
    # Cache
    # ------------------------------------------------------------------

    def test_cache_performance(self):
        """Request the same avatar twice and judge whether caching works.

        Remote runs decide from the cache headers captured by
        ``_test_remote_cache_performance``; local runs fall back to comparing
        the two durations.  Stores the verdict under
        ``self.results["cache_performance"]``.  Does nothing beyond printing a
        notice when cache testing is disabled.
        """
        print("\n=== Cache Performance Test ===")
        if not self.test_cache:
            print(" ⏭️ Cache testing disabled")
            return

        test_email = generate_random_email()
        print(f" Testing with: {test_email}")

        if self.remote_testing:
            measure = self._test_remote_cache_performance
        else:
            measure = self._test_local_cache_performance
        first_duration, second_duration = measure(test_email)

        print(f" First request: {first_duration:.2f}ms")
        print(f" Second request: {second_duration:.2f}ms")

        improvement_ratio = (
            first_duration / second_duration if second_duration > 0 else 0
        )

        cache_info = getattr(self, "cache_info", None)
        if self.remote_testing and cache_info is not None:
            # Verdict keyed by (first request cached, second request cached).
            verdicts = {
                (False, True): ("✅ Cache working correctly (miss → hit)", True),
                (True, True): ("✅ Cache working (both requests cached)", True),
                (False, False): ("⚠️ No cache hits detected", False),
                (True, False): ("⚠️ Unexpected cache behavior", False),
            }
            key = (
                bool(cache_info["first_request"]["is_cached"]),
                bool(cache_info["second_request"]["is_cached"]),
            )
            cache_status, cache_working = verdicts[key]
        elif improvement_ratio >= 1.5:
            # No headers to inspect locally: rely on the speed-up alone.
            cache_status = "✅ Caching appears to be working (timing-based)"
            cache_working = True
        else:
            cache_status = "⚠️ Caching may not be working as expected (timing-based)"
            cache_working = False

        print(f" {cache_status}")
        if improvement_ratio > 1:
            print(f" Performance improvement: {improvement_ratio:.1f}x faster")

        self.results["cache_performance"] = {
            "first_request_ms": first_duration,
            "second_request_ms": second_duration,
            "improvement_ratio": improvement_ratio,
            "cache_working": cache_working,
            "cache_status": cache_status,
            "cache_headers": getattr(self, "cache_info", {}),
        }

    def _test_remote_cache_performance(self, email):
        """Fetch one avatar URL twice over HTTP and record the cache headers.

        Sets ``self.cache_info`` with the header details of both requests.

        Returns:
            ``(first_duration_ms, second_duration_ms)``.
        """
        import requests

        _, url_path = self._avatar_url(email, 80, "identicon")
        url = f"{self.base_url}{url_path}"

        def timed_fetch(label):
            """Fetch ``url`` once; return its duration and header summary."""
            started = time.perf_counter()
            response = requests.get(url, timeout=10)
            elapsed_ms = (time.perf_counter() - started) * 1000

            cache_detail = response.headers.get("x-cache-detail", "unknown")
            age = response.headers.get("age", "0")
            cache_control = response.headers.get("cache-control", "none")

            print(f" {label} request headers:")
            print(f" x-cache-detail: {cache_detail}")
            print(f" age: {age}")
            print(f" cache-control: {cache_control}")

            summary = {
                "cache_detail": cache_detail,
                "age": age,
                "is_cached": self._classify_cache_status(cache_detail, age) == "hit",
            }
            return elapsed_ms, summary

        # First request (should be a cache miss or fresh).
        first_duration, first_info = timed_fetch("First")

        # Small delay to ensure any processing is complete.
        time.sleep(0.1)

        # Second request (should be a cache hit).
        second_duration, second_info = timed_fetch("Second")

        print(" Cache analysis:")
        print(
            f" First request: {'Cache HIT' if first_info['is_cached'] else 'Cache MISS'}"
        )
        print(
            f" Second request: {'Cache HIT' if second_info['is_cached'] else 'Cache MISS'}"
        )

        self.cache_info = {
            "first_request": first_info,
            "second_request": second_info,
        }

        return first_duration, second_duration

    def _test_local_cache_performance(self, email):
        """Fetch one avatar twice via the test client; return both durations (ms)."""
        _, url_path = self._avatar_url(email, 80, "identicon")

        durations = []
        for _ in range(2):  # first pass: expected miss, second: expected hit
            started = time.perf_counter()
            self.client.get(url_path)
            durations.append((time.perf_counter() - started) * 1000)

        first_duration, second_duration = durations
        return first_duration, second_duration

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def run_all_tests(self):
        """Run every test applicable to the current mode.

        Returns:
            ``self.results`` on completion, or ``None`` when any test raised;
            the error and its traceback are printed in that case.
        """
        print("Starting Libravatar Performance Tests")
        print("=" * 50)

        started = time.perf_counter()

        try:
            if self.remote_testing:
                print("🌐 Running remote server tests...")
                self.test_remote_avatar_performance()
            else:
                # Test data and ORM timings only make sense in-process.
                self.setup_test_data()
                print("🏠 Running local tests...")
                self.test_avatar_generation_performance()
                self.test_database_performance()

            # Always test concurrent load.
            self.test_concurrent_load()

            # Skips itself when cache testing is disabled.
            self.test_cache_performance()

            total_duration = time.perf_counter() - started

            print("\n" + "=" * 50)
            print(f"Performance tests completed in {total_duration:.2f}s")

            self.assess_overall_performance()

            return self.results

        except Exception as e:
            import traceback

            print(f"Performance test failed: {e}")
            # Keep the full traceback in the log so CI failures are debuggable.
            traceback.print_exc()
            return None

    def test_remote_avatar_performance(self):
        """Measure avatar requests against the remote server.

        Stores ``average_ms``, ``maximum_ms``, the raw ``results`` and the
        ``success_rate`` under ``self.results["avatar_generation"]``.
        """
        print("\n=== Remote Avatar Performance Test ===")

        results = self._run_avatar_cases(use_requests=True)

        successful_results = [r for r in results if r["success"]]
        if successful_results:
            durations = [r["duration_ms"] for r in successful_results]
            avg_duration = statistics.mean(durations)
            max_duration = max(durations)

            print(f"\n Average: {avg_duration:.2f}ms")
            print(f" Maximum: {max_duration:.2f}ms")
            print(f" Success rate: {len(successful_results)}/{len(results)}")

            # Remote thresholds are looser than the local ones.
            if avg_duration > 2000:  # 2 seconds
                print(" ⚠️ WARNING: Average response time exceeds 2s")
            elif avg_duration > 1000:  # 1 second
                print(" ⚠️ CAUTION: Average response time exceeds 1s")
            else:
                print(" ✅ Remote avatar performance is good")
        else:
            avg_duration = 0
            max_duration = 0
            print(" ❌ All remote requests failed")

        self.results["avatar_generation"] = {
            "average_ms": avg_duration,
            "maximum_ms": max_duration,
            "results": results,
            "success_rate": len(successful_results) / len(results) if results else 0,
        }

    def assess_overall_performance(self):
        """Summarise the collected results and list any problems found.

        Stores the warning messages under ``self.results["warnings"]``.

        Returns:
            True when at least one warning was raised, otherwise False.
        """
        print("\n=== OVERALL PERFORMANCE ASSESSMENT ===")

        warnings = []

        if "avatar_generation" in self.results:
            avg_gen = self.results["avatar_generation"]["average_ms"]
            if avg_gen > 1000:
                warnings.append(f"Avatar generation is slow ({avg_gen:.0f}ms average)")

        if "concurrent_load" in self.results:
            failed = self.results["concurrent_load"]["failed_requests"]
            if failed > 0:
                warnings.append(f"{failed} requests failed under concurrent load")

        if "cache_performance" in self.results:
            if not self.results["cache_performance"].get("cache_working", False):
                warnings.append("Caching may not be working effectively")

        if warnings:
            print("⚠️ Performance Issues Found:")
            for warning in warnings:
                print(f" - {warning}")  # noqa: E221
            print("\nRecommendations:")
            print(" - Review database indexes and query optimization")
            print(" - Check caching configuration")
            print(" - Consider async processing for heavy operations")
        else:
            print("✅ Overall performance is good!")
            print(" - Avatar generation is responsive")
            print(" - Application handles concurrent load well")
            print(" - Caching is working effectively")

        # Stored so the command-line entry point can derive its exit status.
        self.results["warnings"] = warnings
        return len(warnings) > 0
|
|
|
|
|
|
def main():
    """Parse the command line, run the performance tests and set the exit code.

    Exit status:
        0 - all tests ran and no warnings were raised (normal return)
        1 - the tests ran but performance warnings were raised
        2 - the test run aborted (``run_all_tests()`` returned ``None``)

    Returns:
        The results dict produced by the runner when the process does not exit.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Run Libravatar performance tests")
    parser.add_argument(
        "--base-url",
        default="http://localhost:8000",
        help="Base URL for testing (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--concurrent-users",
        type=int,
        default=10,
        help="Number of concurrent users to simulate (default: 10)",
    )
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument(
        "--no-cache-test",
        action="store_true",
        help="Disable cache performance testing (useful for local development)",
    )
    parser.add_argument(
        "--remote",
        action="store_true",
        help="Force remote testing mode (auto-detected for non-localhost URLs)",
    )

    args = parser.parse_args()

    # Anything that is not a localhost URL is treated as a remote target.
    remote_testing = args.remote or not args.base_url.startswith("http://localhost")

    runner = PerformanceTestRunner(
        base_url=args.base_url,
        concurrent_users=args.concurrent_users,
        test_cache=not args.no_cache_test,
        remote_testing=remote_testing,
    )

    results = runner.run_all_tests()

    if results is None:
        # The runner swallowed an exception and produced nothing; reporting
        # success here would let a broken run pass CI unnoticed.
        sys.exit(2)

    if args.output and results:
        import json

        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {args.output}")

    # Exit with error code if there were performance issues.
    if results.get("warnings"):
        sys.exit(1)

    return results


if __name__ == "__main__":
    main()
|