diff --git a/ivatar/ivataraccount/auth_models.py b/ivatar/ivataraccount/auth_models.py index 1d839a7..c6e2d44 100644 --- a/ivatar/ivataraccount/auth_models.py +++ b/ivatar/ivataraccount/auth_models.py @@ -70,18 +70,32 @@ class BruteForceAttempt(models.Model): if self.is_blocked: return True - # Reset attempts if outside time window - if timezone.now() - self.first_attempt > timezone.timedelta( - minutes=time_window_minutes - ): - self.attempt_count = 1 - self.first_attempt = timezone.now() - self.is_blocked = False - self.save() + # Check if outside time window + time_diff = timezone.now() - self.first_attempt + time_window = timezone.timedelta(minutes=time_window_minutes) + + if time_diff > time_window: + # Time window has expired, should not block return False return self.attempt_count >= max_attempts + def reset_if_expired(self, time_window_minutes: int = 15) -> bool: + """ + Reset attempt if time window has expired + Returns True if reset was performed, False otherwise + """ + time_diff = timezone.now() - self.first_attempt + time_window = timezone.timedelta(minutes=time_window_minutes) + + if time_diff > time_window: + self.attempt_count = 0 + self.first_attempt = timezone.now() + self.is_blocked = False + self.save() + return True + return False + def increment_attempt(self) -> None: """Increment the attempt count""" self.attempt_count += 1 diff --git a/ivatar/ivataraccount/auth_views.py b/ivatar/ivataraccount/auth_views.py index 6d8cc54..a3c23c5 100644 --- a/ivatar/ivataraccount/auth_views.py +++ b/ivatar/ivataraccount/auth_views.py @@ -46,6 +46,9 @@ class ExternalAuthView(View): defaults={"attempt_count": 0}, ) + # Reset attempts if time window has expired + brute_force_attempt.reset_if_expired() + if brute_force_attempt.should_block(): return JsonResponse( { @@ -70,7 +73,16 @@ class ExternalAuthView(View): # If group is specified, check if user is in that group if group: - if not request.user.groups.filter(name=group).exists(): + # Special handling for 'admin' group: also check if user is superuser + if group == "admin": + is_in_group = ( + request.user.groups.filter(name=group).exists() + or request.user.is_superuser + ) + else: + is_in_group = request.user.groups.filter(name=group).exists() + + if not is_in_group: brute_force_attempt.increment_attempt() return JsonResponse( { diff --git a/ivatar/ivataraccount/test_auth_models.py b/ivatar/ivataraccount/test_auth_models.py index 1268e21..d0623f2 100644 --- a/ivatar/ivataraccount/test_auth_models.py +++ b/ivatar/ivataraccount/test_auth_models.py @@ -13,6 +13,7 @@ os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings" django.setup() from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt +from ivatar.utils import random_string class AuthTokenTestCase(TestCase): @@ -22,8 +23,11 @@ class AuthTokenTestCase(TestCase): def setUp(self): """Set up test data""" + self.username = random_string() + self.password = random_string() + self.email = f"{random_string()}@{random_string()}.com" self.user = User.objects.create_user( - username="testuser", password="testpass123", email="test@example.com" + username=self.username, password=self.password, email=self.email ) def test_auth_token_creation(self): @@ -89,8 +93,11 @@ class BruteForceAttemptTestCase(TestCase): def setUp(self): """Set up test data""" + self.username = random_string() + self.password = random_string() + self.email = f"{random_string()}@{random_string()}.com" self.user = User.objects.create_user( - username="testuser", password="testpass123", email="test@example.com" + username=self.username, password=self.password, email=self.email ) self.ip_address = "127.0.0.1" self.user_agent = "Mozilla/5.0 Test Browser" @@ -111,7 +118,7 @@ class BruteForceAttemptTestCase(TestCase): def test_brute_force_attempt_creation_unauthenticated_user(self): """Test BruteForceAttempt creation for unauthenticated user""" - username = "testusername" + username = random_string() attempt = BruteForceAttempt.objects.create( username=username, ip_address=self.ip_address, user_agent=self.user_agent ) @@ -164,9 +171,12 @@ class BruteForceAttemptTestCase(TestCase): # Should reset and not block (outside 15-minute window) self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15)) + # Reset the attempt count using the new method + attempt.reset_if_expired(time_window_minutes=15) + # Should have reset attempt count attempt.refresh_from_db() - self.assertEqual(attempt.attempt_count, 1) + self.assertEqual(attempt.attempt_count, 0) self.assertFalse(attempt.is_blocked) def test_should_block_when_explicitly_blocked(self): @@ -212,10 +222,11 @@ class BruteForceAttemptTestCase(TestCase): self.assertEqual(str(attempt_with_user), expected_str_user) # Test with username only + test_username = random_string() attempt_with_username = BruteForceAttempt.objects.create( - username="testusername", ip_address=self.ip_address, attempt_count=2 + username=test_username, ip_address=self.ip_address, attempt_count=2 ) - expected_str_username = "Brute force attempt for testusername (2 attempts)" + expected_str_username = f"Brute force attempt for {test_username} (2 attempts)" self.assertEqual(str(attempt_with_username), expected_str_username) # Test with IP only @@ -227,15 +238,16 @@ class BruteForceAttemptTestCase(TestCase): def test_unique_together_constraint(self): """Test unique_together constraint""" + test_username = random_string() # Create first attempt BruteForceAttempt.objects.create( - user=self.user, username="testuser", ip_address=self.ip_address + user=self.user, username=test_username, ip_address=self.ip_address ) # Try to create duplicate (should fail) with self.assertRaises(Exception): # IntegrityError BruteForceAttempt.objects.create( - user=self.user, username="testuser", ip_address=self.ip_address + user=self.user, username=test_username, ip_address=self.ip_address ) def test_get_or_create_behavior(self): diff --git a/ivatar/ivataraccount/test_auth_views.py b/ivatar/ivataraccount/test_auth_views.py index b60ca97..151e2a0 100644 --- a/ivatar/ivataraccount/test_auth_views.py +++ b/ivatar/ivataraccount/test_auth_views.py @@ -105,6 +105,102 @@ class ExternalAuthViewTestCase(TestCase): self.assertEqual(data["reason"], "not_in_group") self.assertEqual(data["message"], f"User is not in group: {self.group_name}") + def test_external_auth_admin_group_superuser_access(self): + """Test that superusers can access admin group even without being in admin group""" + # Make the existing user a superuser + self.user.is_superuser = True + self.user.save() + + # Login as superuser + self.client.login(username=self.username, password=self.password) + + # Try to access admin group (superuser should have access even without being in admin group) + response = self.client.get("/accounts/api/external/admin/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["authenticated"]) + self.assertIn("token", data) + + # Verify token was created for superuser + token_obj = AuthToken.objects.get(token=data["token"]) + self.assertEqual(token_obj.user, self.user) + + def test_external_auth_admin_group_regular_user_in_admin_group(self): + """Test that regular users in admin group can access admin group""" + # Create admin group + admin_group = Group.objects.create(name="admin") + + # Add user to admin group + self.user.groups.add(admin_group) + + # Login as user + self.client.login(username=self.username, password=self.password) + + # Try to access admin group + response = self.client.get("/accounts/api/external/admin/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["authenticated"]) + self.assertIn("token", data) + + def test_external_auth_admin_group_regular_user_not_in_admin_group(self): + """Test that regular users not in admin group cannot access admin group""" + # Login as user (not in admin group) + self.client.login(username=self.username, password=self.password) + + # Try to access admin group + response = self.client.get("/accounts/api/external/admin/") + + self.assertEqual(response.status_code, 403) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_in_group") + self.assertEqual(data["message"], "User is not in group: admin") + + def test_external_auth_admin_group_superuser_and_in_admin_group(self): + """Test that superusers who are also in admin group can access admin group""" + # Create admin group + admin_group = Group.objects.create(name="admin") + + # Make the existing user a superuser + self.user.is_superuser = True + self.user.save() + + # Add user to admin group as well + self.user.groups.add(admin_group) + + # Login as superuser + self.client.login(username=self.username, password=self.password) + + # Try to access admin group + response = self.client.get("/accounts/api/external/admin/") + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["authenticated"]) + self.assertIn("token", data) + + def test_external_auth_non_admin_group_normal_behavior(self): + """Test that non-admin groups work normally (no superuser privilege)""" + # Make the existing user a superuser + self.user.is_superuser = True + self.user.save() + + # Login as superuser + self.client.login(username=self.username, password=self.password) + + # Try to access a different group that the user is NOT in (superuser should NOT have automatic access) + different_group = random_string() + response = self.client.get(f"/accounts/api/external/{different_group}/") + + self.assertEqual(response.status_code, 403) + data = json.loads(response.content) + self.assertFalse(data["authenticated"]) + self.assertEqual(data["reason"], "not_in_group") + self.assertEqual(data["message"], f"User is not in group: {different_group}") + def test_external_auth_nonexistent_group(self): """Test external auth endpoint with non-existent group""" nonexistent_group = random_string() @@ -230,15 +326,16 @@ class ExternalAuthViewTestCase(TestCase): def test_external_auth_brute_force_tracking_by_ip_and_username(self): """Test that brute force attempts are tracked by IP and username for unauthenticated users""" + test_username = random_string() # Make requests with username parameter for i in range(3): response = self.client.get( - "/accounts/api/external/", {"username": "testuser"} + "/accounts/api/external/", {"username": test_username} ) self.assertEqual(response.status_code, 401) # Check that attempt is tracked by username and IP - attempt = BruteForceAttempt.objects.get(username="testuser") + attempt = BruteForceAttempt.objects.get(username=test_username) self.assertEqual(attempt.attempt_count, 3) self.assertIsNotNone(attempt.ip_address) diff --git a/ivatar/ivataraccount/test_brute_force.py b/ivatar/ivataraccount/test_brute_force.py index 2dace56..03636d4 100644 --- a/ivatar/ivataraccount/test_brute_force.py +++ b/ivatar/ivataraccount/test_brute_force.py @@ -83,10 +83,11 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_with_username_parameter(self): """Test brute force protection with username parameter for unauthenticated users""" + test_username = random_string() # Make requests with username parameter for i in range(7): response = self.client.get( - "/accounts/api/external/", {"username": "testuser"} + "/accounts/api/external/", {"username": test_username} ) if i < 5: @@ -97,7 +98,7 @@ class BruteForceProtectionTestCase(TestCase): self.assertEqual(response.status_code, 429) # Verify brute force attempt was created with username - attempt = BruteForceAttempt.objects.get(username="testuser") + attempt = BruteForceAttempt.objects.get(username=test_username) self.assertEqual(attempt.attempt_count, 5) self.assertTrue(attempt.is_blocked) self.assertIsNotNone(attempt.ip_address) @@ -118,6 +119,7 @@ class BruteForceProtectionTestCase(TestCase): attempt = BruteForceAttempt.objects.get(user=self.user) old_time = timezone.now() - timezone.timedelta(minutes=20) attempt.first_attempt = old_time + attempt.last_attempt = old_time # Also set last_attempt to old time attempt.save() # Next request should reset and succeed @@ -131,7 +133,11 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_successful_auth_resets_attempts(self): """Test that successful authentication resets brute force attempts""" - self.client.login(username="testuser", password="testpass123") + # Login with the correct credentials from setUp + login_success = self.client.login( + username=self.username, password=self.password + ) + self.assertTrue(login_success, "Login should succeed") # Make some requests to build up attempt count for i in range(3): @@ -145,9 +151,10 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_different_users_independent(self): """Test that brute force protection is independent for different users""" + user2_password = random_string() user2 = User.objects.create_user( username=random_string(), - password=random_string(), + password=user2_password, email=f"{random_string()}@{random_string()}.com", ) @@ -160,7 +167,11 @@ class BruteForceProtectionTestCase(TestCase): self.assertEqual(response.status_code, 429) # Second user should still be able to authenticate - self.client.login(username=user2.username, password=user2.password) + login_success = self.client.login( + username=user2.username, password=user2_password + ) + self.assertTrue(login_success, "Second user login should succeed") + response = self.client.get("/accounts/api/external/") self.assertEqual(response.status_code, 200) @@ -199,7 +210,10 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_user_agent_tracking(self): """Test that user agent is properly tracked""" - self.client.login(username="testuser", password="testpass123") + login_success = self.client.login( + username=self.username, password=self.password + ) + self.assertTrue(login_success, "Login should succeed") # Make request with custom user agent response = self.client.get( @@ -214,7 +228,10 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_custom_thresholds(self): """Test brute force protection with custom thresholds""" - self.client.login(username="testuser", password="testpass123") + login_success = self.client.login( + username=self.username, password=self.password + ) + self.assertTrue(login_success, "Login should succeed") # Create attempt with custom threshold attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2) @@ -235,12 +252,26 @@ class BruteForceProtectionTestCase(TestCase): attempt = BruteForceAttempt.objects.create( user=self.user, attempt_count=10, first_attempt=old_time ) + # Override last_attempt to be old as well (since save() sets it to now) + attempt.last_attempt = old_time + attempt.save() - # Should not block with 5-minute window (outside window) - self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=5)) + # Test with 5-minute window (should not block - outside window) + should_block_5min = attempt.should_block(max_attempts=5, time_window_minutes=5) + self.assertFalse(should_block_5min) - # Should block with 15-minute window (within window) - self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15)) + # Create a fresh attempt for the 15-minute test to avoid side effects + attempt2 = BruteForceAttempt.objects.create( + user=self.user, attempt_count=10, first_attempt=old_time + ) + attempt2.last_attempt = old_time + attempt2.save() + + # Test with 15-minute window (should block - within window) + should_block_15min = attempt2.should_block( + max_attempts=5, time_window_minutes=15 + ) + self.assertTrue(should_block_15min) def test_brute_force_protection_explicit_blocking(self): """Test explicit blocking of brute force attempts""" @@ -279,15 +310,16 @@ class BruteForceProtectionTestCase(TestCase): def test_brute_force_protection_unique_constraints(self): """Test unique constraints for brute force attempts""" + test_username = random_string() # Create first attempt BruteForceAttempt.objects.create( - user=self.user, username="testuser", ip_address=self.ip_address + user=self.user, username=test_username, ip_address=self.ip_address ) # Try to create duplicate (should fail) with self.assertRaises(Exception): # IntegrityError BruteForceAttempt.objects.create( - user=self.user, username="testuser", ip_address=self.ip_address + user=self.user, username=test_username, ip_address=self.ip_address ) def test_brute_force_protection_mixed_authentication_states(self): @@ -295,12 +327,15 @@ class BruteForceProtectionTestCase(TestCase): # Start unauthenticated for i in range(3): response = self.client.get( - "/accounts/api/external/", {"username": "testuser"} + "/accounts/api/external/", {"username": self.username} ) self.assertEqual(response.status_code, 401) # Switch to authenticated - self.client.login(username="testuser", password="testpass123") + login_success = self.client.login( + username=self.username, password=self.password + ) + self.assertTrue(login_success, "Login should succeed") # Should still track by user now response = self.client.get("/accounts/api/external/") diff --git a/ivatar/ivataraccount/test_token_auth.py b/ivatar/ivataraccount/test_token_auth.py index ea57835..d0bf29f 100644 --- a/ivatar/ivataraccount/test_token_auth.py +++ b/ivatar/ivataraccount/test_token_auth.py @@ -15,6 +15,7 @@ django.setup() from ivatar.ivataraccount.auth_models import AuthToken from ivatar.ivataraccount.token_auth import TokenAuthenticationBackend +from ivatar.utils import random_string class TokenAuthenticationBackendTestCase(TestCase): @@ -24,8 +25,11 @@ class TokenAuthenticationBackendTestCase(TestCase): def setUp(self): """Set up test data""" + self.username = random_string() + self.password = random_string() + self.email = f"{random_string()}@{random_string()}.com" self.user = User.objects.create_user( - username="testuser", password="testpass123", email="test@example.com" + username=self.username, password=self.password, email=self.email ) self.backend = TokenAuthenticationBackend() self.request = HttpRequest() @@ -107,8 +111,8 @@ class TokenAuthenticationBackendTestCase(TestCase): user = self.backend.get_user(self.user.id) self.assertEqual(user, self.user) - self.assertEqual(user.username, "testuser") - self.assertEqual(user.email, "test@example.com") + self.assertEqual(user.username, self.username) + self.assertEqual(user.email, self.email) def test_get_user_with_invalid_id(self): """Test get_user with invalid user ID""" @@ -174,8 +178,8 @@ class TokenAuthenticationBackendTestCase(TestCase): short_token = "abc" AuthToken.objects.create(token=short_token, user=self.user) - # Long token - long_token = "a" * 100 + # Long token (within database limit) + long_token = "a" * 64 AuthToken.objects.create(token=long_token, user=self.user) # Both should work