Enhance the StatsView

Oliver Falk
2025-09-24 17:44:41 +02:00
parent 9d3d5fe5a1
commit 9caee65b8e
6 changed files with 582 additions and 19 deletions
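For reference, the /stats/ endpoint this commit extends returns a single JSON document; besides the existing counters it now also carries top_viewed_avatars, top_queried_emails, top_queried_openids, photo_format_distribution, user_activity, bluesky_handles, photo_size_stats and potential_duplicate_photos. A minimal sketch of fetching it with the standard library - the hostname is a placeholder, not something defined by this commit:

import json
import urllib.request

# Fetch the stats payload and show a few of the newly added sections.
with urllib.request.urlopen("https://avatars.example.invalid/stats/") as resp:
    stats = json.load(resp)

for key in ("top_viewed_avatars", "user_activity", "photo_size_stats"):
    print(key, "->", stats.get(key))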

View File

@@ -1,5 +1,5 @@
 image:
-  name: quay.io/rhn_support_ofalk/fedora36-python3
+  name: git.linux-kernel.at:5050/oliver/fedora42-python3:latest
   entrypoint:
     - "/bin/sh"
     - "-c"

View File

@@ -7,7 +7,6 @@ import contextlib
 # pylint: disable=too-many-lines
 import os
-import json
 import django
 from django.urls import reverse
 from django.test import TestCase
@@ -68,22 +67,6 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
# msg_prefix="Why does an invalid hash not redirect to deadbeef?", # msg_prefix="Why does an invalid hash not redirect to deadbeef?",
# ) # )
def test_stats(self):
"""
Test incorrect digest
"""
response = self.client.get("/stats/", follow=True)
self.assertEqual(response.status_code, 200, "unable to fetch stats!")
j = json.loads(response.content)
self.assertEqual(j["users"], 1, "user count incorrect")
self.assertEqual(j["mails"], 0, "mails count incorrect")
self.assertEqual(j["openids"], 0, "openids count incorrect")
self.assertEqual(j["unconfirmed_mails"], 0, "unconfirmed mails count incorrect")
self.assertEqual(
j["unconfirmed_openids"], 0, "unconfirmed openids count incorrect"
)
self.assertEqual(j["avatars"], 0, "avatars count incorrect")
def test_logout(self): def test_logout(self):
""" """
Test if logout works correctly Test if logout works correctly

ivatar/test_views_stats.py - new file (374 lines added)
View File

@@ -0,0 +1,374 @@
# -*- coding: utf-8 -*-
"""
Test our StatsView in ivatar.views
"""

import json
import os
import django
from django.test import TestCase
from django.test import Client
from django.contrib.auth.models import User

from ivatar.utils import random_string, random_ip_address

os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()


class StatsTester(TestCase):
    """
    Test class for StatsView
    """

    client = Client()
    user = None
    username = random_string()
    password = random_string()

    def login(self):
        """
        Login as user
        """
        self.client.login(username=self.username, password=self.password)

    def setUp(self):
        """
        Prepare for tests.
        - Create user
        """
        self.user = User.objects.create_user(
            username=self.username,
            password=self.password,
        )

    def test_stats_basic(self):
        """
        Test basic stats functionality
        """
        response = self.client.get("/stats/", follow=True)
        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
        j = json.loads(response.content)
        self.assertEqual(j["users"], 1, "user count incorrect")
        self.assertEqual(j["mails"], 0, "mails count incorrect")
        self.assertEqual(j["openids"], 0, "openids count incorrect")
        self.assertEqual(j["unconfirmed_mails"], 0, "unconfirmed mails count incorrect")
        self.assertEqual(
            j["unconfirmed_openids"], 0, "unconfirmed openids count incorrect"
        )
        self.assertEqual(j["avatars"], 0, "avatars count incorrect")
    def test_stats_comprehensive(self):
        """
        Test comprehensive stats with actual data
        """
        from ivatar.ivataraccount.models import (
            ConfirmedEmail,
            ConfirmedOpenId,
            Photo,
            UnconfirmedEmail,
            UnconfirmedOpenId,
        )

        # Create test data with random values
        email1 = ConfirmedEmail.objects.create(
            user=self.user,
            email=f"{random_string()}@{random_string()}.{random_string(2)}",
            ip_address=random_ip_address(),
        )
        email1.access_count = 100
        email1.save()

        email2 = ConfirmedEmail.objects.create(
            user=self.user,
            email=f"{random_string()}@{random_string()}.{random_string(2)}",
            ip_address=random_ip_address(),
        )
        email2.access_count = 50
        email2.save()

        openid1 = ConfirmedOpenId.objects.create(
            user=self.user,
            openid=f"http://{random_string()}.{random_string()}.org/",
            ip_address=random_ip_address(),
        )
        openid1.access_count = 75
        openid1.save()

        # Create photos with valid image data (minimal PNG)
        # PNG header + minimal data
        png_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
        photo1 = Photo.objects.create(
            user=self.user, data=png_data, format="png", ip_address=random_ip_address()
        )
        photo1.access_count = 200
        photo1.save()

        photo2 = Photo.objects.create(
            user=self.user,
            data=png_data,  # Same data for testing
            format="png",  # Same format for testing
            ip_address=random_ip_address(),
        )
        photo2.access_count = 150
        photo2.save()

        # Associate photos with emails/openids
        email1.photo = photo1
        email1.save()
        email2.photo = photo2
        email2.save()
        openid1.photo = photo1
        openid1.save()

        # Create unconfirmed entries
        UnconfirmedEmail.objects.create(
            user=self.user,
            email=f"{random_string()}@{random_string()}.{random_string(2)}",
            ip_address=random_ip_address(),
        )
        UnconfirmedOpenId.objects.create(
            user=self.user,
            openid=f"http://{random_string()}.{random_string()}.org/",
            ip_address=random_ip_address(),
        )

        # Test the stats endpoint
        response = self.client.get("/stats/")
        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
        j = json.loads(response.content)

        # Test basic counts
        self.assertEqual(j["users"], 1, "user count incorrect")
        self.assertEqual(j["mails"], 2, "mails count incorrect")
        self.assertEqual(j["openids"], 1, "openids count incorrect")
        self.assertEqual(j["unconfirmed_mails"], 1, "unconfirmed mails count incorrect")
        self.assertEqual(
            j["unconfirmed_openids"], 1, "unconfirmed openids count incorrect"
        )
        self.assertEqual(j["avatars"], 2, "avatars count incorrect")

        # Test top viewed avatars
        self.assertIn("top_viewed_avatars", j, "top_viewed_avatars missing")
        self.assertEqual(
            len(j["top_viewed_avatars"]), 2, "should have 2 top viewed avatars"
        )
        # The top viewed avatar should be the one with the highest associated
        # email/openid access count
        self.assertEqual(
            j["top_viewed_avatars"][0]["access_count"],
            100,
            "top avatar access count incorrect",
        )

        # Test top queried emails
        self.assertIn("top_queried_emails", j, "top_queried_emails missing")
        self.assertEqual(
            len(j["top_queried_emails"]), 2, "should have 2 top queried emails"
        )
        self.assertEqual(
            j["top_queried_emails"][0]["access_count"],
            100,
            "top email access count incorrect",
        )

        # Test top queried openids
        self.assertIn("top_queried_openids", j, "top_queried_openids missing")
        self.assertEqual(
            len(j["top_queried_openids"]), 1, "should have 1 top queried openid"
        )
        self.assertEqual(
            j["top_queried_openids"][0]["access_count"],
            75,
            "top openid access count incorrect",
        )

        # Test photo format distribution
        self.assertIn(
            "photo_format_distribution", j, "photo_format_distribution missing"
        )
        formats = {
            item["format"]: item["count"] for item in j["photo_format_distribution"]
        }
        self.assertEqual(formats["png"], 2, "png format count incorrect")

        # Test user activity stats
        self.assertIn("user_activity", j, "user_activity missing")
        self.assertEqual(
            j["user_activity"]["users_with_multiple_photos"],
            1,
            "users with multiple photos incorrect",
        )
        self.assertEqual(
            j["user_activity"]["users_with_both_email_and_openid"],
            1,
            "users with both email and openid incorrect",
        )
        self.assertEqual(
            j["user_activity"]["average_photos_per_user"],
            2.0,
            "average photos per user incorrect",
        )

        # Test Bluesky handles (should be empty)
        self.assertIn("bluesky_handles", j, "bluesky_handles missing")
        self.assertEqual(
            j["bluesky_handles"]["total_bluesky_handles"],
            0,
            "total bluesky handles should be 0",
        )

        # Test photo size stats
        self.assertIn("photo_size_stats", j, "photo_size_stats missing")
        self.assertGreater(
            j["photo_size_stats"]["average_size_bytes"],
            0,
            "average photo size should be > 0",
        )
        self.assertEqual(
            j["photo_size_stats"]["total_photos_analyzed"],
            2,
            "total photos analyzed incorrect",
        )

        # Test potential duplicate photos
        self.assertIn(
            "potential_duplicate_photos", j, "potential_duplicate_photos missing"
        )
        self.assertEqual(
            j["potential_duplicate_photos"]["potential_duplicate_groups"],
            1,
            "should have 1 duplicate group (same PNG data)",
        )
    def test_stats_edge_cases(self):
        """
        Test edge cases for stats
        """
        # Test with no data
        response = self.client.get("/stats/")
        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
        j = json.loads(response.content)

        # All lists should be empty
        self.assertEqual(
            len(j["top_viewed_avatars"]), 0, "top_viewed_avatars should be empty"
        )
        self.assertEqual(
            len(j["top_queried_emails"]), 0, "top_queried_emails should be empty"
        )
        self.assertEqual(
            len(j["top_queried_openids"]), 0, "top_queried_openids should be empty"
        )
        self.assertEqual(
            len(j["photo_format_distribution"]),
            0,
            "photo_format_distribution should be empty",
        )
        self.assertEqual(
            j["bluesky_handles"]["total_bluesky_handles"],
            0,
            "bluesky_handles should be 0",
        )
        self.assertEqual(
            j["photo_size_stats"]["total_photos_analyzed"],
            0,
            "photo_size_stats should be 0",
        )
        self.assertEqual(
            j["potential_duplicate_photos"]["potential_duplicate_groups"],
            0,
            "potential_duplicate_photos should be 0",
        )
    def test_stats_with_bluesky_handles(self):
        """
        Test stats with Bluesky handles
        """
        from ivatar.ivataraccount.models import ConfirmedEmail, ConfirmedOpenId

        # Create email with Bluesky handle
        email = ConfirmedEmail.objects.create(
            user=self.user,
            email=f"{random_string()}@{random_string()}.{random_string(2)}",
            ip_address=random_ip_address(),
        )
        email.bluesky_handle = f"{random_string()}.bsky.social"
        email.access_count = 100
        email.save()

        # Create OpenID with Bluesky handle
        openid = ConfirmedOpenId.objects.create(
            user=self.user,
            openid=f"http://{random_string()}.{random_string()}.org/",
            ip_address=random_ip_address(),
        )
        openid.bluesky_handle = f"{random_string()}.bsky.social"
        openid.access_count = 50
        openid.save()

        response = self.client.get("/stats/")
        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
        j = json.loads(response.content)

        # Test Bluesky handles stats
        self.assertEqual(
            j["bluesky_handles"]["total_bluesky_handles"],
            2,
            "total bluesky handles incorrect",
        )
        self.assertEqual(
            j["bluesky_handles"]["bluesky_emails"], 1, "bluesky emails count incorrect"
        )
        self.assertEqual(
            j["bluesky_handles"]["bluesky_openids"],
            1,
            "bluesky openids count incorrect",
        )
    def test_stats_photo_duplicates(self):
        """
        Test potential duplicate photos detection
        """
        from ivatar.ivataraccount.models import Photo

        # Create photos with the same format and size (potential duplicates)
        # PNG header + minimal data
        png_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
        Photo.objects.create(
            user=self.user, data=png_data, format="png", ip_address=random_ip_address()
        )
        Photo.objects.create(
            user=self.user,
            data=png_data,  # Same data, hence same size
            format="png",  # Same format
            ip_address=random_ip_address(),
        )
        Photo.objects.create(
            user=self.user,
            data=png_data,  # Same data again, still the same size
            format="png",  # Same format, so all three land in one group
            ip_address=random_ip_address(),
        )

        response = self.client.get("/stats/")
        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
        j = json.loads(response.content)

        # Should detect potential duplicates
        self.assertEqual(
            j["potential_duplicate_photos"]["potential_duplicate_groups"],
            1,
            "should have 1 duplicate group",
        )
        self.assertEqual(
            j["potential_duplicate_photos"]["total_potential_duplicate_photos"],
            3,
            "should have 3 potential duplicate photos",
        )
        self.assertEqual(
            len(j["potential_duplicate_photos"]["potential_duplicate_groups_detail"]),
            1,
            "should have 1 duplicate group detail",
        )
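A note on running the new module in isolation: the usual way is Django's management command (./manage.py test ivatar.test_views_stats). A minimal programmatic sketch using Django's DiscoverRunner, assuming it is started from the project root so that ivatar.settings resolves:

import os
import django
from django.test.runner import DiscoverRunner

# Configure settings before touching the app registry.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ivatar.settings")
django.setup()

# Run only the StatsView tests; run_tests() returns the number of failures.
failures = DiscoverRunner(verbosity=2).run_tests(["ivatar.test_views_stats"])
raise SystemExit(bool(failures))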

View File

@@ -111,6 +111,13 @@ def random_string(length=10):
     )


 def random_ip_address():
     """
     Return a random IP address (IPv4)
     """
     return f"{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}"


 def openid_variations(openid):
     """
     Return the various OpenID variations, ALWAYS in the same order:
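The new random_ip_address() helper above simply joins four random octets between 1 and 254; the StatsView tests use it to fill their ip_address fields. A tiny usage sketch (the output is random; the address in the comment is only illustrative):

from ivatar.utils import random_ip_address

# Yields a dotted-quad string such as "203.114.7.42"; octets stay in 1-254,
# so .0 and .255 never occur.
print(random_ip_address())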

View File

@@ -560,4 +560,202 @@ class StatsView(TemplateView, JsonResponse):
"avatars": Photo.objects.count(), # pylint: disable=no-member "avatars": Photo.objects.count(), # pylint: disable=no-member
} }
# Top 10 viewed avatars
top_photos = Photo.objects.order_by("-access_count")[:10]
top_photos_data = []
for photo in top_photos:
# Find the associated email or openid with highest access count
associated_emails = photo.emails.all().order_by("-access_count")
associated_openids = photo.openids.all().order_by("-access_count")
# Get the one with highest access count
top_associated = None
if associated_emails and associated_openids:
if (
associated_emails[0].access_count
>= associated_openids[0].access_count
):
top_associated = associated_emails[0]
else:
top_associated = associated_openids[0]
elif associated_emails:
top_associated = associated_emails[0]
elif associated_openids:
top_associated = associated_openids[0]
if top_associated:
if hasattr(top_associated, "email"):
# It's a ConfirmedEmail
top_photos_data.append(
{
"access_count": top_associated.access_count,
"digest_sha256": top_associated.digest_sha256,
}
)
else:
# It's a ConfirmedOpenId
top_photos_data.append(
{
"access_count": top_associated.access_count,
"digest_sha256": top_associated.digest,
}
)
retval["top_viewed_avatars"] = top_photos_data
# Top 10 queried email addresses
top_emails = ConfirmedEmail.objects.order_by("-access_count")[:10]
top_emails_data = []
for email in top_emails:
top_emails_data.append(
{
"access_count": email.access_count,
"digest_sha256": email.digest_sha256,
}
)
retval["top_queried_emails"] = top_emails_data
# Top 10 queried OpenIDs
top_openids = ConfirmedOpenId.objects.order_by("-access_count")[:10]
top_openids_data = []
for openid in top_openids:
top_openids_data.append(
{
"access_count": openid.access_count,
"digest_sha256": openid.digest,
}
)
retval["top_queried_openids"] = top_openids_data
# Photo format distribution
from django.db.models import Count
format_distribution = (
Photo.objects.values("format")
.annotate(count=Count("format"))
.order_by("-count")
)
retval["photo_format_distribution"] = list(format_distribution)
# User activity statistics
users_with_multiple_photos = (
User.objects.annotate(photo_count=Count("photo"))
.filter(photo_count__gt=1)
.count()
)
users_with_both_email_and_openid = (
User.objects.filter(
confirmedemail__isnull=False, confirmedopenid__isnull=False
)
.distinct()
.count()
)
# Calculate average photos per user
total_photos = Photo.objects.count()
total_users = User.objects.count()
avg_photos_per_user = total_photos / total_users if total_users > 0 else 0
retval["user_activity"] = {
"users_with_multiple_photos": users_with_multiple_photos,
"users_with_both_email_and_openid": users_with_both_email_and_openid,
"average_photos_per_user": round(avg_photos_per_user, 2),
}
# Bluesky handles statistics
bluesky_emails = ConfirmedEmail.objects.filter(
bluesky_handle__isnull=False
).count()
bluesky_openids = ConfirmedOpenId.objects.filter(
bluesky_handle__isnull=False
).count()
total_bluesky_handles = bluesky_emails + bluesky_openids
# Top Bluesky handles by access count
retval["bluesky_handles"] = {
"total_bluesky_handles": total_bluesky_handles,
"bluesky_emails": bluesky_emails,
"bluesky_openids": bluesky_openids,
}
# Average photo size statistics using raw SQL
from django.db import connection
with connection.cursor() as cursor:
# SQL to calculate average photo size
cursor.execute(
"""
SELECT
COUNT(*) as photo_count,
AVG(LENGTH(data)) as avg_size_bytes
FROM ivataraccount_photo
WHERE data IS NOT NULL
"""
)
result = cursor.fetchone()
if result and result[0] > 0:
photo_count, avg_size_bytes = result
# Convert to float in case database returns string
avg_size_bytes = float(avg_size_bytes) if avg_size_bytes else 0
avg_size_kb = round(avg_size_bytes / 1024, 2) if avg_size_bytes else 0
avg_size_mb = (
round(avg_size_bytes / (1024 * 1024), 2) if avg_size_bytes else 0
)
retval["photo_size_stats"] = {
"average_size_bytes": round(avg_size_bytes, 2)
if avg_size_bytes
else 0,
"average_size_kb": avg_size_kb,
"average_size_mb": avg_size_mb,
"total_photos_analyzed": photo_count,
}
else:
retval["photo_size_stats"] = {
"average_size_bytes": 0,
"average_size_kb": 0,
"average_size_mb": 0,
"total_photos_analyzed": 0,
}
# For potential duplicate photos, we'll check for photos with the same format and size
# Note: This is not definitive - different images can have the same format and size
# but it's a good indicator of potential duplicates that might warrant investigation
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT
format,
LENGTH(data) as file_size,
COUNT(*) as count
FROM ivataraccount_photo
WHERE data IS NOT NULL
GROUP BY format, LENGTH(data)
HAVING COUNT(*) > 1
ORDER BY count DESC
LIMIT 10
"""
)
duplicate_groups = cursor.fetchall()
total_potential_duplicate_photos = sum(
group[2] for group in duplicate_groups
)
# Convert to list of dictionaries for JSON serialization
duplicate_groups_detail = [
{"format": group[0], "file_size": group[1], "count": group[2]}
for group in duplicate_groups
]
retval["potential_duplicate_photos"] = {
"potential_duplicate_groups": len(duplicate_groups),
"total_potential_duplicate_photos": total_potential_duplicate_photos,
"potential_duplicate_groups_detail": duplicate_groups_detail,
"note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
}
return JsonResponse(retval) return JsonResponse(retval)
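Taken together, the enhanced StatsView responds with a payload roughly shaped like the sketch below. This is a hand-written illustration derived from the code above, not captured output; the numbers and digests are placeholders and depend entirely on the data in the database:

example_stats = {
    "users": 1,
    "mails": 2,
    "openids": 1,
    "unconfirmed_mails": 1,
    "unconfirmed_openids": 1,
    "avatars": 2,
    "top_viewed_avatars": [{"access_count": 100, "digest_sha256": "<hex digest>"}],
    "top_queried_emails": [{"access_count": 100, "digest_sha256": "<hex digest>"}],
    "top_queried_openids": [{"access_count": 75, "digest_sha256": "<hex digest>"}],
    "photo_format_distribution": [{"format": "png", "count": 2}],
    "user_activity": {
        "users_with_multiple_photos": 1,
        "users_with_both_email_and_openid": 1,
        "average_photos_per_user": 2.0,
    },
    "bluesky_handles": {
        "total_bluesky_handles": 0,
        "bluesky_emails": 0,
        "bluesky_openids": 0,
    },
    "photo_size_stats": {
        "average_size_bytes": 70.0,
        "average_size_kb": 0.07,
        "average_size_mb": 0.0,
        "total_photos_analyzed": 2,
    },
    "potential_duplicate_photos": {
        "potential_duplicate_groups": 1,
        "total_potential_duplicate_photos": 2,
        "potential_duplicate_groups_detail": [
            {"format": "png", "file_size": 70, "count": 2}
        ],
        "note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
    },
}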

View File

@@ -15,9 +15,10 @@ fabric
 flake8-respect-noqa
 git+https://github.com/daboth/pagan.git
 git+https://github.com/ercpe/pydenticon5.git
-git+https://github.com/flavono123/identicon.git
 git+https://github.com/necaris/python3-openid.git
 git+https://github.com/ofalk/django-openid-auth
+#git+https://github.com/flavono123/identicon.git
+git+https://github.com/ofalk/identicon.git
 git+https://github.com/ofalk/monsterid.git
 git+https://github.com/ofalk/Robohash.git@devel
 notsetuptools