From aa742ea181338ba90d269f7e0fc6626c94ce2c86 Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Sat, 6 Sep 2025 10:28:50 +0200 Subject: [PATCH] Implement ExternalAuth for token based authorization --- config.py | 3 +- ivatar/ivataraccount/auth_models.py | 96 +++++ ivatar/ivataraccount/auth_views.py | 108 ++++++ ..._add_auth_token_and_brute_force_attempt.py | 99 +++++ ivatar/ivataraccount/test_auth_models.py | 267 ++++++++++++++ ivatar/ivataraccount/test_auth_views.py | 323 ++++++++++++++++ ivatar/ivataraccount/test_brute_force.py | 347 ++++++++++++++++++ ivatar/ivataraccount/test_token_auth.py | 228 ++++++++++++ ivatar/ivataraccount/token_auth.py | 49 +++ ivatar/ivataraccount/urls.py | 6 + 10 files changed, 1525 insertions(+), 1 deletion(-) create mode 100644 ivatar/ivataraccount/auth_models.py create mode 100644 ivatar/ivataraccount/auth_views.py create mode 100644 ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py create mode 100644 ivatar/ivataraccount/test_auth_models.py create mode 100644 ivatar/ivataraccount/test_auth_views.py create mode 100644 ivatar/ivataraccount/test_brute_force.py create mode 100644 ivatar/ivataraccount/test_token_auth.py create mode 100644 ivatar/ivataraccount/token_auth.py diff --git a/config.py b/config.py index aed54fb..b055eee 100644 --- a/config.py +++ b/config.py @@ -45,6 +45,7 @@ AUTHENTICATION_BACKENDS = ( # 'django_auth_ldap.backend.LDAPBackend', "django_openid_auth.auth.OpenIDBackend", "ivatar.ivataraccount.auth.FedoraOpenIdConnect", + "ivatar.ivataraccount.token_auth.TokenAuthenticationBackend", "django.contrib.auth.backends.ModelBackend", ) @@ -213,7 +214,7 @@ CACHES = { "LOCATION": [ "127.0.0.1:11211", ], - #"OPTIONS": {"MAX_ENTRIES": 1000000}, + # "OPTIONS": {"MAX_ENTRIES": 1000000}, }, "filesystem": { "BACKEND": "django.core.cache.backends.filebased.FileBasedCache", diff --git a/ivatar/ivataraccount/auth_models.py b/ivatar/ivataraccount/auth_models.py new file mode 100644 index 0000000..1d839a7 --- /dev/null +++ b/ivatar/ivataraccount/auth_models.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +""" +Authentication-related models for ivatar +""" + +from django.contrib.auth.models import User +from django.db import models +from django.utils import timezone +from django.utils.translation import gettext_lazy as _ + + +class AuthToken(models.Model): + """ + Model for storing authentication tokens + """ + + token = models.CharField(max_length=64, unique=True) + user = models.ForeignKey(User, on_delete=models.CASCADE) + created_at = models.DateTimeField(default=timezone.now) + expires_at = models.DateTimeField() + is_active = models.BooleanField(default=True) + ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True) + + class Meta: + verbose_name = _("auth token") + verbose_name_plural = _("auth tokens") + + def save(self, *args, **kwargs) -> None: + if not self.expires_at: + # Set expiration to 1 hour from now + self.expires_at = timezone.now() + timezone.timedelta(hours=1) + super().save(*args, **kwargs) + + def is_expired(self) -> bool: + return timezone.now() > self.expires_at + + def __str__(self) -> str: + return f"Token for {self.user.username} (expires: {self.expires_at})" + + +class BruteForceAttempt(models.Model): + """ + Model for tracking brute force attempts per user + """ + + user = models.ForeignKey(User, on_delete=models.CASCADE, null=True, blank=True) + username = models.CharField(max_length=150, null=True, blank=True) + ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True) + user_agent = models.TextField(blank=True, null=True) + attempt_count = models.PositiveIntegerField(default=1) + first_attempt = models.DateTimeField(default=timezone.now) + last_attempt = models.DateTimeField(default=timezone.now) + is_blocked = models.BooleanField(default=False) + + class Meta: + verbose_name = _("brute force attempt") + verbose_name_plural = _("brute force attempts") + unique_together = ["user", "username", "ip_address"] + + def save(self, *args, **kwargs) -> None: + self.last_attempt = timezone.now() + super().save(*args, **kwargs) + + def should_block( + self, max_attempts: int = 5, time_window_minutes: int = 15 + ) -> bool: + """ + Check if this user/IP should be blocked based on attempt count and time window + """ + if self.is_blocked: + return True + + # Reset attempts if outside time window + if timezone.now() - self.first_attempt > timezone.timedelta( + minutes=time_window_minutes + ): + self.attempt_count = 1 + self.first_attempt = timezone.now() + self.is_blocked = False + self.save() + return False + + return self.attempt_count >= max_attempts + + def increment_attempt(self) -> None: + """Increment the attempt count""" + self.attempt_count += 1 + if self.should_block(): + self.is_blocked = True + self.save() + + def __str__(self) -> str: + identifier = ( + self.user.username if self.user else self.username or self.ip_address + ) + return f"Brute force attempt for {identifier} ({self.attempt_count} attempts)" diff --git a/ivatar/ivataraccount/auth_views.py b/ivatar/ivataraccount/auth_views.py new file mode 100644 index 0000000..6d8cc54 --- /dev/null +++ b/ivatar/ivataraccount/auth_views.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +""" +External authentication views for ivatar +""" + +import secrets +from typing import Optional +from django.views.generic.base import View +from django.http import JsonResponse, HttpRequest +from ipware import get_client_ip + +from .auth_models import AuthToken, BruteForceAttempt + + +class ExternalAuthView(View): + """ + View for external authentication with optional group checking + """ + + def get(self, request: HttpRequest, *args, **kwargs) -> JsonResponse: + """ + Handle GET request for external authentication + """ + group: Optional[str] = kwargs.get("group", None) + client_ip = get_client_ip(request)[0] + user_agent = request.META.get("HTTP_USER_AGENT", "") + + # Determine identifier for brute force tracking + if request.user.is_authenticated: + # For authenticated users, track by user + brute_force_attempt, created = BruteForceAttempt.objects.get_or_create( + user=request.user, + defaults={ + "attempt_count": 0, + "ip_address": client_ip, + "user_agent": user_agent, + }, + ) + else: + # For unauthenticated users, track by IP and username if available + username = request.GET.get("username", "") + brute_force_attempt, created = BruteForceAttempt.objects.get_or_create( + username=username or None, + ip_address=client_ip, + user_agent=user_agent, + defaults={"attempt_count": 0}, + ) + + if brute_force_attempt.should_block(): + return JsonResponse( + { + "authenticated": False, + "reason": "too_many_attempts", + "message": "Too many authentication attempts. Please try again later.", + }, + status=429, + ) + + # Check if user is authenticated + if not request.user.is_authenticated: + brute_force_attempt.increment_attempt() + return JsonResponse( + { + "authenticated": False, + "reason": "not_authenticated", + "message": "User is not authenticated", + }, + status=401, + ) + + # If group is specified, check if user is in that group + if group: + if not request.user.groups.filter(name=group).exists(): + brute_force_attempt.increment_attempt() + return JsonResponse( + { + "authenticated": False, + "reason": "not_in_group", + "message": f"User is not in group: {group}", + }, + status=403, + ) + + # Generate a new token + token = secrets.token_urlsafe(48) + + # Create auth token + auth_token = AuthToken.objects.create( + token=token, user=request.user, ip_address=client_ip + ) + + # Reset brute force attempts on successful authentication + brute_force_attempt.attempt_count = 0 + brute_force_attempt.is_blocked = False + brute_force_attempt.save() + + return JsonResponse( + { + "authenticated": True, + "token": token, + "expires_at": auth_token.expires_at.isoformat(), + "user": { + "id": request.user.id, + "username": request.user.username, + "email": request.user.email, + }, + } + ) diff --git a/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py b/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py new file mode 100644 index 0000000..3e90533 --- /dev/null +++ b/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Generated by Django 5.2.1 on 2025-09-06 08:24 + +import django.db.models.deletion +import django.utils.timezone +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("ivataraccount", "0020_confirmedopenid_bluesky_handle"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="AuthToken", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("token", models.CharField(max_length=64, unique=True)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("expires_at", models.DateTimeField()), + ("is_active", models.BooleanField(default=True)), + ( + "ip_address", + models.GenericIPAddressField( + blank=True, null=True, unpack_ipv4=True + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "auth token", + "verbose_name_plural": "auth tokens", + }, + ), + migrations.CreateModel( + name="BruteForceAttempt", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("username", models.CharField(blank=True, max_length=150, null=True)), + ( + "ip_address", + models.GenericIPAddressField( + blank=True, null=True, unpack_ipv4=True + ), + ), + ("user_agent", models.TextField(blank=True, null=True)), + ("attempt_count", models.PositiveIntegerField(default=1)), + ( + "first_attempt", + models.DateTimeField(default=django.utils.timezone.now), + ), + ( + "last_attempt", + models.DateTimeField(default=django.utils.timezone.now), + ), + ("is_blocked", models.BooleanField(default=False)), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "brute force attempt", + "verbose_name_plural": "brute force attempts", + "unique_together": {("user", "username", "ip_address")}, + }, + ), + ] diff --git a/ivatar/ivataraccount/test_auth_models.py b/ivatar/ivataraccount/test_auth_models.py new file mode 100644 index 0000000..1268e21 --- /dev/null +++ b/ivatar/ivataraccount/test_auth_models.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- +""" +Test authentication models for ivatar +""" + +import os +import django +from django.test import TestCase +from django.contrib.auth.models import User +from django.utils import timezone + +os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" +django.setup() + +from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt + + +class AuthTokenTestCase(TestCase): + """ + Test cases for AuthToken model + """ + + def setUp(self): + """Set up test data""" + self.user = User.objects.create_user( + username="testuser", password="testpass123", email="test@example.com" + ) + + def test_auth_token_creation(self): + """Test basic AuthToken creation""" + token = AuthToken.objects.create( + token="test-token-123", user=self.user, ip_address="127.0.0.1" + ) + + self.assertEqual(token.user, self.user) + self.assertEqual(token.token, "test-token-123") + self.assertEqual(token.ip_address, "127.0.0.1") + self.assertTrue(token.is_active) + self.assertIsNotNone(token.created_at) + self.assertIsNotNone(token.expires_at) + + def test_auth_token_auto_expiration(self): + """Test that expiration is set automatically""" + token = AuthToken.objects.create(token="test-token-456", user=self.user) + + # Should be set to 1 hour from now + expected_expiry = timezone.now() + timezone.timedelta(hours=1) + time_diff = abs((token.expires_at - expected_expiry).total_seconds()) + self.assertLess(time_diff, 5) # Within 5 seconds + + def test_auth_token_is_expired(self): + """Test token expiration checking""" + # Create expired token + past_time = timezone.now() - timezone.timedelta(hours=2) + token = AuthToken.objects.create(token="expired-token", user=self.user) + token.expires_at = past_time + token.save() + + self.assertTrue(token.is_expired()) + + # Create valid token + future_time = timezone.now() + timezone.timedelta(hours=1) + valid_token = AuthToken.objects.create(token="valid-token", user=self.user) + valid_token.expires_at = future_time + valid_token.save() + + self.assertFalse(valid_token.is_expired()) + + def test_auth_token_string_representation(self): + """Test string representation of AuthToken""" + token = AuthToken.objects.create(token="test-token-789", user=self.user) + + expected_str = f"Token for {self.user.username} (expires: {token.expires_at})" + self.assertEqual(str(token), expected_str) + + def test_auth_token_unique_constraint(self): + """Test that tokens must be unique""" + AuthToken.objects.create(token="unique-token", user=self.user) + + # Try to create another token with the same value + with self.assertRaises(Exception): # IntegrityError + AuthToken.objects.create(token="unique-token", user=self.user) + + +class BruteForceAttemptTestCase(TestCase): + """ + Test cases for BruteForceAttempt model + """ + + def setUp(self): + """Set up test data""" + self.user = User.objects.create_user( + username="testuser", password="testpass123", email="test@example.com" + ) + self.ip_address = "127.0.0.1" + self.user_agent = "Mozilla/5.0 Test Browser" + + def test_brute_force_attempt_creation_authenticated_user(self): + """Test BruteForceAttempt creation for authenticated user""" + attempt = BruteForceAttempt.objects.create( + user=self.user, ip_address=self.ip_address, user_agent=self.user_agent + ) + + self.assertEqual(attempt.user, self.user) + self.assertEqual(attempt.ip_address, self.ip_address) + self.assertEqual(attempt.user_agent, self.user_agent) + self.assertEqual(attempt.attempt_count, 1) + self.assertFalse(attempt.is_blocked) + self.assertIsNotNone(attempt.first_attempt) + self.assertIsNotNone(attempt.last_attempt) + + def test_brute_force_attempt_creation_unauthenticated_user(self): + """Test BruteForceAttempt creation for unauthenticated user""" + username = "testusername" + attempt = BruteForceAttempt.objects.create( + username=username, ip_address=self.ip_address, user_agent=self.user_agent + ) + + self.assertIsNone(attempt.user) + self.assertEqual(attempt.username, username) + self.assertEqual(attempt.ip_address, self.ip_address) + self.assertEqual(attempt.user_agent, self.user_agent) + self.assertEqual(attempt.attempt_count, 1) + self.assertFalse(attempt.is_blocked) + + def test_brute_force_attempt_save_updates_last_attempt(self): + """Test that save() updates last_attempt timestamp""" + attempt = BruteForceAttempt.objects.create( + user=self.user, ip_address=self.ip_address + ) + + original_last_attempt = attempt.last_attempt + + # Wait a moment and save again + import time + + time.sleep(0.1) + attempt.save() + + self.assertGreater(attempt.last_attempt, original_last_attempt) + + def test_should_block_within_time_window(self): + """Test should_block() method within time window""" + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3) + + # Should not block with 3 attempts (below threshold of 5) + self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + # Should block with 6 attempts (above threshold) + attempt.attempt_count = 6 + attempt.save() + self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + def test_should_block_outside_time_window(self): + """Test should_block() resets when outside time window""" + # Create attempt with old timestamp + old_time = timezone.now() - timezone.timedelta(minutes=20) + attempt = BruteForceAttempt.objects.create( + user=self.user, attempt_count=10 # Above threshold + ) + attempt.first_attempt = old_time + attempt.save() + + # Should reset and not block (outside 15-minute window) + self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + # Should have reset attempt count + attempt.refresh_from_db() + self.assertEqual(attempt.attempt_count, 1) + self.assertFalse(attempt.is_blocked) + + def test_should_block_when_explicitly_blocked(self): + """Test should_block() when is_blocked is True""" + attempt = BruteForceAttempt.objects.create( + user=self.user, attempt_count=2, is_blocked=True + ) + + # Should block regardless of attempt count + self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + def test_increment_attempt(self): + """Test increment_attempt() method""" + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3) + + attempt.increment_attempt() + + self.assertEqual(attempt.attempt_count, 4) + self.assertFalse(attempt.is_blocked) # Still below threshold + + # Increment to threshold + attempt.increment_attempt() + + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) # Should be blocked now + + def test_increment_attempt_blocks_at_threshold(self): + """Test that increment_attempt() blocks at threshold""" + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4) + + attempt.increment_attempt() + + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) + + def test_brute_force_attempt_string_representation(self): + """Test string representation of BruteForceAttempt""" + # Test with user + attempt_with_user = BruteForceAttempt.objects.create( + user=self.user, attempt_count=3 + ) + expected_str_user = f"Brute force attempt for {self.user.username} (3 attempts)" + self.assertEqual(str(attempt_with_user), expected_str_user) + + # Test with username only + attempt_with_username = BruteForceAttempt.objects.create( + username="testusername", ip_address=self.ip_address, attempt_count=2 + ) + expected_str_username = "Brute force attempt for testusername (2 attempts)" + self.assertEqual(str(attempt_with_username), expected_str_username) + + # Test with IP only + attempt_with_ip = BruteForceAttempt.objects.create( + ip_address=self.ip_address, attempt_count=1 + ) + expected_str_ip = f"Brute force attempt for {self.ip_address} (1 attempts)" + self.assertEqual(str(attempt_with_ip), expected_str_ip) + + def test_unique_together_constraint(self): + """Test unique_together constraint""" + # Create first attempt + BruteForceAttempt.objects.create( + user=self.user, username="testuser", ip_address=self.ip_address + ) + + # Try to create duplicate (should fail) + with self.assertRaises(Exception): # IntegrityError + BruteForceAttempt.objects.create( + user=self.user, username="testuser", ip_address=self.ip_address + ) + + def test_get_or_create_behavior(self): + """Test get_or_create behavior for brute force attempts""" + # First call should create + attempt, created = BruteForceAttempt.objects.get_or_create( + user=self.user, defaults={"attempt_count": 0} + ) + self.assertTrue(created) + self.assertEqual(attempt.attempt_count, 0) + + # Second call should get existing + attempt2, created2 = BruteForceAttempt.objects.get_or_create( + user=self.user, defaults={"attempt_count": 0} + ) + self.assertFalse(created2) + self.assertEqual(attempt.id, attempt2.id) + self.assertEqual(attempt2.attempt_count, 0) + + def test_custom_max_attempts_and_time_window(self): + """Test custom max_attempts and time_window parameters""" + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2) + + # Test with custom parameters + self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=30)) + + attempt.attempt_count = 3 + attempt.save() + self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=30)) diff --git a/ivatar/ivataraccount/test_auth_views.py b/ivatar/ivataraccount/test_auth_views.py new file mode 100644 index 0000000..b60ca97 --- /dev/null +++ b/ivatar/ivataraccount/test_auth_views.py @@ -0,0 +1,323 @@ +# -*- coding: utf-8 -*- +""" +Test external authentication views for ivatar +""" + +import os +import django +import json +from unittest.mock import patch +from django.test import TestCase, Client +from django.contrib.auth.models import User, Group +from django.utils import timezone + +os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" +django.setup() + +from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt +from ivatar.utils import random_string + + +class ExternalAuthViewTestCase(TestCase): + """ + Test cases for ExternalAuthView + """ + + def setUp(self): + """Set up test data""" + self.client = Client() + self.username = random_string() + self.password = random_string() + self.email = f"{self.username}@{random_string()}.com" + self.group_name = random_string() + + self.user = User.objects.create_user( + username=self.username, password=self.password, email=self.email + ) + self.group = Group.objects.create(name=self.group_name) + self.user.groups.add(self.group) + + # Create another user without group + self.username_no_group = random_string() + self.password_no_group = random_string() + self.email_no_group = f"{self.username_no_group}@{random_string()}.com" + + self.user_no_group = User.objects.create_user( + username=self.username_no_group, + password=self.password_no_group, + email=self.email_no_group, + ) + + def test_external_auth_not_authenticated(self): + """Test external auth endpoint when user is not authenticated""" + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 401) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_authenticated") + self.assertEqual(data["message"], "User is not authenticated") + + def test_external_auth_authenticated_no_group(self): + """Test external auth endpoint when user is authenticated but no group specified""" + self.client.login(username=self.username, password=self.password) + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["authenticated"]) + self.assertIn("token", data) + self.assertIn("expires_at", data) + self.assertIn("user", data) + + # Check user data + self.assertEqual(data["user"]["id"], self.user.id) + self.assertEqual(data["user"]["username"], self.user.username) + self.assertEqual(data["user"]["email"], self.user.email) + + # Verify token was created + token_obj = AuthToken.objects.get(token=data["token"]) + self.assertEqual(token_obj.user, self.user) + self.assertTrue(token_obj.is_active) + + def test_external_auth_authenticated_with_group_success(self): + """Test external auth endpoint when user is authenticated and in specified group""" + self.client.login(username=self.username, password=self.password) + response = self.client.get(f"/accounts/api/external/{self.group_name}/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["authenticated"]) + self.assertIn("token", data) + self.assertIn("expires_at", data) + self.assertIn("user", data) + + def test_external_auth_authenticated_not_in_group(self): + """Test external auth endpoint when user is authenticated but not in specified group""" + self.client.login( + username=self.username_no_group, password=self.password_no_group + ) + response = self.client.get(f"/accounts/api/external/{self.group_name}/") + + self.assertEqual(response.status_code, 403) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_in_group") + self.assertEqual(data["message"], f"User is not in group: {self.group_name}") + + def test_external_auth_nonexistent_group(self): + """Test external auth endpoint with non-existent group""" + nonexistent_group = random_string() + self.client.login(username=self.username, password=self.password) + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + + self.assertEqual(response.status_code, 403) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_in_group") + self.assertEqual(data["message"], f"User is not in group: {nonexistent_group}") + + def test_external_auth_token_expiration(self): + """Test that generated tokens have proper expiration""" + self.client.login(username=self.username, password=self.password) + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + + # Check token expiration + token_obj = AuthToken.objects.get(token=data["token"]) + expected_expiry = timezone.now() + timezone.timedelta(hours=1) + time_diff = abs((token_obj.expires_at - expected_expiry).total_seconds()) + self.assertLess(time_diff, 5) # Within 5 seconds + + def test_external_auth_ip_address_tracking(self): + """Test that IP address is tracked in auth tokens""" + self.client.login(username=self.username, password=self.password) + + # Mock the IP address + with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip: + mock_get_ip.return_value = ("192.168.1.100", False) + + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + + token_obj = AuthToken.objects.get(token=data["token"]) + self.assertEqual(token_obj.ip_address, "192.168.1.100") + + def test_external_auth_brute_force_protection_authenticated_user(self): + """Test brute force protection for authenticated users""" + self.client.login(username=self.username, password=self.password) + + # Make requests that will fail (user not in group) to trigger brute force protection + nonexistent_group = random_string() + for i in range(6): # Exceed the threshold of 5 + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + + if i < 5: + # First 5 requests should fail with 403 (not in group) + self.assertEqual(response.status_code, 403) + else: + # 6th request should be blocked + self.assertEqual(response.status_code, 429) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "too_many_attempts") + self.assertEqual( + data["message"], + "Too many authentication attempts. Please try again later.", + ) + + def test_external_auth_brute_force_protection_unauthenticated_user(self): + """Test brute force protection for unauthenticated users""" + # Make multiple requests without authentication + for i in range(6): # Exceed the threshold of 5 + response = self.client.get("/accounts/api/external/") + + if i < 5: + # First 5 requests should return 401 + self.assertEqual(response.status_code, 401) + else: + # 6th request should be blocked + self.assertEqual(response.status_code, 429) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "too_many_attempts") + + def test_external_auth_brute_force_reset_on_success(self): + """Test that brute force attempts are reset on successful authentication""" + # Make some failed attempts first (by accessing without group) + self.client.login( + username=self.username_no_group, password=self.password_no_group + ) + for i in range(3): + response = self.client.get(f"/accounts/api/external/{self.group_name}/") + self.assertEqual(response.status_code, 403) + + # Check that brute force attempt was created for the user who made failed attempts + attempt = BruteForceAttempt.objects.get(user=self.user_no_group) + self.assertEqual(attempt.attempt_count, 3) + + # Now make a successful request (user with group) + self.client.login(username=self.username, password=self.password) + response = self.client.get(f"/accounts/api/external/{self.group_name}/") + self.assertEqual(response.status_code, 200) + + # Check that brute force attempt was reset for the successful user (different user) + attempt_successful = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt_successful.attempt_count, 0) + self.assertFalse(attempt_successful.is_blocked) + + # The original user's attempt count should still be 3 (not reset) + attempt.refresh_from_db() + self.assertEqual(attempt.attempt_count, 3) + + def test_external_auth_brute_force_tracking_by_user(self): + """Test that brute force attempts are tracked per user for authenticated users""" + self.client.login(username=self.username, password=self.password) + + # Make some requests + for i in range(3): + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Check that attempt is tracked by user + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth + self.assertFalse(attempt.is_blocked) + + def test_external_auth_brute_force_tracking_by_ip_and_username(self): + """Test that brute force attempts are tracked by IP and username for unauthenticated users""" + # Make requests with username parameter + for i in range(3): + response = self.client.get( + "/accounts/api/external/", {"username": "testuser"} + ) + self.assertEqual(response.status_code, 401) + + # Check that attempt is tracked by username and IP + attempt = BruteForceAttempt.objects.get(username="testuser") + self.assertEqual(attempt.attempt_count, 3) + self.assertIsNotNone(attempt.ip_address) + + def test_external_auth_user_agent_tracking(self): + """Test that user agent is tracked in brute force attempts""" + self.client.login(username=self.username, password=self.password) + + # Make request with custom user agent + response = self.client.get( + "/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent" + ) + + self.assertEqual(response.status_code, 200) + + # Check that user agent was tracked + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.user_agent, "Custom Test Agent") + + def test_external_auth_token_uniqueness(self): + """Test that generated tokens are unique""" + self.client.login(username=self.username, password=self.password) + + tokens = set() + for i in range(10): + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + tokens.add(data["token"]) + + # All tokens should be unique + self.assertEqual(len(tokens), 10) + + def test_external_auth_url_patterns(self): + """Test that URL patterns work correctly""" + self.client.login(username=self.username, password=self.password) + + # Test without group + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Test with group + response = self.client.get(f"/accounts/api/external/{self.group_name}/") + self.assertEqual(response.status_code, 200) + + # Test with trailing slash + response = self.client.get(f"/accounts/api/external/{self.group_name}") + self.assertEqual(response.status_code, 200) + + def test_external_auth_json_response_format(self): + """Test that JSON response has correct format""" + self.client.login(username=self.username, password=self.password) + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 200) + self.assertEqual(response["Content-Type"], "application/json") + + data = json.loads(response.content) + required_fields = ["authenticated", "token", "expires_at", "user"] + for field in required_fields: + self.assertIn(field, data) + + # Check user object structure + user_data = data["user"] + required_user_fields = ["id", "username", "email"] + for field in required_user_fields: + self.assertIn(field, user_data) + + def test_external_auth_error_response_format(self): + """Test that error responses have correct format""" + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 401) + self.assertEqual(response["Content-Type"], "application/json") + + data = json.loads(response.content) + required_fields = ["authenticated", "reason", "message"] + for field in required_fields: + self.assertIn(field, data) + + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_authenticated") + self.assertIsInstance(data["message"], str) diff --git a/ivatar/ivataraccount/test_brute_force.py b/ivatar/ivataraccount/test_brute_force.py new file mode 100644 index 0000000..2dace56 --- /dev/null +++ b/ivatar/ivataraccount/test_brute_force.py @@ -0,0 +1,347 @@ +# -*- coding: utf-8 -*- +""" +Test brute force protection mechanisms for ivatar +""" + +import os +import django +import json +from unittest.mock import patch +from django.test import TestCase, Client +from django.contrib.auth.models import User +from django.utils import timezone + +os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" +django.setup() + +from ivatar.ivataraccount.auth_models import BruteForceAttempt +from ivatar.utils import random_string + + +class BruteForceProtectionTestCase(TestCase): + """ + Test cases for brute force protection mechanisms + """ + + def setUp(self): + """Set up test data""" + self.client = Client() + self.username = random_string() + self.password = random_string() + self.email = f"{self.username}@{random_string()}.com" + + self.user = User.objects.create_user( + username=self.username, password=self.password, email=self.email + ) + self.ip_address = "192.168.1.100" + self.user_agent = "Mozilla/5.0 Test Browser" + + def test_brute_force_protection_authenticated_user_tracking(self): + """Test brute force protection for authenticated users""" + self.client.login(username=self.username, password=self.password) + + # Make requests that will fail (user not in group) to trigger brute force protection + nonexistent_group = random_string() + for i in range(7): + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + + if i < 5: + # First 5 requests should fail with 403 (not in group) + self.assertEqual(response.status_code, 403) + else: + # Requests 6+ should be blocked + self.assertEqual(response.status_code, 429) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "too_many_attempts") + + # Verify brute force attempt was created and blocked + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold + self.assertTrue(attempt.is_blocked) + + def test_brute_force_protection_unauthenticated_user_tracking(self): + """Test brute force protection for unauthenticated users""" + # Make requests without authentication + for i in range(7): + response = self.client.get("/accounts/api/external/") + + if i < 5: + # First 5 requests should return 401 + self.assertEqual(response.status_code, 401) + else: + # Requests 6+ should be blocked + self.assertEqual(response.status_code, 429) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "too_many_attempts") + + # Verify brute force attempt was created and blocked + attempt = BruteForceAttempt.objects.get(username=None, ip_address__isnull=False) + self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold + self.assertTrue(attempt.is_blocked) + + def test_brute_force_protection_with_username_parameter(self): + """Test brute force protection with username parameter for unauthenticated users""" + # Make requests with username parameter + for i in range(7): + response = self.client.get( + "/accounts/api/external/", {"username": "testuser"} + ) + + if i < 5: + # First 5 requests should return 401 + self.assertEqual(response.status_code, 401) + else: + # Requests 6+ should be blocked + self.assertEqual(response.status_code, 429) + + # Verify brute force attempt was created with username + attempt = BruteForceAttempt.objects.get(username="testuser") + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) + self.assertIsNotNone(attempt.ip_address) + + def test_brute_force_protection_time_window_reset(self): + """Test that brute force protection resets after time window""" + self.client.login(username=self.username, password=self.password) + + # Make requests to trigger blocking + nonexistent_group = random_string() + for i in range(6): + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + + # Should be blocked now + self.assertEqual(response.status_code, 429) + + # Manually reset the time window by updating the attempt + attempt = BruteForceAttempt.objects.get(user=self.user) + old_time = timezone.now() - timezone.timedelta(minutes=20) + attempt.first_attempt = old_time + attempt.save() + + # Next request should reset and succeed + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Verify attempt was reset + attempt.refresh_from_db() + self.assertEqual(attempt.attempt_count, 0) + self.assertFalse(attempt.is_blocked) + + def test_brute_force_protection_successful_auth_resets_attempts(self): + """Test that successful authentication resets brute force attempts""" + self.client.login(username="testuser", password="testpass123") + + # Make some requests to build up attempt count + for i in range(3): + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Check that attempt count was reset after successful auth + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.attempt_count, 0) + self.assertFalse(attempt.is_blocked) + + def test_brute_force_protection_different_users_independent(self): + """Test that brute force protection is independent for different users""" + user2 = User.objects.create_user( + username=random_string(), + password=random_string(), + email=f"{random_string()}@{random_string()}.com", + ) + + # Block first user + self.client.login(username=self.username, password=self.password) + nonexistent_group = random_string() + for i in range(6): + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + + self.assertEqual(response.status_code, 429) + + # Second user should still be able to authenticate + self.client.login(username=user2.username, password=user2.password) + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Verify separate attempts were tracked + attempt1 = BruteForceAttempt.objects.get(user=self.user) + attempt2 = BruteForceAttempt.objects.get(user=user2) + + self.assertTrue(attempt1.is_blocked) + self.assertFalse(attempt2.is_blocked) + + def test_brute_force_protection_different_ips_independent(self): + """Test that brute force protection is independent for different IPs""" + # Mock different IP addresses + with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip: + # First IP gets blocked + mock_get_ip.return_value = ("192.168.1.100", False) + + for i in range(6): + response = self.client.get("/accounts/api/external/") + + self.assertEqual(response.status_code, 429) + + # Second IP should still work + mock_get_ip.return_value = ("192.168.1.200", False) + response = self.client.get("/accounts/api/external/") + self.assertEqual( + response.status_code, 401 + ) # Not authenticated, but not blocked + + # Verify separate attempts were tracked + attempt1 = BruteForceAttempt.objects.get(ip_address="192.168.1.100") + attempt2 = BruteForceAttempt.objects.get(ip_address="192.168.1.200") + + self.assertTrue(attempt1.is_blocked) + self.assertFalse(attempt2.is_blocked) + + def test_brute_force_protection_user_agent_tracking(self): + """Test that user agent is properly tracked""" + self.client.login(username="testuser", password="testpass123") + + # Make request with custom user agent + response = self.client.get( + "/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent" + ) + + self.assertEqual(response.status_code, 200) + + # Verify user agent was tracked + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.user_agent, "Custom Test Agent") + + def test_brute_force_protection_custom_thresholds(self): + """Test brute force protection with custom thresholds""" + self.client.login(username="testuser", password="testpass123") + + # Create attempt with custom threshold + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2) + + # Test with custom max_attempts + self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=15)) + + attempt.attempt_count = 3 + attempt.save() + self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=15)) + + def test_brute_force_protection_custom_time_window(self): + """Test brute force protection with custom time window""" + self.client.login(username=self.username, password=self.password) + + # Create attempt with old timestamp + old_time = timezone.now() - timezone.timedelta(minutes=10) + attempt = BruteForceAttempt.objects.create( + user=self.user, attempt_count=10, first_attempt=old_time + ) + + # Should not block with 5-minute window (outside window) + self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=5)) + + # Should block with 15-minute window (within window) + self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + def test_brute_force_protection_explicit_blocking(self): + """Test explicit blocking of brute force attempts""" + attempt = BruteForceAttempt.objects.create( + user=self.user, attempt_count=2, is_blocked=True # Explicitly blocked + ) + + # Should block regardless of attempt count + self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) + + def test_brute_force_protection_increment_blocks_at_threshold(self): + """Test that increment_attempt blocks at threshold""" + attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4) + + # Increment should block at threshold + attempt.increment_attempt() + + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) + + def test_brute_force_protection_get_or_create_behavior(self): + """Test get_or_create behavior for brute force attempts""" + # First call should create + attempt, created = BruteForceAttempt.objects.get_or_create( + user=self.user, defaults={"attempt_count": 0} + ) + self.assertTrue(created) + self.assertEqual(attempt.attempt_count, 0) + + # Second call should get existing + attempt2, created2 = BruteForceAttempt.objects.get_or_create( + user=self.user, defaults={"attempt_count": 0} + ) + self.assertFalse(created2) + self.assertEqual(attempt.id, attempt2.id) + + def test_brute_force_protection_unique_constraints(self): + """Test unique constraints for brute force attempts""" + # Create first attempt + BruteForceAttempt.objects.create( + user=self.user, username="testuser", ip_address=self.ip_address + ) + + # Try to create duplicate (should fail) + with self.assertRaises(Exception): # IntegrityError + BruteForceAttempt.objects.create( + user=self.user, username="testuser", ip_address=self.ip_address + ) + + def test_brute_force_protection_mixed_authentication_states(self): + """Test brute force protection with mixed authentication states""" + # Start unauthenticated + for i in range(3): + response = self.client.get( + "/accounts/api/external/", {"username": "testuser"} + ) + self.assertEqual(response.status_code, 401) + + # Switch to authenticated + self.client.login(username="testuser", password="testpass123") + + # Should still track by user now + response = self.client.get("/accounts/api/external/") + self.assertEqual(response.status_code, 200) + + # Verify attempt was created for authenticated user + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth + + def test_brute_force_protection_edge_case_empty_username(self): + """Test brute force protection with empty username parameter""" + # Make requests with empty username + for i in range(6): + response = self.client.get("/accounts/api/external/", {"username": ""}) + + if i < 5: + self.assertEqual(response.status_code, 401) + else: + self.assertEqual(response.status_code, 429) + + # Verify attempt was created with None username + attempt = BruteForceAttempt.objects.get(username=None) + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) + + def test_brute_force_protection_concurrent_requests(self): + """Test brute force protection with concurrent-like requests""" + self.client.login(username=self.username, password=self.password) + + # Simulate rapid requests that will fail + nonexistent_group = random_string() + responses = [] + for i in range(10): + response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") + responses.append(response.status_code) + + # First 5 should fail with 403, rest should be blocked + self.assertEqual(responses[:5], [403] * 5) + self.assertEqual(responses[5:], [429] * 5) + + # Verify final state + attempt = BruteForceAttempt.objects.get(user=self.user) + self.assertEqual(attempt.attempt_count, 5) + self.assertTrue(attempt.is_blocked) diff --git a/ivatar/ivataraccount/test_token_auth.py b/ivatar/ivataraccount/test_token_auth.py new file mode 100644 index 0000000..ea57835 --- /dev/null +++ b/ivatar/ivataraccount/test_token_auth.py @@ -0,0 +1,228 @@ +# -*- coding: utf-8 -*- +""" +Test token authentication backend for ivatar +""" + +import os +import django +from django.test import TestCase +from django.contrib.auth.models import User +from django.utils import timezone +from django.http import HttpRequest + +os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" +django.setup() + +from ivatar.ivataraccount.auth_models import AuthToken +from ivatar.ivataraccount.token_auth import TokenAuthenticationBackend + + +class TokenAuthenticationBackendTestCase(TestCase): + """ + Test cases for TokenAuthenticationBackend + """ + + def setUp(self): + """Set up test data""" + self.user = User.objects.create_user( + username="testuser", password="testpass123", email="test@example.com" + ) + self.backend = TokenAuthenticationBackend() + self.request = HttpRequest() + + def test_authenticate_with_valid_token(self): + """Test authentication with valid token""" + # Create a valid token + _token = AuthToken.objects.create( + token="valid-token-123", user=self.user, ip_address="127.0.0.1" + ) + + # Authenticate with the token + authenticated_user = self.backend.authenticate( + request=self.request, token="valid-token-123" + ) + + self.assertEqual(authenticated_user, self.user) + + # Token should still be active + _token.refresh_from_db() + self.assertTrue(_token.is_active) + + def test_authenticate_with_invalid_token(self): + """Test authentication with invalid token""" + # Try to authenticate with non-existent token + authenticated_user = self.backend.authenticate( + request=self.request, token="invalid-token-456" + ) + + self.assertIsNone(authenticated_user) + + def test_authenticate_with_expired_token(self): + """Test authentication with expired token""" + # Create an expired token + past_time = timezone.now() - timezone.timedelta(hours=2) + _token = AuthToken.objects.create(token="expired-token-789", user=self.user) + _token.expires_at = past_time + _token.save() + + # Try to authenticate with expired token + authenticated_user = self.backend.authenticate( + request=self.request, token="expired-token-789" + ) + + self.assertIsNone(authenticated_user) + + # Token should be deactivated + _token.refresh_from_db() + self.assertFalse(_token.is_active) + + def test_authenticate_with_inactive_token(self): + """Test authentication with inactive token""" + # Create an inactive token + AuthToken.objects.create( + token="inactive-token-101", user=self.user, is_active=False + ) + + # Try to authenticate with inactive token + authenticated_user = self.backend.authenticate( + request=self.request, token="inactive-token-101" + ) + + self.assertIsNone(authenticated_user) + + def test_authenticate_without_token(self): + """Test authentication without token parameter""" + authenticated_user = self.backend.authenticate(request=self.request, token=None) + + self.assertIsNone(authenticated_user) + + def test_authenticate_with_empty_token(self): + """Test authentication with empty token""" + authenticated_user = self.backend.authenticate(request=self.request, token="") + + self.assertIsNone(authenticated_user) + + def test_get_user_with_valid_id(self): + """Test get_user with valid user ID""" + user = self.backend.get_user(self.user.id) + + self.assertEqual(user, self.user) + self.assertEqual(user.username, "testuser") + self.assertEqual(user.email, "test@example.com") + + def test_get_user_with_invalid_id(self): + """Test get_user with invalid user ID""" + user = self.backend.get_user(99999) # Non-existent ID + + self.assertIsNone(user) + + def test_get_user_with_none_id(self): + """Test get_user with None ID""" + user = self.backend.get_user(None) + + self.assertIsNone(user) + + def test_authenticate_token_case_sensitivity(self): + """Test that token authentication is case sensitive""" + # Create a token with specific case + # Create token with case-sensitive name + AuthToken.objects.create(token="Case-Sensitive-Token", user=self.user) + + # Try with different case + authenticated_user = self.backend.authenticate( + request=self.request, token="case-sensitive-token" + ) + + self.assertIsNone(authenticated_user) + + # Try with correct case + authenticated_user = self.backend.authenticate( + request=self.request, token="Case-Sensitive-Token" + ) + + self.assertEqual(authenticated_user, self.user) + + def test_authenticate_multiple_tokens_same_user(self): + """Test authentication with multiple tokens for same user""" + # Create multiple tokens for the same user + AuthToken.objects.create(token="token-1", user=self.user) + AuthToken.objects.create(token="token-2", user=self.user) + + # Both tokens should authenticate to the same user + user1 = self.backend.authenticate(request=self.request, token="token-1") + user2 = self.backend.authenticate(request=self.request, token="token-2") + + self.assertEqual(user1, self.user) + self.assertEqual(user2, self.user) + self.assertEqual(user1, user2) + + def test_authenticate_token_with_special_characters(self): + """Test authentication with token containing special characters""" + special_token = "token-with-special-chars!@#$%^&*()_+-=[]{}|;:,.<>?" + + AuthToken.objects.create(token=special_token, user=self.user) + + authenticated_user = self.backend.authenticate( + request=self.request, token=special_token + ) + + self.assertEqual(authenticated_user, self.user) + + def test_authenticate_token_length_variations(self): + """Test authentication with tokens of different lengths""" + # Short token + short_token = "abc" + AuthToken.objects.create(token=short_token, user=self.user) + + # Long token + long_token = "a" * 100 + AuthToken.objects.create(token=long_token, user=self.user) + + # Both should work + user1 = self.backend.authenticate(request=self.request, token=short_token) + user2 = self.backend.authenticate(request=self.request, token=long_token) + + self.assertEqual(user1, self.user) + self.assertEqual(user2, self.user) + + def test_authenticate_with_deleted_user(self): + """Test authentication with token for deleted user""" + # Create token for user + AuthToken.objects.create(token="token-for-deleted-user", user=self.user) + + # Delete the user + self.user.delete() + + # Try to authenticate with token for deleted user + authenticated_user = self.backend.authenticate( + request=self.request, token="token-for-deleted-user" + ) + + # Should return None (token should be invalid) + self.assertIsNone(authenticated_user) + + def test_authenticate_token_unicode_characters(self): + """Test authentication with token containing unicode characters""" + unicode_token = "token-with-unicode-ñáéíóú-中文-🚀" + + AuthToken.objects.create(token=unicode_token, user=self.user) + + authenticated_user = self.backend.authenticate( + request=self.request, token=unicode_token + ) + + self.assertEqual(authenticated_user, self.user) + + def test_backend_name(self): + """Test that backend has correct name""" + self.assertEqual(self.backend.__class__.__name__, "TokenAuthenticationBackend") + + def test_authenticate_with_request_none(self): + """Test authentication with None request""" + AuthToken.objects.create(token="token-with-none-request", user=self.user) + + authenticated_user = self.backend.authenticate( + request=None, token="token-with-none-request" + ) + + self.assertEqual(authenticated_user, self.user) diff --git a/ivatar/ivataraccount/token_auth.py b/ivatar/ivataraccount/token_auth.py new file mode 100644 index 0000000..92bb11f --- /dev/null +++ b/ivatar/ivataraccount/token_auth.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +""" +Token-based authentication backend for ivatar +""" + +from typing import Optional +from django.contrib.auth.backends import BaseBackend +from django.contrib.auth.models import User +from django.http import HttpRequest +from .auth_models import AuthToken + + +class TokenAuthenticationBackend(BaseBackend): + """ + Authentication backend that uses tokens for authentication + """ + + def authenticate( + self, request: Optional[HttpRequest], token: Optional[str] = None, **kwargs + ) -> Optional[User]: + """ + Authenticate using a token + """ + if not token: + return None + + try: + auth_token = AuthToken.objects.get(token=token, is_active=True) + + # Check if token is expired + if auth_token.is_expired(): + # Mark token as inactive + auth_token.is_active = False + auth_token.save() + return None + + return auth_token.user + + except AuthToken.DoesNotExist: + return None + + def get_user(self, user_id: int) -> Optional[User]: + """ + Get user by ID + """ + try: + return User.objects.get(pk=user_id) + except User.DoesNotExist: + return None diff --git a/ivatar/ivataraccount/urls.py b/ivatar/ivataraccount/urls.py index ae6a1c4..1adfcb2 100644 --- a/ivatar/ivataraccount/urls.py +++ b/ivatar/ivataraccount/urls.py @@ -27,6 +27,7 @@ from .views import ResendConfirmationMailView from .views import IvatarLoginView from .views import DeleteAccountView from .views import ExportView +from .auth_views import ExternalAuthView # Define URL patterns, self documenting # To see the fancy, colorful evaluation of these use: @@ -166,4 +167,9 @@ urlpatterns = [ # pylint: disable=invalid-name ResendConfirmationMailView.as_view(), name="resend_confirmation_mail", ), + re_path( + r"api/external/(?P\w+)?/?$", + ExternalAuthView.as_view(), + name="external_auth", + ), ]