Files
ivatar/ivatar/ivataraccount/test_brute_force.py

383 lines
15 KiB
Python

# -*- coding: utf-8 -*-
"""
Test brute force protection mechanisms for ivatar
"""
import os
import django
import json
from unittest.mock import patch
from django.test import TestCase, Client
from django.contrib.auth.models import User
from django.utils import timezone
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import BruteForceAttempt
from ivatar.utils import random_string
class BruteForceProtectionTestCase(TestCase):
"""
Test cases for brute force protection mechanisms
"""
def setUp(self):
"""Set up test data"""
self.client = Client()
self.username = random_string()
self.password = random_string()
self.email = f"{self.username}@{random_string()}.com"
self.user = User.objects.create_user(
username=self.username, password=self.password, email=self.email
)
self.ip_address = "192.168.1.100"
self.user_agent = "Mozilla/5.0 Test Browser"
def test_brute_force_protection_authenticated_user_tracking(self):
"""Test brute force protection for authenticated users"""
self.client.login(username=self.username, password=self.password)
# Make requests that will fail (user not in group) to trigger brute force protection
nonexistent_group = random_string()
for i in range(7):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
if i < 5:
# First 5 requests should fail with 403 (not in group)
self.assertEqual(response.status_code, 403)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
# Verify brute force attempt was created and blocked
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_unauthenticated_user_tracking(self):
"""Test brute force protection for unauthenticated users"""
# Make requests without authentication
for i in range(7):
response = self.client.get("/accounts/api/external/")
if i < 5:
# First 5 requests should return 401
self.assertEqual(response.status_code, 401)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
# Verify brute force attempt was created and blocked
attempt = BruteForceAttempt.objects.get(username=None, ip_address__isnull=False)
self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_with_username_parameter(self):
"""Test brute force protection with username parameter for unauthenticated users"""
test_username = random_string()
# Make requests with username parameter
for i in range(7):
response = self.client.get(
"/accounts/api/external/", {"username": test_username}
)
if i < 5:
# First 5 requests should return 401
self.assertEqual(response.status_code, 401)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
# Verify brute force attempt was created with username
attempt = BruteForceAttempt.objects.get(username=test_username)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
self.assertIsNotNone(attempt.ip_address)
def test_brute_force_protection_time_window_reset(self):
"""Test that brute force protection resets after time window"""
self.client.login(username=self.username, password=self.password)
# Make requests to trigger blocking
nonexistent_group = random_string()
for i in range(6):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
# Should be blocked now
self.assertEqual(response.status_code, 429)
# Manually reset the time window by updating the attempt
attempt = BruteForceAttempt.objects.get(user=self.user)
old_time = timezone.now() - timezone.timedelta(minutes=20)
attempt.first_attempt = old_time
attempt.last_attempt = old_time # Also set last_attempt to old time
attempt.save()
# Next request should reset and succeed
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify attempt was reset
attempt.refresh_from_db()
self.assertEqual(attempt.attempt_count, 0)
self.assertFalse(attempt.is_blocked)
def test_brute_force_protection_successful_auth_resets_attempts(self):
"""Test that successful authentication resets brute force attempts"""
# Login with the correct credentials from setUp
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Make some requests to build up attempt count
for i in range(3):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Check that attempt count was reset after successful auth
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 0)
self.assertFalse(attempt.is_blocked)
def test_brute_force_protection_different_users_independent(self):
"""Test that brute force protection is independent for different users"""
user2_password = random_string()
user2 = User.objects.create_user(
username=random_string(),
password=user2_password,
email=f"{random_string()}@{random_string()}.com",
)
# Block first user
self.client.login(username=self.username, password=self.password)
nonexistent_group = random_string()
for i in range(6):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
self.assertEqual(response.status_code, 429)
# Second user should still be able to authenticate
login_success = self.client.login(
username=user2.username, password=user2_password
)
self.assertTrue(login_success, "Second user login should succeed")
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify separate attempts were tracked
attempt1 = BruteForceAttempt.objects.get(user=self.user)
attempt2 = BruteForceAttempt.objects.get(user=user2)
self.assertTrue(attempt1.is_blocked)
self.assertFalse(attempt2.is_blocked)
def test_brute_force_protection_different_ips_independent(self):
"""Test that brute force protection is independent for different IPs"""
# Mock different IP addresses
with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip:
# First IP gets blocked
mock_get_ip.return_value = ("192.168.1.100", False)
for i in range(6):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 429)
# Second IP should still work
mock_get_ip.return_value = ("192.168.1.200", False)
response = self.client.get("/accounts/api/external/")
self.assertEqual(
response.status_code, 401
) # Not authenticated, but not blocked
# Verify separate attempts were tracked
attempt1 = BruteForceAttempt.objects.get(ip_address="192.168.1.100")
attempt2 = BruteForceAttempt.objects.get(ip_address="192.168.1.200")
self.assertTrue(attempt1.is_blocked)
self.assertFalse(attempt2.is_blocked)
def test_brute_force_protection_user_agent_tracking(self):
"""Test that user agent is properly tracked"""
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Make request with custom user agent
response = self.client.get(
"/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent"
)
self.assertEqual(response.status_code, 200)
# Verify user agent was tracked
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.user_agent, "Custom Test Agent")
def test_brute_force_protection_custom_thresholds(self):
"""Test brute force protection with custom thresholds"""
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Create attempt with custom threshold
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2)
# Test with custom max_attempts
self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=15))
attempt.attempt_count = 3
attempt.save()
self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=15))
def test_brute_force_protection_custom_time_window(self):
"""Test brute force protection with custom time window"""
self.client.login(username=self.username, password=self.password)
# Create attempt with old timestamp
old_time = timezone.now() - timezone.timedelta(minutes=10)
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10, first_attempt=old_time
)
# Override last_attempt to be old as well (since save() sets it to now)
attempt.last_attempt = old_time
attempt.save()
# Test with 5-minute window (should not block - outside window)
should_block_5min = attempt.should_block(max_attempts=5, time_window_minutes=5)
self.assertFalse(should_block_5min)
# Create a fresh attempt for the 15-minute test to avoid side effects
attempt2 = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10, first_attempt=old_time
)
attempt2.last_attempt = old_time
attempt2.save()
# Test with 15-minute window (should block - within window)
should_block_15min = attempt2.should_block(
max_attempts=5, time_window_minutes=15
)
self.assertTrue(should_block_15min)
def test_brute_force_protection_explicit_blocking(self):
"""Test explicit blocking of brute force attempts"""
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=2, is_blocked=True # Explicitly blocked
)
# Should block regardless of attempt count
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
def test_brute_force_protection_increment_blocks_at_threshold(self):
"""Test that increment_attempt blocks at threshold"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4)
# Increment should block at threshold
attempt.increment_attempt()
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_get_or_create_behavior(self):
"""Test get_or_create behavior for brute force attempts"""
# First call should create
attempt, created = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertTrue(created)
self.assertEqual(attempt.attempt_count, 0)
# Second call should get existing
attempt2, created2 = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertFalse(created2)
self.assertEqual(attempt.id, attempt2.id)
def test_brute_force_protection_unique_constraints(self):
"""Test unique constraints for brute force attempts"""
test_username = random_string()
# Create first attempt
BruteForceAttempt.objects.create(
user=self.user, username=test_username, ip_address=self.ip_address
)
# Try to create duplicate (should fail)
with self.assertRaises(Exception): # IntegrityError
BruteForceAttempt.objects.create(
user=self.user, username=test_username, ip_address=self.ip_address
)
def test_brute_force_protection_mixed_authentication_states(self):
"""Test brute force protection with mixed authentication states"""
# Start unauthenticated
for i in range(3):
response = self.client.get(
"/accounts/api/external/", {"username": self.username}
)
self.assertEqual(response.status_code, 401)
# Switch to authenticated
login_success = self.client.login(
username=self.username, password=self.password
)
self.assertTrue(login_success, "Login should succeed")
# Should still track by user now
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify attempt was created for authenticated user
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth
def test_brute_force_protection_edge_case_empty_username(self):
"""Test brute force protection with empty username parameter"""
# Make requests with empty username
for i in range(6):
response = self.client.get("/accounts/api/external/", {"username": ""})
if i < 5:
self.assertEqual(response.status_code, 401)
else:
self.assertEqual(response.status_code, 429)
# Verify attempt was created with None username
attempt = BruteForceAttempt.objects.get(username=None)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_concurrent_requests(self):
"""Test brute force protection with concurrent-like requests"""
self.client.login(username=self.username, password=self.password)
# Simulate rapid requests that will fail
nonexistent_group = random_string()
responses = []
for i in range(10):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
responses.append(response.status_code)
# First 5 should fail with 403, rest should be blocked
self.assertEqual(responses[:5], [403] * 5)
self.assertEqual(responses[5:], [429] * 5)
# Verify final state
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)