diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 221e37a..29fb90e 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,5 +1,5 @@
 image:
-  name: quay.io/rhn_support_ofalk/fedora36-python3
+  name: git.linux-kernel.at:5050/oliver/fedora42-python3:latest
   entrypoint:
     - "/bin/sh"
     - "-c"
diff --git a/ivatar/test_views.py b/ivatar/test_views.py
index 582539b..2049858 100644
--- a/ivatar/test_views.py
+++ b/ivatar/test_views.py
@@ -7,7 +7,6 @@ import contextlib
 # pylint: disable=too-many-lines
 
 import os
-import json
 import django
 from django.urls import reverse
 from django.test import TestCase
@@ -68,22 +67,6 @@ class Tester(TestCase):  # pylint: disable=too-many-public-methods
         #     msg_prefix="Why does an invalid hash not redirect to deadbeef?",
         # )
 
-    def test_stats(self):
-        """
-        Test incorrect digest
-        """
-        response = self.client.get("/stats/", follow=True)
-        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
-        j = json.loads(response.content)
-        self.assertEqual(j["users"], 1, "user count incorrect")
-        self.assertEqual(j["mails"], 0, "mails count incorrect")
-        self.assertEqual(j["openids"], 0, "openids count incorrect")
-        self.assertEqual(j["unconfirmed_mails"], 0, "unconfirmed mails count incorrect")
-        self.assertEqual(
-            j["unconfirmed_openids"], 0, "unconfirmed openids count incorrect"
-        )
-        self.assertEqual(j["avatars"], 0, "avatars count incorrect")
-
     def test_logout(self):
         """
         Test if logout works correctly
diff --git a/ivatar/test_views_stats.py b/ivatar/test_views_stats.py
new file mode 100644
index 0000000..309db08
--- /dev/null
+++ b/ivatar/test_views_stats.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+"""
+Test our StatsView in ivatar.views
+"""
+
+import json
+import os
+import django
+from django.test import TestCase
+from django.test import Client
+from django.contrib.auth.models import User
+from ivatar.utils import random_string, random_ip_address
+
+os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
+django.setup()
+
+
+class StatsTester(TestCase):
+    """
+    Test class for StatsView
+    """
+
+    client = Client()
+    user = None
+    username = random_string()
+    password = random_string()
+
+    def login(self):
+        """
+        Login as user
+        """
+        self.client.login(username=self.username, password=self.password)
+
+    def setUp(self):
+        """
+        Prepare for tests.
+        - Create user
+        """
+        self.user = User.objects.create_user(
+            username=self.username,
+            password=self.password,
+        )
+
+    def test_stats_basic(self):
+        """
+        Test basic stats functionality
+        """
+        response = self.client.get("/stats/", follow=True)
+        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
+        j = json.loads(response.content)
+        self.assertEqual(j["users"], 1, "user count incorrect")
+        self.assertEqual(j["mails"], 0, "mails count incorrect")
+        self.assertEqual(j["openids"], 0, "openids count incorrect")
+        self.assertEqual(j["unconfirmed_mails"], 0, "unconfirmed mails count incorrect")
+        self.assertEqual(
+            j["unconfirmed_openids"], 0, "unconfirmed openids count incorrect"
+        )
+        self.assertEqual(j["avatars"], 0, "avatars count incorrect")
+
+    def test_stats_comprehensive(self):
+        """
+        Test comprehensive stats with actual data
+        """
+        from ivatar.ivataraccount.models import (
+            ConfirmedEmail,
+            ConfirmedOpenId,
+            Photo,
+            UnconfirmedEmail,
+            UnconfirmedOpenId,
+        )
+
+        # Create test data with random values
+        email1 = ConfirmedEmail.objects.create(
+            user=self.user,
+            email=f"{random_string()}@{random_string()}.{random_string(2)}",
+            ip_address=random_ip_address(),
+        )
+        email1.access_count = 100
+        email1.save()
+
+        email2 = ConfirmedEmail.objects.create(
+            user=self.user,
+            email=f"{random_string()}@{random_string()}.{random_string(2)}",
+            ip_address=random_ip_address(),
+        )
+        email2.access_count = 50
+        email2.save()
+
+        openid1 = ConfirmedOpenId.objects.create(
+            user=self.user,
+            openid=f"http://{random_string()}.{random_string()}.org/",
+            ip_address=random_ip_address(),
+        )
+        openid1.access_count = 75
+        openid1.save()
+
+        # Create photos with valid image data (minimal PNG)
+        # PNG header + minimal data
+        png_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
+
+        photo1 = Photo.objects.create(
+            user=self.user, data=png_data, format="png", ip_address=random_ip_address()
+        )
+        photo1.access_count = 200
+        photo1.save()
+
+        photo2 = Photo.objects.create(
+            user=self.user,
+            data=png_data,  # Same data for testing
+            format="png",  # Same format for testing
+            ip_address=random_ip_address(),
+        )
+        photo2.access_count = 150
+        photo2.save()
+
+        # Associate photos with emails/openids
+        email1.photo = photo1
+        email1.save()
+        email2.photo = photo2
+        email2.save()
+        openid1.photo = photo1
+        openid1.save()
+
+        # Create unconfirmed entries
+        UnconfirmedEmail.objects.create(
+            user=self.user,
+            email=f"{random_string()}@{random_string()}.{random_string(2)}",
+            ip_address=random_ip_address(),
+        )
+
+        UnconfirmedOpenId.objects.create(
+            user=self.user,
+            openid=f"http://{random_string()}.{random_string()}.org/",
+            ip_address=random_ip_address(),
+        )
+
+        # Test the stats endpoint
+        response = self.client.get("/stats/")
+        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
+        j = json.loads(response.content)
+
+        # Test basic counts
+        self.assertEqual(j["users"], 1, "user count incorrect")
+        self.assertEqual(j["mails"], 2, "mails count incorrect")
+        self.assertEqual(j["openids"], 1, "openids count incorrect")
+        self.assertEqual(j["unconfirmed_mails"], 1, "unconfirmed mails count incorrect")
+        self.assertEqual(
+            j["unconfirmed_openids"], 1, "unconfirmed openids count incorrect"
+        )
+        self.assertEqual(j["avatars"], 2, "avatars count incorrect")
+
+        # Test top viewed avatars
+        self.assertIn("top_viewed_avatars", j, "top_viewed_avatars missing")
+        self.assertEqual(
+            len(j["top_viewed_avatars"]), 2, "should have 2 top viewed avatars"
+        )
+        # The top viewed avatar should be the one with highest associated email/openid access count
+        self.assertEqual(
+            j["top_viewed_avatars"][0]["access_count"],
+            100,
+            "top avatar access count incorrect",
+        )
+
+        # Test top queried emails
+        self.assertIn("top_queried_emails", j, "top_queried_emails missing")
+        self.assertEqual(
+            len(j["top_queried_emails"]), 2, "should have 2 top queried emails"
+        )
+        self.assertEqual(
+            j["top_queried_emails"][0]["access_count"],
+            100,
+            "top email access count incorrect",
+        )
+
+        # Test top queried openids
+        self.assertIn("top_queried_openids", j, "top_queried_openids missing")
+        self.assertEqual(
+            len(j["top_queried_openids"]), 1, "should have 1 top queried openid"
+        )
+        self.assertEqual(
+            j["top_queried_openids"][0]["access_count"],
+            75,
+            "top openid access count incorrect",
+        )
+
+        # Test photo format distribution
+        self.assertIn(
+            "photo_format_distribution", j, "photo_format_distribution missing"
+        )
+        formats = {
+            item["format"]: item["count"] for item in j["photo_format_distribution"]
+        }
+        self.assertEqual(formats["png"], 2, "png format count incorrect")
+
+        # Test user activity stats
+        self.assertIn("user_activity", j, "user_activity missing")
+        self.assertEqual(
+            j["user_activity"]["users_with_multiple_photos"],
+            1,
+            "users with multiple photos incorrect",
+        )
+        self.assertEqual(
+            j["user_activity"]["users_with_both_email_and_openid"],
+            1,
+            "users with both email and openid incorrect",
+        )
+        self.assertEqual(
+            j["user_activity"]["average_photos_per_user"],
+            2.0,
+            "average photos per user incorrect",
+        )
+
+        # Test Bluesky handles (should be empty)
+        self.assertIn("bluesky_handles", j, "bluesky_handles missing")
+        self.assertEqual(
+            j["bluesky_handles"]["total_bluesky_handles"],
+            0,
+            "total bluesky handles should be 0",
+        )
+
+        # Test photo size stats
+        self.assertIn("photo_size_stats", j, "photo_size_stats missing")
+        self.assertGreater(
+            j["photo_size_stats"]["average_size_bytes"],
+            0,
+            "average photo size should be > 0",
+        )
+        self.assertEqual(
+            j["photo_size_stats"]["total_photos_analyzed"],
+            2,
+            "total photos analyzed incorrect",
+        )
+
+        # Test potential duplicate photos
+        self.assertIn(
+            "potential_duplicate_photos", j, "potential_duplicate_photos missing"
+        )
+        self.assertEqual(
+            j["potential_duplicate_photos"]["potential_duplicate_groups"],
+            1,
+            "should have 1 duplicate group (same PNG data)",
+        )
+
+    def test_stats_edge_cases(self):
+        """
+        Test edge cases for stats
+        """
+        # Test with no data
+        response = self.client.get("/stats/")
+        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
+        j = json.loads(response.content)
+
+        # All lists should be empty
+        self.assertEqual(
+            len(j["top_viewed_avatars"]), 0, "top_viewed_avatars should be empty"
+        )
+        self.assertEqual(
+            len(j["top_queried_emails"]), 0, "top_queried_emails should be empty"
+        )
+        self.assertEqual(
+            len(j["top_queried_openids"]), 0, "top_queried_openids should be empty"
+        )
+        self.assertEqual(
+            len(j["photo_format_distribution"]),
+            0,
+            "photo_format_distribution should be empty",
+        )
+        self.assertEqual(
+            j["bluesky_handles"]["total_bluesky_handles"],
+            0,
+            "bluesky_handles should be 0",
+        )
+        self.assertEqual(
+            j["photo_size_stats"]["total_photos_analyzed"],
+            0,
+            "photo_size_stats should be 0",
+        )
+        self.assertEqual(
+            j["potential_duplicate_photos"]["potential_duplicate_groups"],
+            0,
+            "potential_duplicate_photos should be 0",
+        )
+
+    def test_stats_with_bluesky_handles(self):
+        """
+        Test stats with Bluesky handles
+        """
+        from ivatar.ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
+
+        # Create email with Bluesky handle
+        email = ConfirmedEmail.objects.create(
+            user=self.user,
+            email=f"{random_string()}@{random_string()}.{random_string(2)}",
+            ip_address=random_ip_address(),
+        )
+        email.bluesky_handle = f"{random_string()}.bsky.social"
+        email.access_count = 100
+        email.save()
+
+        # Create OpenID with Bluesky handle
+        openid = ConfirmedOpenId.objects.create(
+            user=self.user,
+            openid=f"http://{random_string()}.{random_string()}.org/",
+            ip_address=random_ip_address(),
+        )
+        openid.bluesky_handle = f"{random_string()}.bsky.social"
+        openid.access_count = 50
+        openid.save()
+
+        response = self.client.get("/stats/")
+        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
+        j = json.loads(response.content)
+
+        # Test Bluesky handles stats
+        self.assertEqual(
+            j["bluesky_handles"]["total_bluesky_handles"],
+            2,
+            "total bluesky handles incorrect",
+        )
+        self.assertEqual(
+            j["bluesky_handles"]["bluesky_emails"], 1, "bluesky emails count incorrect"
+        )
+        self.assertEqual(
+            j["bluesky_handles"]["bluesky_openids"],
+            1,
+            "bluesky openids count incorrect",
+        )
+
+    def test_stats_photo_duplicates(self):
+        """
+        Test potential duplicate photos detection
+        """
+        from ivatar.ivataraccount.models import Photo
+
+        # Create photos with same format and size (potential duplicates)
+        # PNG header + minimal data
+        png_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
+
+        Photo.objects.create(
+            user=self.user, data=png_data, format="png", ip_address=random_ip_address()
+        )
+        Photo.objects.create(
+            user=self.user,
+            data=png_data,  # Same size
+            format="png",  # Same format
+            ip_address=random_ip_address(),
+        )
+        Photo.objects.create(
+            user=self.user,
+            data=png_data,  # Same size again
+            format="png",  # Same format again, so all three form one group
+            ip_address=random_ip_address(),
+        )
+
+        response = self.client.get("/stats/")
+        self.assertEqual(response.status_code, 200, "unable to fetch stats!")
+        j = json.loads(response.content)
+
+        # Should detect potential duplicates
+        self.assertEqual(
+            j["potential_duplicate_photos"]["potential_duplicate_groups"],
+            1,
+            "should have 1 duplicate group",
+        )
+        self.assertEqual(
+            j["potential_duplicate_photos"]["total_potential_duplicate_photos"],
+            3,
+            "should have 3 potential duplicate photos",
+        )
+        self.assertEqual(
+            len(j["potential_duplicate_photos"]["potential_duplicate_groups_detail"]),
+            1,
+            "should have 1 duplicate group detail",
+        )
diff --git a/ivatar/utils.py b/ivatar/utils.py
index 3e50824..3df96bf 100644
--- a/ivatar/utils.py
+++ b/ivatar/utils.py
@@ -111,6 +111,13 @@ def random_string(length=10):
     )
 
 
+def random_ip_address():
+    """
+    Return a random IPv4 address in dotted-quad form; each octet is in 1..254
+    """
+    return ".".join(str(random.randint(1, 254)) for _ in range(4))
+
+
 def openid_variations(openid):
     """
     Return the various OpenID variations, ALWAYS in the same order:
diff --git a/ivatar/views.py b/ivatar/views.py
index d63d082..25cc3b3 100644
--- a/ivatar/views.py
+++ b/ivatar/views.py
@@ -560,4 +560,188 @@ class StatsView(TemplateView, JsonResponse):
             "avatars": Photo.objects.count(),  # pylint: disable=no-member
         }
 
+        # Top 10 viewed avatars
+        top_photos = Photo.objects.order_by("-access_count")[:10]
+        top_photos_data = []
+        for photo in top_photos:
+            # Fetch only the most-accessed email and OpenID linked to this photo
+            # rather than loading every related row just to read the first one.
+            top_email = photo.emails.order_by("-access_count").first()
+            top_openid = photo.openids.order_by("-access_count").first()
+
+            # Report whichever has the higher access count; the email wins ties.
+            if top_email and (
+                not top_openid or top_email.access_count >= top_openid.access_count
+            ):
+                top_photos_data.append(
+                    {
+                        "access_count": top_email.access_count,
+                        "digest_sha256": top_email.digest_sha256,
+                    }
+                )
+            elif top_openid:
+                top_photos_data.append(
+                    {
+                        "access_count": top_openid.access_count,
+                        "digest_sha256": top_openid.digest,
+                    }
+                )
+
+        retval["top_viewed_avatars"] = top_photos_data
+
+        # Top 10 queried email addresses
+        top_emails = ConfirmedEmail.objects.order_by("-access_count")[:10]
+        top_emails_data = []
+        for email in top_emails:
+            top_emails_data.append(
+                {
+                    "access_count": email.access_count,
+                    "digest_sha256": email.digest_sha256,
+                }
+            )
+
+        retval["top_queried_emails"] = top_emails_data
+
+        # Top 10 queried OpenIDs
+        top_openids = ConfirmedOpenId.objects.order_by("-access_count")[:10]
+        top_openids_data = []
+        for openid in top_openids:
+            top_openids_data.append(
+                {
+                    "access_count": openid.access_count,
+                    "digest_sha256": openid.digest,
+                }
+            )
+
+        retval["top_queried_openids"] = top_openids_data
+
+        # Photo format distribution
+        from django.db.models import Count
+
+        format_distribution = (
+            Photo.objects.values("format")
+            .annotate(count=Count("format"))
+            .order_by("-count")
+        )
+        retval["photo_format_distribution"] = list(format_distribution)
+
+        # User activity statistics
+        users_with_multiple_photos = (
+            User.objects.annotate(photo_count=Count("photo"))
+            .filter(photo_count__gt=1)
+            .count()
+        )
+        users_with_both_email_and_openid = (
+            User.objects.filter(
+                confirmedemail__isnull=False, confirmedopenid__isnull=False
+            )
+            .distinct()
+            .count()
+        )
+
+        # Calculate average photos per user
+        total_photos = Photo.objects.count()
+        total_users = User.objects.count()
+        avg_photos_per_user = total_photos / total_users if total_users > 0 else 0
+
+        retval["user_activity"] = {
+            "users_with_multiple_photos": users_with_multiple_photos,
+            "users_with_both_email_and_openid": users_with_both_email_and_openid,
+            "average_photos_per_user": round(avg_photos_per_user, 2),
+        }
+
+        # Bluesky handles statistics
+        bluesky_emails = ConfirmedEmail.objects.filter(
+            bluesky_handle__isnull=False
+        ).count()
+        bluesky_openids = ConfirmedOpenId.objects.filter(
+            bluesky_handle__isnull=False
+        ).count()
+        total_bluesky_handles = bluesky_emails + bluesky_openids
+
+        # Bluesky handle totals (counts only, no per-handle ranking)
+        retval["bluesky_handles"] = {
+            "total_bluesky_handles": total_bluesky_handles,
+            "bluesky_emails": bluesky_emails,
+            "bluesky_openids": bluesky_openids,
+        }
+
+        # Average photo size statistics using raw SQL
+        from django.db import connection
+
+        with connection.cursor() as cursor:
+            # SQL to calculate average photo size
+            cursor.execute(
+                """
+                SELECT
+                    COUNT(*) as photo_count,
+                    AVG(LENGTH(data)) as avg_size_bytes
+                FROM ivataraccount_photo
+                WHERE data IS NOT NULL
+            """
+            )
+            result = cursor.fetchone()
+
+        if result and result[0] > 0:
+            photo_count, avg_size_bytes = result
+            # Convert to float in case the database returns a non-float (e.g. a string)
+            avg_size_bytes = float(avg_size_bytes) if avg_size_bytes else 0
+            avg_size_kb = round(avg_size_bytes / 1024, 2) if avg_size_bytes else 0
+            avg_size_mb = (
+                round(avg_size_bytes / (1024 * 1024), 2) if avg_size_bytes else 0
+            )
+
+            retval["photo_size_stats"] = {
+                "average_size_bytes": round(avg_size_bytes, 2)
+                if avg_size_bytes
+                else 0,
+                "average_size_kb": avg_size_kb,
+                "average_size_mb": avg_size_mb,
+                "total_photos_analyzed": photo_count,
+            }
+        else:
+            retval["photo_size_stats"] = {
+                "average_size_bytes": 0,
+                "average_size_kb": 0,
+                "average_size_mb": 0,
+                "total_photos_analyzed": 0,
+            }
+
+        # For potential duplicate photos, we'll check for photos with the same format and size
+        # Note: This is not definitive - different images can have the same format and size
+        # but it's a good indicator of potential duplicates that might warrant investigation
+        with connection.cursor() as cursor:
+            cursor.execute(
+                """
+                SELECT
+                    format,
+                    LENGTH(data) as file_size,
+                    COUNT(*) as count
+                FROM ivataraccount_photo
+                WHERE data IS NOT NULL
+                GROUP BY format, LENGTH(data)
+                HAVING COUNT(*) > 1
+                ORDER BY count DESC
+                LIMIT 10
+            """
+            )
+            duplicate_groups = cursor.fetchall()
+
+        total_potential_duplicate_photos = sum(
+            group[2] for group in duplicate_groups
+        )
+
+        # Convert to list of dictionaries for JSON serialization
+        duplicate_groups_detail = [
+            {"format": group[0], "file_size": group[1], "count": group[2]}
+            for group in duplicate_groups
+        ]
+
+        retval["potential_duplicate_photos"] = {
+            "potential_duplicate_groups": len(duplicate_groups),
+            "total_potential_duplicate_photos": total_potential_duplicate_photos,
+            "potential_duplicate_groups_detail": duplicate_groups_detail,
+            "note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
+        }
+
         return JsonResponse(retval)
diff --git a/requirements.txt b/requirements.txt
index 8b3d9bd..0f8c139 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,9 +15,10 @@ fabric
 flake8-respect-noqa
 git+https://github.com/daboth/pagan.git
 git+https://github.com/ercpe/pydenticon5.git
-git+https://github.com/flavono123/identicon.git
 git+https://github.com/necaris/python3-openid.git
 git+https://github.com/ofalk/django-openid-auth
+#git+https://github.com/flavono123/identicon.git
+git+https://github.com/ofalk/identicon.git
 git+https://github.com/ofalk/monsterid.git
 git+https://github.com/ofalk/Robohash.git@devel
 notsetuptools