diff --git a/config.py b/config.py index b055eee..ac00796 100644 --- a/config.py +++ b/config.py @@ -45,7 +45,6 @@ AUTHENTICATION_BACKENDS = ( # 'django_auth_ldap.backend.LDAPBackend', "django_openid_auth.auth.OpenIDBackend", "ivatar.ivataraccount.auth.FedoraOpenIdConnect", - "ivatar.ivataraccount.token_auth.TokenAuthenticationBackend", "django.contrib.auth.backends.ModelBackend", ) diff --git a/ivatar/ivataraccount/auth_models.py b/ivatar/ivataraccount/auth_models.py deleted file mode 100644 index a029af8..0000000 --- a/ivatar/ivataraccount/auth_models.py +++ /dev/null @@ -1,122 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Authentication-related models for ivatar -""" - -from django.contrib.auth.models import User -from django.db import models -from django.utils import timezone -from django.utils.translation import gettext_lazy as _ - - -class AuthToken(models.Model): - """ - Model for storing authentication tokens - """ - - token = models.CharField(max_length=64, unique=True) - user = models.ForeignKey(User, on_delete=models.CASCADE) - created_at = models.DateTimeField(default=timezone.now) - expires_at = models.DateTimeField() - is_active = models.BooleanField(default=True) - ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True) - - class Meta: - verbose_name = _("auth token") - verbose_name_plural = _("auth tokens") - - def save(self, *args, **kwargs) -> None: - if not self.expires_at: - # Set expiration to 1 hour from now - self.expires_at = timezone.now() + timezone.timedelta(hours=1) - super().save(*args, **kwargs) - - def is_expired(self) -> bool: - return timezone.now() > self.expires_at - - def __str__(self) -> str: - return f"Token for {self.user.username} (expires: {self.expires_at})" - - -class BruteForceAttempt(models.Model): - """ - Model for tracking brute force attempts per user - """ - - user = models.ForeignKey(User, on_delete=models.CASCADE, null=True, blank=True) - username = models.CharField(max_length=150, null=True, blank=True) - ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True) - user_agent = models.TextField(blank=True, null=True) - attempt_count = models.PositiveIntegerField(default=1) - first_attempt = models.DateTimeField(default=timezone.now) - last_attempt = models.DateTimeField(default=timezone.now) - is_blocked = models.BooleanField(default=False) - - class Meta: - verbose_name = _("brute force attempt") - verbose_name_plural = _("brute force attempts") - # Unique constraint: either user+ip OR username+ip (but not both) - constraints = [ - models.UniqueConstraint( - fields=["user", "ip_address"], - condition=models.Q(user__isnull=False), - name="unique_user_ip", - ), - models.UniqueConstraint( - fields=["username", "ip_address"], - condition=models.Q(username__isnull=False, user__isnull=True), - name="unique_username_ip", - ), - ] - - def save(self, *args, **kwargs) -> None: - self.last_attempt = timezone.now() - super().save(*args, **kwargs) - - def should_block( - self, max_attempts: int = 5, time_window_minutes: int = 15 - ) -> bool: - """ - Check if this user/IP should be blocked based on attempt count and time window - """ - if self.is_blocked: - return True - - # Check if outside time window - time_diff = timezone.now() - self.first_attempt - time_window = timezone.timedelta(minutes=time_window_minutes) - - if time_diff > time_window: - # Time window has expired, should not block - return False - - return self.attempt_count >= max_attempts - - def reset_if_expired(self, time_window_minutes: int = 15) -> bool: - """ - Reset attempt if time window has expired - Returns True if reset was performed, False otherwise - """ - time_diff = timezone.now() - self.first_attempt - time_window = timezone.timedelta(minutes=time_window_minutes) - - if time_diff > time_window: - self.attempt_count = 0 - self.first_attempt = timezone.now() - self.is_blocked = False - self.save() - return True - return False - - def increment_attempt(self) -> None: - """Increment the attempt count""" - self.attempt_count += 1 - if self.should_block(): - self.is_blocked = True - self.save() - - def __str__(self) -> str: - identifier = ( - self.user.username if self.user else self.username or self.ip_address - ) - return f"Brute force attempt for {identifier} ({self.attempt_count} attempts)" diff --git a/ivatar/ivataraccount/auth_views.py b/ivatar/ivataraccount/auth_views.py deleted file mode 100644 index 45278e9..0000000 --- a/ivatar/ivataraccount/auth_views.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: utf-8 -*- -""" -External authentication views for ivatar -""" - -import secrets -from typing import Optional -from django.views.generic.base import View -from django.http import JsonResponse, HttpRequest, HttpResponseRedirect -from urllib.parse import urlencode, urlparse, urlunparse, parse_qs, quote -from ipware import get_client_ip - -from .auth_models import AuthToken, BruteForceAttempt - - -class ExternalAuthView(View): - """ - View for external authentication with optional group checking - """ - - def get(self, request: HttpRequest, *args, **kwargs) -> JsonResponse: - """ - Handle GET request for external authentication - """ - group: Optional[str] = kwargs.get("group", None) - client_ip = get_client_ip(request)[0] - user_agent = request.META.get("HTTP_USER_AGENT", "") - - # Check if user is authenticated - if not request.user.is_authenticated: - # Redirect to login page with next parameter - current_path = request.get_full_path() - # URL-encode the current path to preserve query parameters - encoded_path = quote(current_path, safe="") - redirect_url = f"/accounts/login/?next={encoded_path}" - return HttpResponseRedirect(redirect_url) - - # Determine identifier for brute force tracking (only for authenticated users) - brute_force_attempt, created = BruteForceAttempt.objects.get_or_create( - user=request.user, - ip_address=client_ip, - defaults={ - "attempt_count": 0, - "user_agent": user_agent, - }, - ) - - # Reset attempts if time window has expired - brute_force_attempt.reset_if_expired() - - if brute_force_attempt.should_block(): - return JsonResponse( - { - "authenticated": False, - "reason": "too_many_attempts", - "message": "Too many authentication attempts. Please try again later.", - }, - status=429, - ) - - # If group is specified, check if user is in that group - if group: - # Special handling for 'admin' group: also check if user is superuser - if group == "admin": - is_in_group = ( - request.user.groups.filter(name=group).exists() - or request.user.is_superuser - ) - else: - is_in_group = request.user.groups.filter(name=group).exists() - - if not is_in_group: - brute_force_attempt.increment_attempt() - return JsonResponse( - { - "authenticated": False, - "reason": "not_in_group", - "message": f"User is not in group: {group}", - }, - status=403, - ) - - # Generate a new token - token = secrets.token_urlsafe(48) - - # Create auth token - auth_token = AuthToken.objects.create( - token=token, user=request.user, ip_address=client_ip - ) - - # Reset brute force attempts on successful authentication - brute_force_attempt.attempt_count = 0 - brute_force_attempt.is_blocked = False - brute_force_attempt.save() - - # Check if there's a 'next' parameter for redirect - next_url = request.GET.get("next") - if next_url: - # Parse the next URL and add the token as a parameter - parsed_url = urlparse(next_url) - query_params = parse_qs(parsed_url.query) - query_params["token"] = [token] - - # Rebuild the URL with the token parameter - new_query = urlencode(query_params, doseq=True) - redirect_url = urlunparse( - ( - parsed_url.scheme, - parsed_url.netloc, - parsed_url.path, - parsed_url.params, - new_query, - parsed_url.fragment, - ) - ) - - return HttpResponseRedirect(redirect_url) - - # If no next parameter, return JSON response as before - return JsonResponse( - { - "authenticated": True, - "token": token, - "expires_at": auth_token.expires_at.isoformat(), - "user": { - "id": request.user.id, - "username": request.user.username, - "email": request.user.email, - }, - } - ) diff --git a/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py b/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py deleted file mode 100644 index 3e90533..0000000 --- a/ivatar/ivataraccount/migrations/0021_add_auth_token_and_brute_force_attempt.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by Django 5.2.1 on 2025-09-06 08:24 - -import django.db.models.deletion -import django.utils.timezone -from django.conf import settings -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("ivataraccount", "0020_confirmedopenid_bluesky_handle"), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ] - - operations = [ - migrations.CreateModel( - name="AuthToken", - fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ("token", models.CharField(max_length=64, unique=True)), - ("created_at", models.DateTimeField(default=django.utils.timezone.now)), - ("expires_at", models.DateTimeField()), - ("is_active", models.BooleanField(default=True)), - ( - "ip_address", - models.GenericIPAddressField( - blank=True, null=True, unpack_ipv4=True - ), - ), - ( - "user", - models.ForeignKey( - on_delete=django.db.models.deletion.CASCADE, - to=settings.AUTH_USER_MODEL, - ), - ), - ], - options={ - "verbose_name": "auth token", - "verbose_name_plural": "auth tokens", - }, - ), - migrations.CreateModel( - name="BruteForceAttempt", - fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ("username", models.CharField(blank=True, max_length=150, null=True)), - ( - "ip_address", - models.GenericIPAddressField( - blank=True, null=True, unpack_ipv4=True - ), - ), - ("user_agent", models.TextField(blank=True, null=True)), - ("attempt_count", models.PositiveIntegerField(default=1)), - ( - "first_attempt", - models.DateTimeField(default=django.utils.timezone.now), - ), - ( - "last_attempt", - models.DateTimeField(default=django.utils.timezone.now), - ), - ("is_blocked", models.BooleanField(default=False)), - ( - "user", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.CASCADE, - to=settings.AUTH_USER_MODEL, - ), - ), - ], - options={ - "verbose_name": "brute force attempt", - "verbose_name_plural": "brute force attempts", - "unique_together": {("user", "username", "ip_address")}, - }, - ), - ] diff --git a/ivatar/ivataraccount/migrations/0022_fix_brute_force_unique_constraints.py b/ivatar/ivataraccount/migrations/0022_fix_brute_force_unique_constraints.py deleted file mode 100644 index 66af2c7..0000000 --- a/ivatar/ivataraccount/migrations/0022_fix_brute_force_unique_constraints.py +++ /dev/null @@ -1,36 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by Django 5.2.1 on 2025-09-07 09:58 - -from django.conf import settings -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("ivataraccount", "0021_add_auth_token_and_brute_force_attempt"), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ] - - operations = [ - migrations.AlterUniqueTogether( - name="bruteforceattempt", - unique_together=set(), - ), - migrations.AddConstraint( - model_name="bruteforceattempt", - constraint=models.UniqueConstraint( - condition=models.Q(("user__isnull", False)), - fields=("user", "ip_address"), - name="unique_user_ip", - ), - ), - migrations.AddConstraint( - model_name="bruteforceattempt", - constraint=models.UniqueConstraint( - condition=models.Q(("user__isnull", True), ("username__isnull", False)), - fields=("username", "ip_address"), - name="unique_username_ip", - ), - ), - ] diff --git a/ivatar/ivataraccount/test_auth_models.py b/ivatar/ivataraccount/test_auth_models.py deleted file mode 100644 index d0623f2..0000000 --- a/ivatar/ivataraccount/test_auth_models.py +++ /dev/null @@ -1,279 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test authentication models for ivatar -""" - -import os -import django -from django.test import TestCase -from django.contrib.auth.models import User -from django.utils import timezone - -os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" -django.setup() - -from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt -from ivatar.utils import random_string - - -class AuthTokenTestCase(TestCase): - """ - Test cases for AuthToken model - """ - - def setUp(self): - """Set up test data""" - self.username = random_string() - self.password = random_string() - self.email = f"{random_string()}@{random_string()}.com" - self.user = User.objects.create_user( - username=self.username, password=self.password, email=self.email - ) - - def test_auth_token_creation(self): - """Test basic AuthToken creation""" - token = AuthToken.objects.create( - token="test-token-123", user=self.user, ip_address="127.0.0.1" - ) - - self.assertEqual(token.user, self.user) - self.assertEqual(token.token, "test-token-123") - self.assertEqual(token.ip_address, "127.0.0.1") - self.assertTrue(token.is_active) - self.assertIsNotNone(token.created_at) - self.assertIsNotNone(token.expires_at) - - def test_auth_token_auto_expiration(self): - """Test that expiration is set automatically""" - token = AuthToken.objects.create(token="test-token-456", user=self.user) - - # Should be set to 1 hour from now - expected_expiry = timezone.now() + timezone.timedelta(hours=1) - time_diff = abs((token.expires_at - expected_expiry).total_seconds()) - self.assertLess(time_diff, 5) # Within 5 seconds - - def test_auth_token_is_expired(self): - """Test token expiration checking""" - # Create expired token - past_time = timezone.now() - timezone.timedelta(hours=2) - token = AuthToken.objects.create(token="expired-token", user=self.user) - token.expires_at = past_time - token.save() - - self.assertTrue(token.is_expired()) - - # Create valid token - future_time = timezone.now() + timezone.timedelta(hours=1) - valid_token = AuthToken.objects.create(token="valid-token", user=self.user) - valid_token.expires_at = future_time - valid_token.save() - - self.assertFalse(valid_token.is_expired()) - - def test_auth_token_string_representation(self): - """Test string representation of AuthToken""" - token = AuthToken.objects.create(token="test-token-789", user=self.user) - - expected_str = f"Token for {self.user.username} (expires: {token.expires_at})" - self.assertEqual(str(token), expected_str) - - def test_auth_token_unique_constraint(self): - """Test that tokens must be unique""" - AuthToken.objects.create(token="unique-token", user=self.user) - - # Try to create another token with the same value - with self.assertRaises(Exception): # IntegrityError - AuthToken.objects.create(token="unique-token", user=self.user) - - -class BruteForceAttemptTestCase(TestCase): - """ - Test cases for BruteForceAttempt model - """ - - def setUp(self): - """Set up test data""" - self.username = random_string() - self.password = random_string() - self.email = f"{random_string()}@{random_string()}.com" - self.user = User.objects.create_user( - username=self.username, password=self.password, email=self.email - ) - self.ip_address = "127.0.0.1" - self.user_agent = "Mozilla/5.0 Test Browser" - - def test_brute_force_attempt_creation_authenticated_user(self): - """Test BruteForceAttempt creation for authenticated user""" - attempt = BruteForceAttempt.objects.create( - user=self.user, ip_address=self.ip_address, user_agent=self.user_agent - ) - - self.assertEqual(attempt.user, self.user) - self.assertEqual(attempt.ip_address, self.ip_address) - self.assertEqual(attempt.user_agent, self.user_agent) - self.assertEqual(attempt.attempt_count, 1) - self.assertFalse(attempt.is_blocked) - self.assertIsNotNone(attempt.first_attempt) - self.assertIsNotNone(attempt.last_attempt) - - def test_brute_force_attempt_creation_unauthenticated_user(self): - """Test BruteForceAttempt creation for unauthenticated user""" - username = random_string() - attempt = BruteForceAttempt.objects.create( - username=username, ip_address=self.ip_address, user_agent=self.user_agent - ) - - self.assertIsNone(attempt.user) - self.assertEqual(attempt.username, username) - self.assertEqual(attempt.ip_address, self.ip_address) - self.assertEqual(attempt.user_agent, self.user_agent) - self.assertEqual(attempt.attempt_count, 1) - self.assertFalse(attempt.is_blocked) - - def test_brute_force_attempt_save_updates_last_attempt(self): - """Test that save() updates last_attempt timestamp""" - attempt = BruteForceAttempt.objects.create( - user=self.user, ip_address=self.ip_address - ) - - original_last_attempt = attempt.last_attempt - - # Wait a moment and save again - import time - - time.sleep(0.1) - attempt.save() - - self.assertGreater(attempt.last_attempt, original_last_attempt) - - def test_should_block_within_time_window(self): - """Test should_block() method within time window""" - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3) - - # Should not block with 3 attempts (below threshold of 5) - self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15)) - - # Should block with 6 attempts (above threshold) - attempt.attempt_count = 6 - attempt.save() - self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) - - def test_should_block_outside_time_window(self): - """Test should_block() resets when outside time window""" - # Create attempt with old timestamp - old_time = timezone.now() - timezone.timedelta(minutes=20) - attempt = BruteForceAttempt.objects.create( - user=self.user, attempt_count=10 # Above threshold - ) - attempt.first_attempt = old_time - attempt.save() - - # Should reset and not block (outside 15-minute window) - self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15)) - - # Reset the attempt count using the new method - attempt.reset_if_expired(time_window_minutes=15) - - # Should have reset attempt count - attempt.refresh_from_db() - self.assertEqual(attempt.attempt_count, 0) - self.assertFalse(attempt.is_blocked) - - def test_should_block_when_explicitly_blocked(self): - """Test should_block() when is_blocked is True""" - attempt = BruteForceAttempt.objects.create( - user=self.user, attempt_count=2, is_blocked=True - ) - - # Should block regardless of attempt count - self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) - - def test_increment_attempt(self): - """Test increment_attempt() method""" - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3) - - attempt.increment_attempt() - - self.assertEqual(attempt.attempt_count, 4) - self.assertFalse(attempt.is_blocked) # Still below threshold - - # Increment to threshold - attempt.increment_attempt() - - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) # Should be blocked now - - def test_increment_attempt_blocks_at_threshold(self): - """Test that increment_attempt() blocks at threshold""" - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4) - - attempt.increment_attempt() - - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) - - def test_brute_force_attempt_string_representation(self): - """Test string representation of BruteForceAttempt""" - # Test with user - attempt_with_user = BruteForceAttempt.objects.create( - user=self.user, attempt_count=3 - ) - expected_str_user = f"Brute force attempt for {self.user.username} (3 attempts)" - self.assertEqual(str(attempt_with_user), expected_str_user) - - # Test with username only - test_username = random_string() - attempt_with_username = BruteForceAttempt.objects.create( - username=test_username, ip_address=self.ip_address, attempt_count=2 - ) - expected_str_username = f"Brute force attempt for {test_username} (2 attempts)" - self.assertEqual(str(attempt_with_username), expected_str_username) - - # Test with IP only - attempt_with_ip = BruteForceAttempt.objects.create( - ip_address=self.ip_address, attempt_count=1 - ) - expected_str_ip = f"Brute force attempt for {self.ip_address} (1 attempts)" - self.assertEqual(str(attempt_with_ip), expected_str_ip) - - def test_unique_together_constraint(self): - """Test unique_together constraint""" - test_username = random_string() - # Create first attempt - BruteForceAttempt.objects.create( - user=self.user, username=test_username, ip_address=self.ip_address - ) - - # Try to create duplicate (should fail) - with self.assertRaises(Exception): # IntegrityError - BruteForceAttempt.objects.create( - user=self.user, username=test_username, ip_address=self.ip_address - ) - - def test_get_or_create_behavior(self): - """Test get_or_create behavior for brute force attempts""" - # First call should create - attempt, created = BruteForceAttempt.objects.get_or_create( - user=self.user, defaults={"attempt_count": 0} - ) - self.assertTrue(created) - self.assertEqual(attempt.attempt_count, 0) - - # Second call should get existing - attempt2, created2 = BruteForceAttempt.objects.get_or_create( - user=self.user, defaults={"attempt_count": 0} - ) - self.assertFalse(created2) - self.assertEqual(attempt.id, attempt2.id) - self.assertEqual(attempt2.attempt_count, 0) - - def test_custom_max_attempts_and_time_window(self): - """Test custom max_attempts and time_window parameters""" - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2) - - # Test with custom parameters - self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=30)) - - attempt.attempt_count = 3 - attempt.save() - self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=30)) diff --git a/ivatar/ivataraccount/test_auth_views.py b/ivatar/ivataraccount/test_auth_views.py deleted file mode 100644 index 6495e15..0000000 --- a/ivatar/ivataraccount/test_auth_views.py +++ /dev/null @@ -1,421 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test external authentication views for ivatar -""" - -import os -import django -import json -from unittest.mock import patch -from django.test import TestCase, Client -from django.contrib.auth.models import User, Group -from django.utils import timezone - -os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" -django.setup() - -from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt -from ivatar.utils import random_string - - -class ExternalAuthViewTestCase(TestCase): - """ - Test cases for ExternalAuthView - """ - - def setUp(self): - """Set up test data""" - self.client = Client() - self.username = random_string() - self.password = random_string() - self.email = f"{self.username}@{random_string()}.com" - self.group_name = random_string() - - self.user = User.objects.create_user( - username=self.username, password=self.password, email=self.email - ) - self.group = Group.objects.create(name=self.group_name) - self.user.groups.add(self.group) - - # Create another user without group - self.username_no_group = random_string() - self.password_no_group = random_string() - self.email_no_group = f"{self.username_no_group}@{random_string()}.com" - - self.user_no_group = User.objects.create_user( - username=self.username_no_group, - password=self.password_no_group, - email=self.email_no_group, - ) - - def test_external_auth_not_authenticated(self): - """Test external auth endpoint when user is not authenticated""" - response = self.client.get("/accounts/api/external/") - - # Should redirect to login page - self.assertEqual(response.status_code, 302) - self.assertIn("/accounts/login/", response.url) - self.assertIn("next=", response.url) - - def test_external_auth_authenticated_no_group(self): - """Test external auth endpoint when user is authenticated but no group specified""" - self.client.login(username=self.username, password=self.password) - response = self.client.get("/accounts/api/external/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - self.assertTrue(data["authenticated"]) - self.assertIn("token", data) - self.assertIn("expires_at", data) - self.assertIn("user", data) - - # Check user data - self.assertEqual(data["user"]["id"], self.user.id) - self.assertEqual(data["user"]["username"], self.user.username) - self.assertEqual(data["user"]["email"], self.user.email) - - # Verify token was created - token_obj = AuthToken.objects.get(token=data["token"]) - self.assertEqual(token_obj.user, self.user) - self.assertTrue(token_obj.is_active) - - def test_external_auth_authenticated_with_group_success(self): - """Test external auth endpoint when user is authenticated and in specified group""" - self.client.login(username=self.username, password=self.password) - response = self.client.get(f"/accounts/api/external/{self.group_name}/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - self.assertTrue(data["authenticated"]) - self.assertIn("token", data) - self.assertIn("expires_at", data) - self.assertIn("user", data) - - def test_external_auth_authenticated_not_in_group(self): - """Test external auth endpoint when user is authenticated but not in specified group""" - self.client.login( - username=self.username_no_group, password=self.password_no_group - ) - response = self.client.get(f"/accounts/api/external/{self.group_name}/") - - self.assertEqual(response.status_code, 403) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "not_in_group") - self.assertEqual(data["message"], f"User is not in group: {self.group_name}") - - def test_external_auth_admin_group_superuser_access(self): - """Test that superusers can access admin group even without being in admin group""" - # Make the existing user a superuser - self.user.is_superuser = True - self.user.save() - - # Login as superuser - self.client.login(username=self.username, password=self.password) - - # Try to access admin group (superuser should have access even without being in admin group) - response = self.client.get("/accounts/api/external/admin/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - self.assertTrue(data["authenticated"]) - self.assertIn("token", data) - - # Verify token was created for superuser - token_obj = AuthToken.objects.get(token=data["token"]) - self.assertEqual(token_obj.user, self.user) - - def test_external_auth_admin_group_regular_user_in_admin_group(self): - """Test that regular users in admin group can access admin group""" - # Create admin group - admin_group = Group.objects.create(name="admin") - - # Add user to admin group - self.user.groups.add(admin_group) - - # Login as user - self.client.login(username=self.username, password=self.password) - - # Try to access admin group - response = self.client.get("/accounts/api/external/admin/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - self.assertTrue(data["authenticated"]) - self.assertIn("token", data) - - def test_external_auth_admin_group_regular_user_not_in_admin_group(self): - """Test that regular users not in admin group cannot access admin group""" - # Login as user (not in admin group) - self.client.login(username=self.username, password=self.password) - - # Try to access admin group - response = self.client.get("/accounts/api/external/admin/") - - self.assertEqual(response.status_code, 403) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "not_in_group") - self.assertEqual(data["message"], "User is not in group: admin") - - def test_external_auth_admin_group_superuser_and_in_admin_group(self): - """Test that superusers who are also in admin group can access admin group""" - # Create admin group - admin_group = Group.objects.create(name="admin") - - # Make the existing user a superuser - self.user.is_superuser = True - self.user.save() - - # Add user to admin group as well - self.user.groups.add(admin_group) - - # Login as superuser - self.client.login(username=self.username, password=self.password) - - # Try to access admin group - response = self.client.get("/accounts/api/external/admin/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - self.assertTrue(data["authenticated"]) - self.assertIn("token", data) - - def test_external_auth_non_admin_group_normal_behavior(self): - """Test that non-admin groups work normally (no superuser privilege)""" - # Make the existing user a superuser - self.user.is_superuser = True - self.user.save() - - # Login as superuser - self.client.login(username=self.username, password=self.password) - - # Try to access a different group that the user is NOT in (superuser should NOT have automatic access) - different_group = random_string() - response = self.client.get(f"/accounts/api/external/{different_group}/") - - self.assertEqual(response.status_code, 403) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "not_in_group") - self.assertEqual(data["message"], f"User is not in group: {different_group}") - - def test_external_auth_nonexistent_group(self): - """Test external auth endpoint with non-existent group""" - nonexistent_group = random_string() - self.client.login(username=self.username, password=self.password) - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - - self.assertEqual(response.status_code, 403) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "not_in_group") - self.assertEqual(data["message"], f"User is not in group: {nonexistent_group}") - - def test_external_auth_token_expiration(self): - """Test that generated tokens have proper expiration""" - self.client.login(username=self.username, password=self.password) - response = self.client.get("/accounts/api/external/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - - # Check token expiration - token_obj = AuthToken.objects.get(token=data["token"]) - expected_expiry = timezone.now() + timezone.timedelta(hours=1) - time_diff = abs((token_obj.expires_at - expected_expiry).total_seconds()) - self.assertLess(time_diff, 5) # Within 5 seconds - - def test_external_auth_ip_address_tracking(self): - """Test that IP address is tracked in auth tokens""" - self.client.login(username=self.username, password=self.password) - - # Mock the IP address - with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip: - mock_get_ip.return_value = ("192.168.1.100", False) - - response = self.client.get("/accounts/api/external/") - - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - - token_obj = AuthToken.objects.get(token=data["token"]) - self.assertEqual(token_obj.ip_address, "192.168.1.100") - - def test_external_auth_brute_force_protection_authenticated_user(self): - """Test brute force protection for authenticated users""" - self.client.login(username=self.username, password=self.password) - - # Make requests that will fail (user not in group) to trigger brute force protection - nonexistent_group = random_string() - for i in range(6): # Exceed the threshold of 5 - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - - if i < 5: - # First 5 requests should fail with 403 (not in group) - self.assertEqual(response.status_code, 403) - else: - # 6th request should be blocked - self.assertEqual(response.status_code, 429) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "too_many_attempts") - self.assertEqual( - data["message"], - "Too many authentication attempts. Please try again later.", - ) - - def test_external_auth_brute_force_protection_unauthenticated_user(self): - """Test brute force protection for unauthenticated users""" - # With the new redirect flow, unauthenticated users are redirected to login - # This test is no longer applicable as brute force protection only applies to authenticated users - # who fail group checks - - # Test that unauthenticated users get redirected - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 302) - self.assertIn("/accounts/login/", response.url) - - def test_external_auth_brute_force_reset_on_success(self): - """Test that brute force attempts are reset on successful authentication""" - # Make some failed attempts first (by accessing without group) - self.client.login( - username=self.username_no_group, password=self.password_no_group - ) - for i in range(3): - response = self.client.get(f"/accounts/api/external/{self.group_name}/") - self.assertEqual(response.status_code, 403) - - # Check that brute force attempt was created for the user who made failed attempts - attempt = BruteForceAttempt.objects.get(user=self.user_no_group) - self.assertEqual(attempt.attempt_count, 3) - - # Now make a successful request (user with group) - self.client.login(username=self.username, password=self.password) - response = self.client.get(f"/accounts/api/external/{self.group_name}/") - self.assertEqual(response.status_code, 200) - - # Check that brute force attempt was reset for the successful user (different user) - attempt_successful = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt_successful.attempt_count, 0) - self.assertFalse(attempt_successful.is_blocked) - - # The original user's attempt count should still be 3 (not reset) - attempt.refresh_from_db() - self.assertEqual(attempt.attempt_count, 3) - - def test_external_auth_brute_force_tracking_by_user(self): - """Test that brute force attempts are tracked per user for authenticated users""" - self.client.login(username=self.username, password=self.password) - - # Make some requests - for i in range(3): - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Check that attempt is tracked by user - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth - self.assertFalse(attempt.is_blocked) - - def test_external_auth_brute_force_tracking_by_ip_and_username(self): - """Test that brute force attempts are tracked by IP and username for unauthenticated users""" - # With the new redirect flow, unauthenticated users are redirected to login - # This test is no longer applicable as brute force tracking only applies to authenticated users - - # Test that unauthenticated users get redirected regardless of username parameter - test_username = random_string() - response = self.client.get( - "/accounts/api/external/", {"username": test_username} - ) - self.assertEqual(response.status_code, 302) - self.assertIn("/accounts/login/", response.url) - - def test_external_auth_user_agent_tracking(self): - """Test that user agent is tracked in brute force attempts""" - self.client.login(username=self.username, password=self.password) - - # Make request with custom user agent - response = self.client.get( - "/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent" - ) - - self.assertEqual(response.status_code, 200) - - # Check that user agent was tracked - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.user_agent, "Custom Test Agent") - - def test_external_auth_token_uniqueness(self): - """Test that generated tokens are unique""" - self.client.login(username=self.username, password=self.password) - - tokens = set() - for i in range(10): - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - data = json.loads(response.content) - tokens.add(data["token"]) - - # All tokens should be unique - self.assertEqual(len(tokens), 10) - - def test_external_auth_url_patterns(self): - """Test that URL patterns work correctly""" - self.client.login(username=self.username, password=self.password) - - # Test without group - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Test with group - response = self.client.get(f"/accounts/api/external/{self.group_name}/") - self.assertEqual(response.status_code, 200) - - # Test with trailing slash - response = self.client.get(f"/accounts/api/external/{self.group_name}") - self.assertEqual(response.status_code, 200) - - def test_external_auth_json_response_format(self): - """Test that JSON response has correct format""" - self.client.login(username=self.username, password=self.password) - response = self.client.get("/accounts/api/external/") - - self.assertEqual(response.status_code, 200) - self.assertEqual(response["Content-Type"], "application/json") - - data = json.loads(response.content) - required_fields = ["authenticated", "token", "expires_at", "user"] - for field in required_fields: - self.assertIn(field, data) - - # Check user object structure - user_data = data["user"] - required_user_fields = ["id", "username", "email"] - for field in required_user_fields: - self.assertIn(field, user_data) - - def test_external_auth_error_response_format(self): - """Test that error responses have correct format""" - # Test unauthenticated redirect - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 302) - self.assertIn("/accounts/login/", response.url) - - # Test authenticated user not in group (should return JSON error) - self.client.login( - username=self.username_no_group, password=self.password_no_group - ) - response = self.client.get("/accounts/api/external/admin/") - - self.assertEqual(response.status_code, 403) - self.assertEqual(response["Content-Type"], "application/json") - - data = json.loads(response.content) - required_fields = ["authenticated", "reason", "message"] - for field in required_fields: - self.assertIn(field, data) - - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "not_in_group") - self.assertIsInstance(data["message"], str) diff --git a/ivatar/ivataraccount/test_brute_force.py b/ivatar/ivataraccount/test_brute_force.py deleted file mode 100644 index 03636d4..0000000 --- a/ivatar/ivataraccount/test_brute_force.py +++ /dev/null @@ -1,382 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test brute force protection mechanisms for ivatar -""" - -import os -import django -import json -from unittest.mock import patch -from django.test import TestCase, Client -from django.contrib.auth.models import User -from django.utils import timezone - -os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" -django.setup() - -from ivatar.ivataraccount.auth_models import BruteForceAttempt -from ivatar.utils import random_string - - -class BruteForceProtectionTestCase(TestCase): - """ - Test cases for brute force protection mechanisms - """ - - def setUp(self): - """Set up test data""" - self.client = Client() - self.username = random_string() - self.password = random_string() - self.email = f"{self.username}@{random_string()}.com" - - self.user = User.objects.create_user( - username=self.username, password=self.password, email=self.email - ) - self.ip_address = "192.168.1.100" - self.user_agent = "Mozilla/5.0 Test Browser" - - def test_brute_force_protection_authenticated_user_tracking(self): - """Test brute force protection for authenticated users""" - self.client.login(username=self.username, password=self.password) - - # Make requests that will fail (user not in group) to trigger brute force protection - nonexistent_group = random_string() - for i in range(7): - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - - if i < 5: - # First 5 requests should fail with 403 (not in group) - self.assertEqual(response.status_code, 403) - else: - # Requests 6+ should be blocked - self.assertEqual(response.status_code, 429) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "too_many_attempts") - - # Verify brute force attempt was created and blocked - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold - self.assertTrue(attempt.is_blocked) - - def test_brute_force_protection_unauthenticated_user_tracking(self): - """Test brute force protection for unauthenticated users""" - # Make requests without authentication - for i in range(7): - response = self.client.get("/accounts/api/external/") - - if i < 5: - # First 5 requests should return 401 - self.assertEqual(response.status_code, 401) - else: - # Requests 6+ should be blocked - self.assertEqual(response.status_code, 429) - data = json.loads(response.content) - self.assertFalse(data["authenticated"]) - self.assertEqual(data["reason"], "too_many_attempts") - - # Verify brute force attempt was created and blocked - attempt = BruteForceAttempt.objects.get(username=None, ip_address__isnull=False) - self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold - self.assertTrue(attempt.is_blocked) - - def test_brute_force_protection_with_username_parameter(self): - """Test brute force protection with username parameter for unauthenticated users""" - test_username = random_string() - # Make requests with username parameter - for i in range(7): - response = self.client.get( - "/accounts/api/external/", {"username": test_username} - ) - - if i < 5: - # First 5 requests should return 401 - self.assertEqual(response.status_code, 401) - else: - # Requests 6+ should be blocked - self.assertEqual(response.status_code, 429) - - # Verify brute force attempt was created with username - attempt = BruteForceAttempt.objects.get(username=test_username) - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) - self.assertIsNotNone(attempt.ip_address) - - def test_brute_force_protection_time_window_reset(self): - """Test that brute force protection resets after time window""" - self.client.login(username=self.username, password=self.password) - - # Make requests to trigger blocking - nonexistent_group = random_string() - for i in range(6): - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - - # Should be blocked now - self.assertEqual(response.status_code, 429) - - # Manually reset the time window by updating the attempt - attempt = BruteForceAttempt.objects.get(user=self.user) - old_time = timezone.now() - timezone.timedelta(minutes=20) - attempt.first_attempt = old_time - attempt.last_attempt = old_time # Also set last_attempt to old time - attempt.save() - - # Next request should reset and succeed - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Verify attempt was reset - attempt.refresh_from_db() - self.assertEqual(attempt.attempt_count, 0) - self.assertFalse(attempt.is_blocked) - - def test_brute_force_protection_successful_auth_resets_attempts(self): - """Test that successful authentication resets brute force attempts""" - # Login with the correct credentials from setUp - login_success = self.client.login( - username=self.username, password=self.password - ) - self.assertTrue(login_success, "Login should succeed") - - # Make some requests to build up attempt count - for i in range(3): - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Check that attempt count was reset after successful auth - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.attempt_count, 0) - self.assertFalse(attempt.is_blocked) - - def test_brute_force_protection_different_users_independent(self): - """Test that brute force protection is independent for different users""" - user2_password = random_string() - user2 = User.objects.create_user( - username=random_string(), - password=user2_password, - email=f"{random_string()}@{random_string()}.com", - ) - - # Block first user - self.client.login(username=self.username, password=self.password) - nonexistent_group = random_string() - for i in range(6): - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - - self.assertEqual(response.status_code, 429) - - # Second user should still be able to authenticate - login_success = self.client.login( - username=user2.username, password=user2_password - ) - self.assertTrue(login_success, "Second user login should succeed") - - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Verify separate attempts were tracked - attempt1 = BruteForceAttempt.objects.get(user=self.user) - attempt2 = BruteForceAttempt.objects.get(user=user2) - - self.assertTrue(attempt1.is_blocked) - self.assertFalse(attempt2.is_blocked) - - def test_brute_force_protection_different_ips_independent(self): - """Test that brute force protection is independent for different IPs""" - # Mock different IP addresses - with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip: - # First IP gets blocked - mock_get_ip.return_value = ("192.168.1.100", False) - - for i in range(6): - response = self.client.get("/accounts/api/external/") - - self.assertEqual(response.status_code, 429) - - # Second IP should still work - mock_get_ip.return_value = ("192.168.1.200", False) - response = self.client.get("/accounts/api/external/") - self.assertEqual( - response.status_code, 401 - ) # Not authenticated, but not blocked - - # Verify separate attempts were tracked - attempt1 = BruteForceAttempt.objects.get(ip_address="192.168.1.100") - attempt2 = BruteForceAttempt.objects.get(ip_address="192.168.1.200") - - self.assertTrue(attempt1.is_blocked) - self.assertFalse(attempt2.is_blocked) - - def test_brute_force_protection_user_agent_tracking(self): - """Test that user agent is properly tracked""" - login_success = self.client.login( - username=self.username, password=self.password - ) - self.assertTrue(login_success, "Login should succeed") - - # Make request with custom user agent - response = self.client.get( - "/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent" - ) - - self.assertEqual(response.status_code, 200) - - # Verify user agent was tracked - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.user_agent, "Custom Test Agent") - - def test_brute_force_protection_custom_thresholds(self): - """Test brute force protection with custom thresholds""" - login_success = self.client.login( - username=self.username, password=self.password - ) - self.assertTrue(login_success, "Login should succeed") - - # Create attempt with custom threshold - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2) - - # Test with custom max_attempts - self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=15)) - - attempt.attempt_count = 3 - attempt.save() - self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=15)) - - def test_brute_force_protection_custom_time_window(self): - """Test brute force protection with custom time window""" - self.client.login(username=self.username, password=self.password) - - # Create attempt with old timestamp - old_time = timezone.now() - timezone.timedelta(minutes=10) - attempt = BruteForceAttempt.objects.create( - user=self.user, attempt_count=10, first_attempt=old_time - ) - # Override last_attempt to be old as well (since save() sets it to now) - attempt.last_attempt = old_time - attempt.save() - - # Test with 5-minute window (should not block - outside window) - should_block_5min = attempt.should_block(max_attempts=5, time_window_minutes=5) - self.assertFalse(should_block_5min) - - # Create a fresh attempt for the 15-minute test to avoid side effects - attempt2 = BruteForceAttempt.objects.create( - user=self.user, attempt_count=10, first_attempt=old_time - ) - attempt2.last_attempt = old_time - attempt2.save() - - # Test with 15-minute window (should block - within window) - should_block_15min = attempt2.should_block( - max_attempts=5, time_window_minutes=15 - ) - self.assertTrue(should_block_15min) - - def test_brute_force_protection_explicit_blocking(self): - """Test explicit blocking of brute force attempts""" - attempt = BruteForceAttempt.objects.create( - user=self.user, attempt_count=2, is_blocked=True # Explicitly blocked - ) - - # Should block regardless of attempt count - self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) - - def test_brute_force_protection_increment_blocks_at_threshold(self): - """Test that increment_attempt blocks at threshold""" - attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4) - - # Increment should block at threshold - attempt.increment_attempt() - - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) - - def test_brute_force_protection_get_or_create_behavior(self): - """Test get_or_create behavior for brute force attempts""" - # First call should create - attempt, created = BruteForceAttempt.objects.get_or_create( - user=self.user, defaults={"attempt_count": 0} - ) - self.assertTrue(created) - self.assertEqual(attempt.attempt_count, 0) - - # Second call should get existing - attempt2, created2 = BruteForceAttempt.objects.get_or_create( - user=self.user, defaults={"attempt_count": 0} - ) - self.assertFalse(created2) - self.assertEqual(attempt.id, attempt2.id) - - def test_brute_force_protection_unique_constraints(self): - """Test unique constraints for brute force attempts""" - test_username = random_string() - # Create first attempt - BruteForceAttempt.objects.create( - user=self.user, username=test_username, ip_address=self.ip_address - ) - - # Try to create duplicate (should fail) - with self.assertRaises(Exception): # IntegrityError - BruteForceAttempt.objects.create( - user=self.user, username=test_username, ip_address=self.ip_address - ) - - def test_brute_force_protection_mixed_authentication_states(self): - """Test brute force protection with mixed authentication states""" - # Start unauthenticated - for i in range(3): - response = self.client.get( - "/accounts/api/external/", {"username": self.username} - ) - self.assertEqual(response.status_code, 401) - - # Switch to authenticated - login_success = self.client.login( - username=self.username, password=self.password - ) - self.assertTrue(login_success, "Login should succeed") - - # Should still track by user now - response = self.client.get("/accounts/api/external/") - self.assertEqual(response.status_code, 200) - - # Verify attempt was created for authenticated user - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth - - def test_brute_force_protection_edge_case_empty_username(self): - """Test brute force protection with empty username parameter""" - # Make requests with empty username - for i in range(6): - response = self.client.get("/accounts/api/external/", {"username": ""}) - - if i < 5: - self.assertEqual(response.status_code, 401) - else: - self.assertEqual(response.status_code, 429) - - # Verify attempt was created with None username - attempt = BruteForceAttempt.objects.get(username=None) - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) - - def test_brute_force_protection_concurrent_requests(self): - """Test brute force protection with concurrent-like requests""" - self.client.login(username=self.username, password=self.password) - - # Simulate rapid requests that will fail - nonexistent_group = random_string() - responses = [] - for i in range(10): - response = self.client.get(f"/accounts/api/external/{nonexistent_group}/") - responses.append(response.status_code) - - # First 5 should fail with 403, rest should be blocked - self.assertEqual(responses[:5], [403] * 5) - self.assertEqual(responses[5:], [429] * 5) - - # Verify final state - attempt = BruteForceAttempt.objects.get(user=self.user) - self.assertEqual(attempt.attempt_count, 5) - self.assertTrue(attempt.is_blocked) diff --git a/ivatar/ivataraccount/test_token_auth.py b/ivatar/ivataraccount/test_token_auth.py deleted file mode 100644 index d0bf29f..0000000 --- a/ivatar/ivataraccount/test_token_auth.py +++ /dev/null @@ -1,232 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test token authentication backend for ivatar -""" - -import os -import django -from django.test import TestCase -from django.contrib.auth.models import User -from django.utils import timezone -from django.http import HttpRequest - -os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" -django.setup() - -from ivatar.ivataraccount.auth_models import AuthToken -from ivatar.ivataraccount.token_auth import TokenAuthenticationBackend -from ivatar.utils import random_string - - -class TokenAuthenticationBackendTestCase(TestCase): - """ - Test cases for TokenAuthenticationBackend - """ - - def setUp(self): - """Set up test data""" - self.username = random_string() - self.password = random_string() - self.email = f"{random_string()}@{random_string()}.com" - self.user = User.objects.create_user( - username=self.username, password=self.password, email=self.email - ) - self.backend = TokenAuthenticationBackend() - self.request = HttpRequest() - - def test_authenticate_with_valid_token(self): - """Test authentication with valid token""" - # Create a valid token - _token = AuthToken.objects.create( - token="valid-token-123", user=self.user, ip_address="127.0.0.1" - ) - - # Authenticate with the token - authenticated_user = self.backend.authenticate( - request=self.request, token="valid-token-123" - ) - - self.assertEqual(authenticated_user, self.user) - - # Token should still be active - _token.refresh_from_db() - self.assertTrue(_token.is_active) - - def test_authenticate_with_invalid_token(self): - """Test authentication with invalid token""" - # Try to authenticate with non-existent token - authenticated_user = self.backend.authenticate( - request=self.request, token="invalid-token-456" - ) - - self.assertIsNone(authenticated_user) - - def test_authenticate_with_expired_token(self): - """Test authentication with expired token""" - # Create an expired token - past_time = timezone.now() - timezone.timedelta(hours=2) - _token = AuthToken.objects.create(token="expired-token-789", user=self.user) - _token.expires_at = past_time - _token.save() - - # Try to authenticate with expired token - authenticated_user = self.backend.authenticate( - request=self.request, token="expired-token-789" - ) - - self.assertIsNone(authenticated_user) - - # Token should be deactivated - _token.refresh_from_db() - self.assertFalse(_token.is_active) - - def test_authenticate_with_inactive_token(self): - """Test authentication with inactive token""" - # Create an inactive token - AuthToken.objects.create( - token="inactive-token-101", user=self.user, is_active=False - ) - - # Try to authenticate with inactive token - authenticated_user = self.backend.authenticate( - request=self.request, token="inactive-token-101" - ) - - self.assertIsNone(authenticated_user) - - def test_authenticate_without_token(self): - """Test authentication without token parameter""" - authenticated_user = self.backend.authenticate(request=self.request, token=None) - - self.assertIsNone(authenticated_user) - - def test_authenticate_with_empty_token(self): - """Test authentication with empty token""" - authenticated_user = self.backend.authenticate(request=self.request, token="") - - self.assertIsNone(authenticated_user) - - def test_get_user_with_valid_id(self): - """Test get_user with valid user ID""" - user = self.backend.get_user(self.user.id) - - self.assertEqual(user, self.user) - self.assertEqual(user.username, self.username) - self.assertEqual(user.email, self.email) - - def test_get_user_with_invalid_id(self): - """Test get_user with invalid user ID""" - user = self.backend.get_user(99999) # Non-existent ID - - self.assertIsNone(user) - - def test_get_user_with_none_id(self): - """Test get_user with None ID""" - user = self.backend.get_user(None) - - self.assertIsNone(user) - - def test_authenticate_token_case_sensitivity(self): - """Test that token authentication is case sensitive""" - # Create a token with specific case - # Create token with case-sensitive name - AuthToken.objects.create(token="Case-Sensitive-Token", user=self.user) - - # Try with different case - authenticated_user = self.backend.authenticate( - request=self.request, token="case-sensitive-token" - ) - - self.assertIsNone(authenticated_user) - - # Try with correct case - authenticated_user = self.backend.authenticate( - request=self.request, token="Case-Sensitive-Token" - ) - - self.assertEqual(authenticated_user, self.user) - - def test_authenticate_multiple_tokens_same_user(self): - """Test authentication with multiple tokens for same user""" - # Create multiple tokens for the same user - AuthToken.objects.create(token="token-1", user=self.user) - AuthToken.objects.create(token="token-2", user=self.user) - - # Both tokens should authenticate to the same user - user1 = self.backend.authenticate(request=self.request, token="token-1") - user2 = self.backend.authenticate(request=self.request, token="token-2") - - self.assertEqual(user1, self.user) - self.assertEqual(user2, self.user) - self.assertEqual(user1, user2) - - def test_authenticate_token_with_special_characters(self): - """Test authentication with token containing special characters""" - special_token = "token-with-special-chars!@#$%^&*()_+-=[]{}|;:,.<>?" - - AuthToken.objects.create(token=special_token, user=self.user) - - authenticated_user = self.backend.authenticate( - request=self.request, token=special_token - ) - - self.assertEqual(authenticated_user, self.user) - - def test_authenticate_token_length_variations(self): - """Test authentication with tokens of different lengths""" - # Short token - short_token = "abc" - AuthToken.objects.create(token=short_token, user=self.user) - - # Long token (within database limit) - long_token = "a" * 64 - AuthToken.objects.create(token=long_token, user=self.user) - - # Both should work - user1 = self.backend.authenticate(request=self.request, token=short_token) - user2 = self.backend.authenticate(request=self.request, token=long_token) - - self.assertEqual(user1, self.user) - self.assertEqual(user2, self.user) - - def test_authenticate_with_deleted_user(self): - """Test authentication with token for deleted user""" - # Create token for user - AuthToken.objects.create(token="token-for-deleted-user", user=self.user) - - # Delete the user - self.user.delete() - - # Try to authenticate with token for deleted user - authenticated_user = self.backend.authenticate( - request=self.request, token="token-for-deleted-user" - ) - - # Should return None (token should be invalid) - self.assertIsNone(authenticated_user) - - def test_authenticate_token_unicode_characters(self): - """Test authentication with token containing unicode characters""" - unicode_token = "token-with-unicode-ñáéíóú-中文-🚀" - - AuthToken.objects.create(token=unicode_token, user=self.user) - - authenticated_user = self.backend.authenticate( - request=self.request, token=unicode_token - ) - - self.assertEqual(authenticated_user, self.user) - - def test_backend_name(self): - """Test that backend has correct name""" - self.assertEqual(self.backend.__class__.__name__, "TokenAuthenticationBackend") - - def test_authenticate_with_request_none(self): - """Test authentication with None request""" - AuthToken.objects.create(token="token-with-none-request", user=self.user) - - authenticated_user = self.backend.authenticate( - request=None, token="token-with-none-request" - ) - - self.assertEqual(authenticated_user, self.user) diff --git a/ivatar/ivataraccount/token_auth.py b/ivatar/ivataraccount/token_auth.py deleted file mode 100644 index 92bb11f..0000000 --- a/ivatar/ivataraccount/token_auth.py +++ /dev/null @@ -1,49 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Token-based authentication backend for ivatar -""" - -from typing import Optional -from django.contrib.auth.backends import BaseBackend -from django.contrib.auth.models import User -from django.http import HttpRequest -from .auth_models import AuthToken - - -class TokenAuthenticationBackend(BaseBackend): - """ - Authentication backend that uses tokens for authentication - """ - - def authenticate( - self, request: Optional[HttpRequest], token: Optional[str] = None, **kwargs - ) -> Optional[User]: - """ - Authenticate using a token - """ - if not token: - return None - - try: - auth_token = AuthToken.objects.get(token=token, is_active=True) - - # Check if token is expired - if auth_token.is_expired(): - # Mark token as inactive - auth_token.is_active = False - auth_token.save() - return None - - return auth_token.user - - except AuthToken.DoesNotExist: - return None - - def get_user(self, user_id: int) -> Optional[User]: - """ - Get user by ID - """ - try: - return User.objects.get(pk=user_id) - except User.DoesNotExist: - return None diff --git a/ivatar/ivataraccount/urls.py b/ivatar/ivataraccount/urls.py index 1adfcb2..ae6a1c4 100644 --- a/ivatar/ivataraccount/urls.py +++ b/ivatar/ivataraccount/urls.py @@ -27,7 +27,6 @@ from .views import ResendConfirmationMailView from .views import IvatarLoginView from .views import DeleteAccountView from .views import ExportView -from .auth_views import ExternalAuthView # Define URL patterns, self documenting # To see the fancy, colorful evaluation of these use: @@ -167,9 +166,4 @@ urlpatterns = [ # pylint: disable=invalid-name ResendConfirmationMailView.as_view(), name="resend_confirmation_mail", ), - re_path( - r"api/external/(?P\w+)?/?$", - ExternalAuthView.as_view(), - name="external_auth", - ), ]