mirror of
https://git.linux-kernel.at/oliver/ivatar.git
synced 2025-11-11 10:46:24 +00:00
Add performance tests
This commit is contained in:
@@ -112,6 +112,91 @@ pages:
|
||||
# - docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" .
|
||||
# - docker push "$CI_REGISTRY_IMAGE${tag}"
|
||||
|
||||
# Local performance testing job (runs in CI environment)
|
||||
performance_tests_local:
|
||||
stage: test
|
||||
services:
|
||||
- postgres:latest
|
||||
variables:
|
||||
POSTGRES_DB: django_db
|
||||
POSTGRES_USER: django_user
|
||||
POSTGRES_PASSWORD: django_password
|
||||
POSTGRES_HOST: postgres
|
||||
DATABASE_URL: "postgres://django_user:django_password@postgres/django_db"
|
||||
PYTHONUNBUFFERED: 1
|
||||
# OpenTelemetry configuration for performance testing
|
||||
OTEL_EXPORT_ENABLED: "false"
|
||||
OTEL_SERVICE_NAME: "ivatar-perf-test-local"
|
||||
OTEL_ENVIRONMENT: "ci-performance"
|
||||
before_script:
|
||||
- virtualenv -p python3 /tmp/.virtualenv
|
||||
- source /tmp/.virtualenv/bin/activate
|
||||
- pip install -U pip
|
||||
- pip install Pillow
|
||||
- pip install -r requirements.txt
|
||||
- pip install requests # Additional dependency for performance tests
|
||||
script:
|
||||
- source /tmp/.virtualenv/bin/activate
|
||||
- echo 'from ivatar.settings import TEMPLATES' > config_local.py
|
||||
- echo 'TEMPLATES[0]["OPTIONS"]["debug"] = True' >> config_local.py
|
||||
- echo "DEBUG = True" >> config_local.py
|
||||
- echo "from config import CACHES" >> config_local.py
|
||||
- echo "CACHES['default'] = CACHES['filesystem']" >> config_local.py
|
||||
- python manage.py migrate
|
||||
- python manage.py collectstatic --noinput
|
||||
- echo "Running local performance tests (no cache testing)..."
|
||||
- python3 scripts/performance_tests.py --no-cache-test --output performance_local.json
|
||||
artifacts:
|
||||
paths:
|
||||
- performance_local.json
|
||||
expire_in: 7 days
|
||||
allow_failure: true # Don't fail the pipeline on performance issues, but report them
|
||||
|
||||
# Performance testing against dev server (devel branch only)
|
||||
performance_tests_dev:
|
||||
stage: deploy
|
||||
image: python:3.11-alpine
|
||||
only:
|
||||
- devel
|
||||
variables:
|
||||
DEV_URL: "https://dev.libravatar.org"
|
||||
before_script:
|
||||
- apk add --no-cache curl
|
||||
- pip install requests
|
||||
script:
|
||||
- echo "Running performance tests against dev.libravatar.org..."
|
||||
- python3 scripts/performance_tests.py --base-url $DEV_URL --concurrent-users 5 --output performance_dev.json
|
||||
artifacts:
|
||||
paths:
|
||||
- performance_dev.json
|
||||
expire_in: 7 days
|
||||
allow_failure: true # Don't fail deployment on performance issues
|
||||
needs:
|
||||
- verify_dev_deployment # Run after deployment verification
|
||||
|
||||
# Performance testing against production server (master branch only)
|
||||
performance_tests_prod:
|
||||
stage: deploy
|
||||
image: python:3.11-alpine
|
||||
only:
|
||||
- master
|
||||
when: manual # Manual trigger to avoid impacting production unnecessarily
|
||||
variables:
|
||||
PROD_URL: "https://libravatar.org"
|
||||
before_script:
|
||||
- apk add --no-cache curl
|
||||
- pip install requests
|
||||
script:
|
||||
- echo "Running performance tests against libravatar.org..."
|
||||
- python3 scripts/performance_tests.py --base-url $PROD_URL --concurrent-users 3 --output performance_prod.json
|
||||
artifacts:
|
||||
paths:
|
||||
- performance_prod.json
|
||||
expire_in: 30 days # Keep production results longer
|
||||
allow_failure: true # Don't fail deployment on performance issues
|
||||
needs:
|
||||
- verify_prod_deployment # Run after deployment verification
|
||||
|
||||
# Deployment verification jobs
|
||||
verify_dev_deployment:
|
||||
stage: deploy
|
||||
|
||||
@@ -262,7 +262,9 @@ class FileValidator:
|
||||
if not magic_results["valid"]:
|
||||
results["valid"] = False
|
||||
results["errors"].extend(magic_results["errors"])
|
||||
results["security_score"] -= 10 # Reduced from 25 - basic format issue, not security threat
|
||||
results[
|
||||
"security_score"
|
||||
] -= 10 # Reduced from 25 - basic format issue, not security threat
|
||||
|
||||
results["file_info"]["detected_type"] = magic_results["detected_type"]
|
||||
|
||||
@@ -271,7 +273,9 @@ class FileValidator:
|
||||
if not mime_results["valid"]:
|
||||
results["valid"] = False
|
||||
results["errors"].extend(mime_results["errors"])
|
||||
results["security_score"] -= 10 # Reduced from 20 - basic format issue, not security threat
|
||||
results[
|
||||
"security_score"
|
||||
] -= 10 # Reduced from 20 - basic format issue, not security threat
|
||||
|
||||
results["file_info"]["detected_mime"] = mime_results["detected_mime"]
|
||||
results["warnings"].extend(mime_results.get("warnings", []))
|
||||
@@ -281,7 +285,9 @@ class FileValidator:
|
||||
if not pil_results["valid"]:
|
||||
results["valid"] = False
|
||||
results["errors"].extend(pil_results["errors"])
|
||||
results["security_score"] -= 10 # Reduced from 15 - basic format issue, not security threat
|
||||
results[
|
||||
"security_score"
|
||||
] -= 10 # Reduced from 15 - basic format issue, not security threat
|
||||
|
||||
results["file_info"]["image_info"] = pil_results["image_info"]
|
||||
results["warnings"].extend(pil_results.get("warnings", []))
|
||||
|
||||
@@ -126,9 +126,9 @@ class UploadPhotoForm(forms.Form):
|
||||
# Read file data
|
||||
try:
|
||||
# Handle different file types
|
||||
if hasattr(photo, 'read'):
|
||||
if hasattr(photo, "read"):
|
||||
file_data = photo.read()
|
||||
elif hasattr(photo, 'file'):
|
||||
elif hasattr(photo, "file"):
|
||||
file_data = photo.file.read()
|
||||
else:
|
||||
file_data = bytes(photo)
|
||||
@@ -159,7 +159,9 @@ class UploadPhotoForm(forms.Form):
|
||||
else:
|
||||
# For format issues, don't raise ValidationError - let Photo.save() handle it
|
||||
# This preserves the original error handling behavior
|
||||
logger.info(f"File format issue detected, allowing Photo.save() to handle: {validation_results['errors']}")
|
||||
logger.info(
|
||||
f"File format issue detected, allowing Photo.save() to handle: {validation_results['errors']}"
|
||||
)
|
||||
# Store the validation results for potential use, but don't reject the form
|
||||
self.validation_results = validation_results
|
||||
self.file_data = file_data
|
||||
|
||||
@@ -93,7 +93,7 @@ def is_commit_newer_or_equal(commit1: str, commit2: str) -> Optional[bool]:
|
||||
except subprocess.CalledProcessError:
|
||||
# If the above fails, try the reverse - check if commit2 is newer
|
||||
try:
|
||||
result = subprocess.run(
|
||||
subprocess.run(
|
||||
["git", "merge-base", "--is-ancestor", commit1, commit2],
|
||||
capture_output=True,
|
||||
check=True,
|
||||
|
||||
775
scripts/performance_tests.py
Normal file
775
scripts/performance_tests.py
Normal file
@@ -0,0 +1,775 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Performance testing script for Libravatar CI/CD pipeline
|
||||
|
||||
This script runs automated performance tests to catch regressions
|
||||
and ensure the application meets performance requirements.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import statistics
|
||||
import hashlib
|
||||
|
||||
# Add project root to path
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
# Django setup
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings")
|
||||
import django
|
||||
|
||||
django.setup()
|
||||
|
||||
from django.test import Client
|
||||
from django.contrib.auth.models import User
|
||||
from ivatar.ivataraccount.models import ConfirmedEmail, Photo
|
||||
|
||||
|
||||
class PerformanceTestRunner:
|
||||
"""Main performance test runner"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url="http://localhost:8000",
|
||||
concurrent_users=10,
|
||||
test_cache=True,
|
||||
remote_testing=False,
|
||||
):
|
||||
self.base_url = base_url
|
||||
self.concurrent_users = concurrent_users
|
||||
self.test_cache = test_cache
|
||||
self.remote_testing = remote_testing
|
||||
self.client = Client() if not remote_testing else None
|
||||
self.results = {}
|
||||
|
||||
# Determine if we're testing locally or remotely
|
||||
if remote_testing or not base_url.startswith("http://localhost"):
|
||||
self.remote_testing = True
|
||||
print(f"🌐 Remote testing mode: {base_url}")
|
||||
else:
|
||||
print(f"🏠 Local testing mode: {base_url}")
|
||||
|
||||
def setup_test_data(self):
|
||||
"""Create test data for performance tests"""
|
||||
print("Setting up test data...")
|
||||
|
||||
# Create test users and emails
|
||||
test_emails = [f"perftest{i}@example.com" for i in range(100)]
|
||||
|
||||
for i, email in enumerate(test_emails):
|
||||
if not User.objects.filter(username=f"perftest{i}").exists():
|
||||
user = User.objects.create_user(
|
||||
username=f"perftest{i}", email=email, password="testpass123"
|
||||
)
|
||||
|
||||
# Create confirmed email
|
||||
ConfirmedEmail.objects.create(
|
||||
user=user, email=email, ip_address="127.0.0.1"
|
||||
)
|
||||
|
||||
print(f"Created {len(test_emails)} test users and emails")
|
||||
|
||||
def test_avatar_generation_performance(self):
|
||||
"""Test avatar generation performance"""
|
||||
print("\n=== Avatar Generation Performance Test ===")
|
||||
|
||||
# Test different avatar types and sizes
|
||||
test_cases = [
|
||||
{"default": "identicon", "size": 80},
|
||||
{"default": "monsterid", "size": 80},
|
||||
{"default": "robohash", "size": 80},
|
||||
{"default": "identicon", "size": 256},
|
||||
{"default": "monsterid", "size": 256},
|
||||
]
|
||||
|
||||
results = []
|
||||
|
||||
for case in test_cases:
|
||||
# Generate test hash
|
||||
test_email = "perftest@example.com"
|
||||
email_hash = hashlib.md5(test_email.encode()).hexdigest()
|
||||
|
||||
# Build URL
|
||||
url = f"/avatar/{email_hash}"
|
||||
params = {"d": case["default"], "s": case["size"]}
|
||||
|
||||
# Time the request
|
||||
start_time = time.time()
|
||||
response = self.client.get(url, params)
|
||||
end_time = time.time()
|
||||
|
||||
duration = (end_time - start_time) * 1000 # Convert to ms
|
||||
|
||||
results.append(
|
||||
{
|
||||
"test": f"{case['default']}_{case['size']}px",
|
||||
"duration_ms": duration,
|
||||
"status_code": response.status_code,
|
||||
"content_length": len(response.content) if response.content else 0,
|
||||
}
|
||||
)
|
||||
|
||||
print(f" {case['default']} ({case['size']}px): {duration:.2f}ms")
|
||||
|
||||
# Calculate statistics
|
||||
durations = [r["duration_ms"] for r in results]
|
||||
avg_duration = statistics.mean(durations)
|
||||
max_duration = max(durations)
|
||||
|
||||
print(f"\n Average: {avg_duration:.2f}ms")
|
||||
print(f" Maximum: {max_duration:.2f}ms")
|
||||
|
||||
# Performance thresholds
|
||||
if avg_duration > 1000: # 1 second
|
||||
print(" ⚠️ WARNING: Average avatar generation time exceeds 1s")
|
||||
elif avg_duration > 500: # 500ms
|
||||
print(" ⚠️ CAUTION: Average avatar generation time exceeds 500ms")
|
||||
else:
|
||||
print(" ✅ Avatar generation performance is good")
|
||||
|
||||
self.results["avatar_generation"] = {
|
||||
"average_ms": avg_duration,
|
||||
"maximum_ms": max_duration,
|
||||
"results": results,
|
||||
}
|
||||
|
||||
def test_concurrent_load(self):
|
||||
"""Test concurrent load handling"""
|
||||
print("\n=== Concurrent Load Test ===")
|
||||
|
||||
num_requests = 20
|
||||
|
||||
if self.remote_testing:
|
||||
print(f" Running {num_requests} HTTP requests to {self.base_url}...")
|
||||
results = self._test_remote_concurrent_load(num_requests)
|
||||
else:
|
||||
print(f" Running {num_requests} local avatar generations...")
|
||||
results = self._test_local_concurrent_load(num_requests)
|
||||
|
||||
# Analyze results
|
||||
successful_requests = [r for r in results if r["success"]]
|
||||
failed_requests = [r for r in results if not r["success"]]
|
||||
|
||||
total_duration = (
|
||||
sum(r["duration_ms"] for r in results) / 1000
|
||||
) # Convert to seconds
|
||||
|
||||
print(f" Total time: {total_duration:.2f}s")
|
||||
print(f" Successful requests: {len(successful_requests)}/{num_requests}")
|
||||
print(f" Failed requests: {len(failed_requests)}")
|
||||
|
||||
if successful_requests:
|
||||
durations = [r["duration_ms"] for r in successful_requests]
|
||||
avg_duration = statistics.mean(durations)
|
||||
|
||||
# Calculate p95 safely
|
||||
if len(durations) >= 2:
|
||||
try:
|
||||
p95_duration = statistics.quantiles(durations, n=20)[
|
||||
18
|
||||
] # 95th percentile
|
||||
except (ValueError, IndexError):
|
||||
p95_duration = max(durations)
|
||||
else:
|
||||
p95_duration = max(durations)
|
||||
|
||||
print(f" Average response time: {avg_duration:.2f}ms")
|
||||
print(f" 95th percentile: {p95_duration:.2f}ms")
|
||||
print(
|
||||
f" Operations per second: {len(successful_requests) / total_duration:.2f}"
|
||||
)
|
||||
|
||||
# Performance evaluation
|
||||
if len(failed_requests) > 0:
|
||||
print(" ⚠️ WARNING: Some operations failed under load")
|
||||
elif p95_duration > 2000: # 2 seconds
|
||||
print(" ⚠️ WARNING: 95th percentile response time exceeds 2s")
|
||||
elif avg_duration > 1000: # 1 second
|
||||
print(" ⚠️ CAUTION: Average response time exceeds 1s under load")
|
||||
else:
|
||||
print(" ✅ Load handling is good")
|
||||
else:
|
||||
avg_duration = 0
|
||||
p95_duration = 0
|
||||
print(" ❌ All operations failed")
|
||||
|
||||
self.results["concurrent_load"] = {
|
||||
"total_duration_s": total_duration,
|
||||
"successful_requests": len(successful_requests),
|
||||
"failed_requests": len(failed_requests),
|
||||
"average_ms": avg_duration,
|
||||
"p95_ms": p95_duration,
|
||||
"requests_per_second": (
|
||||
len(successful_requests) / total_duration if total_duration > 0 else 0
|
||||
),
|
||||
}
|
||||
|
||||
def _test_remote_concurrent_load(self, num_requests):
|
||||
"""Test concurrent load against remote server"""
|
||||
import requests # noqa: F401
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
def make_remote_request(thread_id):
|
||||
test_email = f"perftest{thread_id % 10}@example.com"
|
||||
email_hash = hashlib.md5(test_email.encode()).hexdigest()
|
||||
url = f"{self.base_url}/avatar/{email_hash}"
|
||||
params = {"d": "identicon", "s": 80}
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
end_time = time.time()
|
||||
|
||||
return {
|
||||
"thread_id": thread_id,
|
||||
"duration_ms": (end_time - start_time) * 1000,
|
||||
"status_code": response.status_code,
|
||||
"success": response.status_code == 200,
|
||||
}
|
||||
except Exception as e:
|
||||
end_time = time.time()
|
||||
return {
|
||||
"thread_id": thread_id,
|
||||
"duration_ms": (end_time - start_time) * 1000,
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
results = []
|
||||
with ThreadPoolExecutor(max_workers=self.concurrent_users) as executor:
|
||||
futures = [
|
||||
executor.submit(make_remote_request, i) for i in range(num_requests)
|
||||
]
|
||||
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
result = future.result()
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
print(f" Request failed: {e}")
|
||||
|
||||
return results
|
||||
|
||||
def _test_local_concurrent_load(self, num_requests):
|
||||
"""Test concurrent load locally using avatar generation functions"""
|
||||
results = []
|
||||
|
||||
# Import avatar generation functions
|
||||
try:
|
||||
import Identicon
|
||||
|
||||
for i in range(num_requests):
|
||||
test_email = f"perftest{i % 10}@example.com"
|
||||
email_hash = hashlib.md5(test_email.encode()).hexdigest()
|
||||
|
||||
request_start = time.time()
|
||||
try:
|
||||
# Test identicon generation directly
|
||||
identicon_data = Identicon.render(email_hash)
|
||||
request_end = time.time()
|
||||
|
||||
results.append(
|
||||
{
|
||||
"thread_id": i,
|
||||
"duration_ms": (request_end - request_start) * 1000,
|
||||
"success": len(identicon_data) > 0,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
request_end = time.time()
|
||||
results.append(
|
||||
{
|
||||
"thread_id": i,
|
||||
"duration_ms": (request_end - request_start) * 1000,
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
|
||||
except ImportError:
|
||||
# Fallback: just test database queries
|
||||
print(
|
||||
" Avatar generators not available, testing database queries instead..."
|
||||
)
|
||||
for i in range(num_requests):
|
||||
request_start = time.time()
|
||||
try:
|
||||
from django.contrib.auth.models import User
|
||||
|
||||
User.objects.count()
|
||||
request_end = time.time()
|
||||
|
||||
results.append(
|
||||
{
|
||||
"thread_id": i,
|
||||
"duration_ms": (request_end - request_start) * 1000,
|
||||
"success": True,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
request_end = time.time()
|
||||
results.append(
|
||||
{
|
||||
"thread_id": i,
|
||||
"duration_ms": (request_end - request_start) * 1000,
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
def test_database_performance(self):
|
||||
"""Test database query performance"""
|
||||
print("\n=== Database Performance Test ===")
|
||||
|
||||
from django.db import connection
|
||||
|
||||
# Reset query log
|
||||
connection.queries_log.clear()
|
||||
|
||||
test_queries = [
|
||||
{"name": "User count", "query": lambda: User.objects.count()},
|
||||
{
|
||||
"name": "Email lookup by digest",
|
||||
"query": lambda: ConfirmedEmail.objects.filter(
|
||||
digest="5d41402abc4b2a76b9719d911017c592"
|
||||
).first(),
|
||||
},
|
||||
{
|
||||
"name": "Top 10 photos by access count",
|
||||
"query": lambda: list(Photo.objects.order_by("-access_count")[:10]),
|
||||
},
|
||||
]
|
||||
|
||||
for test in test_queries:
|
||||
start_time = time.time()
|
||||
try:
|
||||
test["query"]()
|
||||
end_time = time.time()
|
||||
duration = (end_time - start_time) * 1000
|
||||
|
||||
print(f" {test['name']}: {duration:.2f}ms")
|
||||
|
||||
if duration > 100: # 100ms threshold
|
||||
print(" ⚠️ WARNING: Query exceeds 100ms threshold")
|
||||
|
||||
except Exception as e:
|
||||
print(f" {test['name']}: ERROR - {e}")
|
||||
|
||||
# Check for N+1 queries
|
||||
query_count = len(connection.queries)
|
||||
if query_count > 10:
|
||||
print(
|
||||
f" ⚠️ WARNING: {query_count} database queries executed (potential N+1 problem)"
|
||||
)
|
||||
else:
|
||||
print(f" ✅ Database query count is reasonable ({query_count} queries)")
|
||||
|
||||
def test_cache_performance(self):
|
||||
"""Test caching effectiveness"""
|
||||
if not self.test_cache:
|
||||
print("\n=== Cache Performance Test ===")
|
||||
print(" ⏭️ Cache testing disabled")
|
||||
return
|
||||
|
||||
print("\n=== Cache Performance Test ===")
|
||||
|
||||
# Use an actual email address that exists in the system
|
||||
test_email = "dev@libravatar.org"
|
||||
email_hash = hashlib.md5(test_email.encode()).hexdigest()
|
||||
print(f" Testing with: {test_email}")
|
||||
|
||||
if self.remote_testing:
|
||||
first_duration, second_duration = self._test_remote_cache_performance(
|
||||
email_hash
|
||||
)
|
||||
else:
|
||||
first_duration, second_duration = self._test_local_cache_performance(
|
||||
email_hash
|
||||
)
|
||||
|
||||
print(f" First request: {first_duration:.2f}ms")
|
||||
print(f" Second request: {second_duration:.2f}ms")
|
||||
|
||||
improvement_ratio = (
|
||||
first_duration / second_duration if second_duration > 0 else 0
|
||||
)
|
||||
|
||||
# Analyze cache effectiveness based on headers AND timing
|
||||
cache_working = False
|
||||
cache_status = "unknown"
|
||||
|
||||
if self.remote_testing and hasattr(self, "cache_info"):
|
||||
# For remote testing, check actual cache headers
|
||||
first_cached = self.cache_info["first_request"]["is_cached"]
|
||||
second_cached = self.cache_info["second_request"]["is_cached"]
|
||||
|
||||
if not first_cached and second_cached:
|
||||
cache_status = "✅ Cache working correctly (miss → hit)"
|
||||
cache_working = True
|
||||
elif first_cached and second_cached:
|
||||
cache_status = "✅ Cache working (both requests cached)"
|
||||
cache_working = True
|
||||
elif not first_cached and not second_cached:
|
||||
cache_status = "⚠️ No cache hits detected"
|
||||
cache_working = False
|
||||
else:
|
||||
cache_status = "⚠️ Unexpected cache behavior"
|
||||
cache_working = False
|
||||
else:
|
||||
# For local testing, fall back to timing-based analysis
|
||||
if improvement_ratio >= 1.5:
|
||||
cache_status = "✅ Caching appears to be working (timing-based)"
|
||||
cache_working = True
|
||||
else:
|
||||
cache_status = (
|
||||
"⚠️ Caching may not be working as expected (timing-based)"
|
||||
)
|
||||
cache_working = False
|
||||
|
||||
print(f" {cache_status}")
|
||||
if improvement_ratio > 1:
|
||||
print(f" Performance improvement: {improvement_ratio:.1f}x faster")
|
||||
|
||||
self.results["cache_performance"] = {
|
||||
"first_request_ms": first_duration,
|
||||
"second_request_ms": second_duration,
|
||||
"improvement_ratio": improvement_ratio,
|
||||
"cache_working": cache_working,
|
||||
"cache_status": cache_status,
|
||||
"cache_headers": getattr(self, "cache_info", {}),
|
||||
}
|
||||
|
||||
def _test_remote_cache_performance(self, email_hash):
|
||||
"""Test cache performance against remote server"""
|
||||
import requests
|
||||
|
||||
url = f"{self.base_url}/avatar/{email_hash}"
|
||||
params = {"d": "identicon", "s": 80}
|
||||
|
||||
# First request (should be cache miss or fresh)
|
||||
start_time = time.time()
|
||||
response1 = requests.get(url, params=params, timeout=10)
|
||||
first_duration = (time.time() - start_time) * 1000
|
||||
|
||||
# Check first request headers
|
||||
first_cache_detail = response1.headers.get("x-cache-detail", "unknown")
|
||||
first_age = response1.headers.get("age", "0")
|
||||
first_cache_control = response1.headers.get("cache-control", "none")
|
||||
|
||||
print(" First request headers:")
|
||||
print(f" x-cache-detail: {first_cache_detail}")
|
||||
print(f" age: {first_age}")
|
||||
print(f" cache-control: {first_cache_control}")
|
||||
|
||||
# Small delay to ensure any processing is complete
|
||||
time.sleep(0.1)
|
||||
|
||||
# Second request (should be cache hit)
|
||||
start_time = time.time()
|
||||
response2 = requests.get(url, params=params, timeout=10)
|
||||
second_duration = (time.time() - start_time) * 1000
|
||||
|
||||
# Check second request headers
|
||||
second_cache_detail = response2.headers.get("x-cache-detail", "unknown")
|
||||
second_age = response2.headers.get("age", "0")
|
||||
second_cache_control = response2.headers.get("cache-control", "none")
|
||||
|
||||
print(" Second request headers:")
|
||||
print(f" x-cache-detail: {second_cache_detail}")
|
||||
print(f" age: {second_age}")
|
||||
print(f" cache-control: {second_cache_control}")
|
||||
|
||||
# Determine if we actually got cache hits
|
||||
first_is_cached = (
|
||||
"cache hit" in first_cache_detail.lower() or int(first_age) > 0
|
||||
)
|
||||
second_is_cached = (
|
||||
"cache hit" in second_cache_detail.lower() or int(second_age) > 0
|
||||
)
|
||||
|
||||
print(" Cache analysis:")
|
||||
print(
|
||||
f" First request: {'Cache HIT' if first_is_cached else 'Cache MISS'}"
|
||||
)
|
||||
print(
|
||||
f" Second request: {'Cache HIT' if second_is_cached else 'Cache MISS'}"
|
||||
)
|
||||
|
||||
# Store cache information for analysis
|
||||
self.cache_info = {
|
||||
"first_request": {
|
||||
"cache_detail": first_cache_detail,
|
||||
"age": first_age,
|
||||
"is_cached": first_is_cached,
|
||||
},
|
||||
"second_request": {
|
||||
"cache_detail": second_cache_detail,
|
||||
"age": second_age,
|
||||
"is_cached": second_is_cached,
|
||||
},
|
||||
}
|
||||
|
||||
return first_duration, second_duration
|
||||
|
||||
def _test_local_cache_performance(self, email_hash):
|
||||
"""Test cache performance locally"""
|
||||
url = f"/avatar/{email_hash}"
|
||||
params = {"d": "identicon", "s": 80}
|
||||
|
||||
# First request (cache miss)
|
||||
start_time = time.time()
|
||||
self.client.get(url, params)
|
||||
first_duration = (time.time() - start_time) * 1000
|
||||
|
||||
# Second request (should be cache hit)
|
||||
start_time = time.time()
|
||||
self.client.get(url, params)
|
||||
second_duration = (time.time() - start_time) * 1000
|
||||
|
||||
return first_duration, second_duration
|
||||
|
||||
def run_all_tests(self):
|
||||
"""Run all performance tests"""
|
||||
print("Starting Libravatar Performance Tests")
|
||||
print("=" * 50)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Only setup test data for local testing
|
||||
if not self.remote_testing:
|
||||
self.setup_test_data()
|
||||
|
||||
# Run tests based on mode
|
||||
if self.remote_testing:
|
||||
print("🌐 Running remote server tests...")
|
||||
self.test_remote_avatar_performance()
|
||||
else:
|
||||
print("🏠 Running local tests...")
|
||||
self.test_avatar_generation_performance()
|
||||
self.test_database_performance()
|
||||
|
||||
# Always test concurrent load
|
||||
self.test_concurrent_load()
|
||||
|
||||
# Test cache performance if enabled
|
||||
self.test_cache_performance()
|
||||
|
||||
end_time = time.time()
|
||||
total_duration = end_time - start_time
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
print(f"Performance tests completed in {total_duration:.2f}s")
|
||||
|
||||
# Overall assessment
|
||||
self.assess_overall_performance()
|
||||
|
||||
return self.results
|
||||
|
||||
except Exception as e:
|
||||
print(f"Performance test failed: {e}")
|
||||
return None
|
||||
|
||||
def test_remote_avatar_performance(self):
|
||||
"""Test avatar generation performance on remote server"""
|
||||
print("\n=== Remote Avatar Performance Test ===")
|
||||
|
||||
import requests
|
||||
|
||||
# Test different avatar types and sizes
|
||||
test_cases = [
|
||||
{"default": "identicon", "size": 80},
|
||||
{"default": "monsterid", "size": 80},
|
||||
{"default": "robohash", "size": 80},
|
||||
{"default": "identicon", "size": 256},
|
||||
{"default": "monsterid", "size": 256},
|
||||
]
|
||||
|
||||
results = []
|
||||
|
||||
for case in test_cases:
|
||||
# Generate test hash
|
||||
test_email = "perftest@example.com"
|
||||
email_hash = hashlib.md5(test_email.encode()).hexdigest()
|
||||
|
||||
# Build URL
|
||||
url = f"{self.base_url}/avatar/{email_hash}"
|
||||
params = {"d": case["default"], "s": case["size"]}
|
||||
|
||||
# Time the request
|
||||
start_time = time.time()
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
end_time = time.time()
|
||||
|
||||
duration = (end_time - start_time) * 1000 # Convert to ms
|
||||
|
||||
results.append(
|
||||
{
|
||||
"test": f"{case['default']}_{case['size']}px",
|
||||
"duration_ms": duration,
|
||||
"status_code": response.status_code,
|
||||
"content_length": (
|
||||
len(response.content) if response.content else 0
|
||||
),
|
||||
"success": response.status_code == 200,
|
||||
}
|
||||
)
|
||||
|
||||
status = "✅" if response.status_code == 200 else "❌"
|
||||
print(
|
||||
f" {case['default']} ({case['size']}px): {duration:.2f}ms {status}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(f" {case['default']} ({case['size']}px): ❌ Failed - {e}")
|
||||
results.append(
|
||||
{
|
||||
"test": f"{case['default']}_{case['size']}px",
|
||||
"duration_ms": 0,
|
||||
"status_code": 0,
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
|
||||
# Calculate statistics for successful requests
|
||||
successful_results = [r for r in results if r["success"]]
|
||||
if successful_results:
|
||||
durations = [r["duration_ms"] for r in successful_results]
|
||||
avg_duration = statistics.mean(durations)
|
||||
max_duration = max(durations)
|
||||
|
||||
print(f"\n Average: {avg_duration:.2f}ms")
|
||||
print(f" Maximum: {max_duration:.2f}ms")
|
||||
print(f" Success rate: {len(successful_results)}/{len(results)}")
|
||||
|
||||
# Performance thresholds for remote testing
|
||||
if avg_duration > 2000: # 2 seconds
|
||||
print(" ⚠️ WARNING: Average response time exceeds 2s")
|
||||
elif avg_duration > 1000: # 1 second
|
||||
print(" ⚠️ CAUTION: Average response time exceeds 1s")
|
||||
else:
|
||||
print(" ✅ Remote avatar performance is good")
|
||||
else:
|
||||
avg_duration = 0
|
||||
max_duration = 0
|
||||
print(" ❌ All remote requests failed")
|
||||
|
||||
self.results["avatar_generation"] = {
|
||||
"average_ms": avg_duration,
|
||||
"maximum_ms": max_duration,
|
||||
"results": results,
|
||||
"success_rate": len(successful_results) / len(results) if results else 0,
|
||||
}
|
||||
|
||||
def assess_overall_performance(self):
|
||||
"""Provide overall performance assessment"""
|
||||
print("\n=== OVERALL PERFORMANCE ASSESSMENT ===")
|
||||
|
||||
warnings = []
|
||||
|
||||
# Check avatar generation
|
||||
if "avatar_generation" in self.results:
|
||||
avg_gen = self.results["avatar_generation"]["average_ms"]
|
||||
if avg_gen > 1000:
|
||||
warnings.append(f"Avatar generation is slow ({avg_gen:.0f}ms average)")
|
||||
|
||||
# Check concurrent load
|
||||
if "concurrent_load" in self.results:
|
||||
failed = self.results["concurrent_load"]["failed_requests"]
|
||||
if failed > 0:
|
||||
warnings.append(f"{failed} requests failed under concurrent load")
|
||||
|
||||
# Check cache performance
|
||||
if "cache_performance" in self.results:
|
||||
cache_working = self.results["cache_performance"].get(
|
||||
"cache_working", False
|
||||
)
|
||||
if not cache_working:
|
||||
warnings.append("Caching may not be working effectively")
|
||||
|
||||
if warnings:
|
||||
print("⚠️ Performance Issues Found:")
|
||||
for warning in warnings:
|
||||
print(f" - {warning}") # noqa: E221
|
||||
print("\nRecommendations:")
|
||||
print(" - Review database indexes and query optimization")
|
||||
print(" - Check caching configuration")
|
||||
print(" - Consider async processing for heavy operations")
|
||||
else:
|
||||
print("✅ Overall performance is good!")
|
||||
print(" - Avatar generation is responsive")
|
||||
print(" - Application handles concurrent load well")
|
||||
print(" - Caching is working effectively")
|
||||
|
||||
# Store warnings in results for main function to check
|
||||
self.results["warnings"] = warnings
|
||||
return len(warnings) > 0
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Run Libravatar performance tests")
|
||||
parser.add_argument(
|
||||
"--base-url",
|
||||
default="http://localhost:8000",
|
||||
help="Base URL for testing (default: http://localhost:8000)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--concurrent-users",
|
||||
type=int,
|
||||
default=10,
|
||||
help="Number of concurrent users to simulate (default: 10)",
|
||||
)
|
||||
parser.add_argument("--output", help="Output file for results (JSON)")
|
||||
parser.add_argument(
|
||||
"--no-cache-test",
|
||||
action="store_true",
|
||||
help="Disable cache performance testing (useful for local development)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--remote",
|
||||
action="store_true",
|
||||
help="Force remote testing mode (auto-detected for non-localhost URLs)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Determine if we should test cache performance
|
||||
test_cache = not args.no_cache_test
|
||||
|
||||
# Determine if we're doing remote testing
|
||||
remote_testing = args.remote or not args.base_url.startswith("http://localhost")
|
||||
|
||||
runner = PerformanceTestRunner(
|
||||
base_url=args.base_url,
|
||||
concurrent_users=args.concurrent_users,
|
||||
test_cache=test_cache,
|
||||
remote_testing=remote_testing,
|
||||
)
|
||||
|
||||
results = runner.run_all_tests()
|
||||
|
||||
if args.output and results:
|
||||
import json
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(results, f, indent=2)
|
||||
print(f"\nResults saved to {args.output}")
|
||||
|
||||
# Exit with error code if there were performance issues
|
||||
if results and "warnings" in results and len(results["warnings"]) > 0:
|
||||
sys.exit(1)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user