§Fix and enhance tests, esp. remove occurances of hardcoded username/pass/email. Also treat request to admin group special and also allow superusers, which is a flag on the userobject and not a group

This commit is contained in:
Oliver Falk
2025-09-06 11:13:11 +02:00
parent aa742ea181
commit 85c06cd42c
6 changed files with 213 additions and 39 deletions

View File

@@ -70,18 +70,32 @@ class BruteForceAttempt(models.Model):
if self.is_blocked:
return True
# Reset attempts if outside time window
if timezone.now() - self.first_attempt > timezone.timedelta(
minutes=time_window_minutes
):
self.attempt_count = 1
self.first_attempt = timezone.now()
self.is_blocked = False
self.save()
# Check if outside time window
time_diff = timezone.now() - self.first_attempt
time_window = timezone.timedelta(minutes=time_window_minutes)
if time_diff > time_window:
# Time window has expired, should not block
return False
return self.attempt_count >= max_attempts
def reset_if_expired(self, time_window_minutes: int = 15) -> bool:
"""
Reset attempt if time window has expired
Returns True if reset was performed, False otherwise
"""
time_diff = timezone.now() - self.first_attempt
time_window = timezone.timedelta(minutes=time_window_minutes)
if time_diff > time_window:
self.attempt_count = 0
self.first_attempt = timezone.now()
self.is_blocked = False
self.save()
return True
return False
def increment_attempt(self) -> None:
"""Increment the attempt count"""
self.attempt_count += 1

View File

@@ -46,6 +46,9 @@ class ExternalAuthView(View):
defaults={"attempt_count": 0},
)
# Reset attempts if time window has expired
brute_force_attempt.reset_if_expired()
if brute_force_attempt.should_block():
return JsonResponse(
{
@@ -70,7 +73,16 @@ class ExternalAuthView(View):
# If group is specified, check if user is in that group
if group:
if not request.user.groups.filter(name=group).exists():
# Special handling for 'admin' group: also check if user is superuser
if group == "admin":
is_in_group = (
request.user.groups.filter(name=group).exists()
or request.user.is_superuser
)
else:
is_in_group = request.user.groups.filter(name=group).exists()
if not is_in_group:
brute_force_attempt.increment_attempt()
return JsonResponse(
{

View File

@@ -13,6 +13,7 @@ os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt
from ivatar.utils import random_string
class AuthTokenTestCase(TestCase):
@@ -22,8 +23,11 @@ class AuthTokenTestCase(TestCase):
def setUp(self):
"""Set up test data"""
self.username = random_string()
self.password = random_string()
self.email = f"{random_string()}@{random_string()}.com"
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
username=self.username, password=self.password, email=self.email
)
def test_auth_token_creation(self):
@@ -89,8 +93,11 @@ class BruteForceAttemptTestCase(TestCase):
def setUp(self):
"""Set up test data"""
self.username = random_string()
self.password = random_string()
self.email = f"{random_string()}@{random_string()}.com"
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
username=self.username, password=self.password, email=self.email
)
self.ip_address = "127.0.0.1"
self.user_agent = "Mozilla/5.0 Test Browser"
@@ -111,7 +118,7 @@ class BruteForceAttemptTestCase(TestCase):
def test_brute_force_attempt_creation_unauthenticated_user(self):
"""Test BruteForceAttempt creation for unauthenticated user"""
username = "testusername"
username = random_string()
attempt = BruteForceAttempt.objects.create(
username=username, ip_address=self.ip_address, user_agent=self.user_agent
)
@@ -164,9 +171,12 @@ class BruteForceAttemptTestCase(TestCase):
# Should reset and not block (outside 15-minute window)
self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15))
# Reset the attempt count using the new method
attempt.reset_if_expired(time_window_minutes=15)
# Should have reset attempt count
attempt.refresh_from_db()
self.assertEqual(attempt.attempt_count, 1)
self.assertEqual(attempt.attempt_count, 0)
self.assertFalse(attempt.is_blocked)
def test_should_block_when_explicitly_blocked(self):
@@ -212,10 +222,11 @@ class BruteForceAttemptTestCase(TestCase):
self.assertEqual(str(attempt_with_user), expected_str_user)
# Test with username only
test_username = random_string()
attempt_with_username = BruteForceAttempt.objects.create(
username="testusername", ip_address=self.ip_address, attempt_count=2
username=test_username, ip_address=self.ip_address, attempt_count=2
)
expected_str_username = "Brute force attempt for testusername (2 attempts)"
expected_str_username = f"Brute force attempt for {test_username} (2 attempts)"
self.assertEqual(str(attempt_with_username), expected_str_username)
# Test with IP only
@@ -227,15 +238,16 @@ class BruteForceAttemptTestCase(TestCase):
def test_unique_together_constraint(self):
"""Test unique_together constraint"""
test_username = random_string()
# Create first attempt
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
user=self.user, username=test_username, ip_address=self.ip_address
)
# Try to create duplicate (should fail)
with self.assertRaises(Exception): # IntegrityError
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
user=self.user, username=test_username, ip_address=self.ip_address
)
def test_get_or_create_behavior(self):

View File

@@ -105,6 +105,102 @@ class ExternalAuthViewTestCase(TestCase):
self.assertEqual(data["reason"], "not_in_group")
self.assertEqual(data["message"], f"User is not in group: {self.group_name}")
def test_external_auth_admin_group_superuser_access(self):
"""Test that superusers can access admin group even without being in admin group"""
# Make the existing user a superuser
self.user.is_superuser = True
self.user.save()
# Login as superuser
self.client.login(username=self.username, password=self.password)
# Try to access admin group (superuser should have access even without being in admin group)
response = self.client.get("/accounts/api/external/admin/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertTrue(data["authenticated"])
self.assertIn("token", data)
# Verify token was created for superuser
token_obj = AuthToken.objects.get(token=data["token"])
self.assertEqual(token_obj.user, self.user)
def test_external_auth_admin_group_regular_user_in_admin_group(self):
"""Test that regular users in admin group can access admin group"""
# Create admin group
admin_group = Group.objects.create(name="admin")
# Add user to admin group
self.user.groups.add(admin_group)
# Login as user
self.client.login(username=self.username, password=self.password)
# Try to access admin group
response = self.client.get("/accounts/api/external/admin/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertTrue(data["authenticated"])
self.assertIn("token", data)
def test_external_auth_admin_group_regular_user_not_in_admin_group(self):
"""Test that regular users not in admin group cannot access admin group"""
# Login as user (not in admin group)
self.client.login(username=self.username, password=self.password)
# Try to access admin group
response = self.client.get("/accounts/api/external/admin/")
self.assertEqual(response.status_code, 403)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_in_group")
self.assertEqual(data["message"], "User is not in group: admin")
def test_external_auth_admin_group_superuser_and_in_admin_group(self):
"""Test that superusers who are also in admin group can access admin group"""
# Create admin group
admin_group = Group.objects.create(name="admin")
# Make the existing user a superuser
self.user.is_superuser = True
self.user.save()
# Add user to admin group as well
self.user.groups.add(admin_group)
# Login as superuser
self.client.login(username=self.username, password=self.password)
# Try to access admin group
response = self.client.get("/accounts/api/external/admin/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertTrue(data["authenticated"])
self.assertIn("token", data)
def test_external_auth_non_admin_group_normal_behavior(self):
"""Test that non-admin groups work normally (no superuser privilege)"""
# Make the existing user a superuser
self.user.is_superuser = True
self.user.save()
# Login as superuser
self.client.login(username=self.username, password=self.password)
# Try to access a different group that the user is NOT in (superuser should NOT have automatic access)
different_group = random_string()
response = self.client.get(f"/accounts/api/external/{different_group}/")
self.assertEqual(response.status_code, 403)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_in_group")
self.assertEqual(data["message"], f"User is not in group: {different_group}")
def test_external_auth_nonexistent_group(self):
"""Test external auth endpoint with non-existent group"""
nonexistent_group = random_string()
@@ -230,15 +326,16 @@ class ExternalAuthViewTestCase(TestCase):
def test_external_auth_brute_force_tracking_by_ip_and_username(self):
"""Test that brute force attempts are tracked by IP and username for unauthenticated users"""
test_username = random_string()
# Make requests with username parameter
for i in range(3):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
"/accounts/api/external/", {"username": test_username}
)
self.assertEqual(response.status_code, 401)
# Check that attempt is tracked by username and IP
attempt = BruteForceAttempt.objects.get(username="testuser")
attempt = BruteForceAttempt.objects.get(username=test_username)
self.assertEqual(attempt.attempt_count, 3)
self.assertIsNotNone(attempt.ip_address)

View File

@@ -83,10 +83,11 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_with_username_parameter(self):
"""Test brute force protection with username parameter for unauthenticated users"""
test_username = random_string()
# Make requests with username parameter
for i in range(7):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
"/accounts/api/external/", {"username": test_username}
)
if i < 5:
@@ -97,7 +98,7 @@ class BruteForceProtectionTestCase(TestCase):
self.assertEqual(response.status_code, 429)
# Verify brute force attempt was created with username
attempt = BruteForceAttempt.objects.get(username="testuser")
attempt = BruteForceAttempt.objects.get(username=test_username)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
self.assertIsNotNone(attempt.ip_address)
@@ -118,6 +119,7 @@ class BruteForceProtectionTestCase(TestCase):
attempt = BruteForceAttempt.objects.get(user=self.user)
old_time = timezone.now() - timezone.timedelta(minutes=20)
attempt.first_attempt = old_time
attempt.last_attempt = old_time # Also set last_attempt to old time
attempt.save()
# Next request should reset and succeed
@@ -131,7 +133,11 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_successful_auth_resets_attempts(self):
"""Test that successful authentication resets brute force attempts"""
self.client.login(username="testuser", password="testpass123")
# Login with the correct credentials from setUp
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Make some requests to build up attempt count
for i in range(3):
@@ -145,9 +151,10 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_different_users_independent(self):
"""Test that brute force protection is independent for different users"""
user2_password = random_string()
user2 = User.objects.create_user(
username=random_string(),
password=random_string(),
password=user2_password,
email=f"{random_string()}@{random_string()}.com",
)
@@ -160,7 +167,11 @@ class BruteForceProtectionTestCase(TestCase):
self.assertEqual(response.status_code, 429)
# Second user should still be able to authenticate
self.client.login(username=user2.username, password=user2.password)
login_success = self.client.login(
username=user2.username, password=user2_password
)
self.assertTrue(login_success, "Second user login should succeed")
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
@@ -199,7 +210,10 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_user_agent_tracking(self):
"""Test that user agent is properly tracked"""
self.client.login(username="testuser", password="testpass123")
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Make request with custom user agent
response = self.client.get(
@@ -214,7 +228,10 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_custom_thresholds(self):
"""Test brute force protection with custom thresholds"""
self.client.login(username="testuser", password="testpass123")
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Create attempt with custom threshold
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2)
@@ -235,12 +252,26 @@ class BruteForceProtectionTestCase(TestCase):
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10, first_attempt=old_time
)
# Override last_attempt to be old as well (since save() sets it to now)
attempt.last_attempt = old_time
attempt.save()
# Should not block with 5-minute window (outside window)
self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=5))
# Test with 5-minute window (should not block - outside window)
should_block_5min = attempt.should_block(max_attempts=5, time_window_minutes=5)
self.assertFalse(should_block_5min)
# Should block with 15-minute window (within window)
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
# Create a fresh attempt for the 15-minute test to avoid side effects
attempt2 = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10, first_attempt=old_time
)
attempt2.last_attempt = old_time
attempt2.save()
# Test with 15-minute window (should block - within window)
should_block_15min = attempt2.should_block(
max_attempts=5, time_window_minutes=15
)
self.assertTrue(should_block_15min)
def test_brute_force_protection_explicit_blocking(self):
"""Test explicit blocking of brute force attempts"""
@@ -279,15 +310,16 @@ class BruteForceProtectionTestCase(TestCase):
def test_brute_force_protection_unique_constraints(self):
"""Test unique constraints for brute force attempts"""
test_username = random_string()
# Create first attempt
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
user=self.user, username=test_username, ip_address=self.ip_address
)
# Try to create duplicate (should fail)
with self.assertRaises(Exception): # IntegrityError
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
user=self.user, username=test_username, ip_address=self.ip_address
)
def test_brute_force_protection_mixed_authentication_states(self):
@@ -295,12 +327,15 @@ class BruteForceProtectionTestCase(TestCase):
# Start unauthenticated
for i in range(3):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
"/accounts/api/external/", {"username": self.username}
)
self.assertEqual(response.status_code, 401)
# Switch to authenticated
self.client.login(username="testuser", password="testpass123")
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Should still track by user now
response = self.client.get("/accounts/api/external/")

View File

@@ -15,6 +15,7 @@ django.setup()
from ivatar.ivataraccount.auth_models import AuthToken
from ivatar.ivataraccount.token_auth import TokenAuthenticationBackend
from ivatar.utils import random_string
class TokenAuthenticationBackendTestCase(TestCase):
@@ -24,8 +25,11 @@ class TokenAuthenticationBackendTestCase(TestCase):
def setUp(self):
"""Set up test data"""
self.username = random_string()
self.password = random_string()
self.email = f"{random_string()}@{random_string()}.com"
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
username=self.username, password=self.password, email=self.email
)
self.backend = TokenAuthenticationBackend()
self.request = HttpRequest()
@@ -107,8 +111,8 @@ class TokenAuthenticationBackendTestCase(TestCase):
user = self.backend.get_user(self.user.id)
self.assertEqual(user, self.user)
self.assertEqual(user.username, "testuser")
self.assertEqual(user.email, "test@example.com")
self.assertEqual(user.username, self.username)
self.assertEqual(user.email, self.email)
def test_get_user_with_invalid_id(self):
"""Test get_user with invalid user ID"""
@@ -174,8 +178,8 @@ class TokenAuthenticationBackendTestCase(TestCase):
short_token = "abc"
AuthToken.objects.create(token=short_token, user=self.user)
# Long token
long_token = "a" * 100
# Long token (within database limit)
long_token = "a" * 64
AuthToken.objects.create(token=long_token, user=self.user)
# Both should work