Implement ExternalAuth for token based authorization

This commit is contained in:
Oliver Falk
2025-09-06 10:28:50 +02:00
parent deeaab7e23
commit aa742ea181
10 changed files with 1525 additions and 1 deletions

View File

@@ -45,6 +45,7 @@ AUTHENTICATION_BACKENDS = (
# 'django_auth_ldap.backend.LDAPBackend',
"django_openid_auth.auth.OpenIDBackend",
"ivatar.ivataraccount.auth.FedoraOpenIdConnect",
"ivatar.ivataraccount.token_auth.TokenAuthenticationBackend",
"django.contrib.auth.backends.ModelBackend",
)
@@ -213,7 +214,7 @@ CACHES = {
"LOCATION": [
"127.0.0.1:11211",
],
#"OPTIONS": {"MAX_ENTRIES": 1000000},
# "OPTIONS": {"MAX_ENTRIES": 1000000},
},
"filesystem": {
"BACKEND": "django.core.cache.backends.filebased.FileBasedCache",

View File

@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""
Authentication-related models for ivatar
"""
from django.contrib.auth.models import User
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
class AuthToken(models.Model):
"""
Model for storing authentication tokens
"""
token = models.CharField(max_length=64, unique=True)
user = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(default=timezone.now)
expires_at = models.DateTimeField()
is_active = models.BooleanField(default=True)
ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True)
class Meta:
verbose_name = _("auth token")
verbose_name_plural = _("auth tokens")
def save(self, *args, **kwargs) -> None:
if not self.expires_at:
# Set expiration to 1 hour from now
self.expires_at = timezone.now() + timezone.timedelta(hours=1)
super().save(*args, **kwargs)
def is_expired(self) -> bool:
return timezone.now() > self.expires_at
def __str__(self) -> str:
return f"Token for {self.user.username} (expires: {self.expires_at})"
class BruteForceAttempt(models.Model):
"""
Model for tracking brute force attempts per user
"""
user = models.ForeignKey(User, on_delete=models.CASCADE, null=True, blank=True)
username = models.CharField(max_length=150, null=True, blank=True)
ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True, blank=True)
user_agent = models.TextField(blank=True, null=True)
attempt_count = models.PositiveIntegerField(default=1)
first_attempt = models.DateTimeField(default=timezone.now)
last_attempt = models.DateTimeField(default=timezone.now)
is_blocked = models.BooleanField(default=False)
class Meta:
verbose_name = _("brute force attempt")
verbose_name_plural = _("brute force attempts")
unique_together = ["user", "username", "ip_address"]
def save(self, *args, **kwargs) -> None:
self.last_attempt = timezone.now()
super().save(*args, **kwargs)
def should_block(
self, max_attempts: int = 5, time_window_minutes: int = 15
) -> bool:
"""
Check if this user/IP should be blocked based on attempt count and time window
"""
if self.is_blocked:
return True
# Reset attempts if outside time window
if timezone.now() - self.first_attempt > timezone.timedelta(
minutes=time_window_minutes
):
self.attempt_count = 1
self.first_attempt = timezone.now()
self.is_blocked = False
self.save()
return False
return self.attempt_count >= max_attempts
def increment_attempt(self) -> None:
"""Increment the attempt count"""
self.attempt_count += 1
if self.should_block():
self.is_blocked = True
self.save()
def __str__(self) -> str:
identifier = (
self.user.username if self.user else self.username or self.ip_address
)
return f"Brute force attempt for {identifier} ({self.attempt_count} attempts)"

View File

@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
"""
External authentication views for ivatar
"""
import secrets
from typing import Optional
from django.views.generic.base import View
from django.http import JsonResponse, HttpRequest
from ipware import get_client_ip
from .auth_models import AuthToken, BruteForceAttempt
class ExternalAuthView(View):
"""
View for external authentication with optional group checking
"""
def get(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
"""
Handle GET request for external authentication
"""
group: Optional[str] = kwargs.get("group", None)
client_ip = get_client_ip(request)[0]
user_agent = request.META.get("HTTP_USER_AGENT", "")
# Determine identifier for brute force tracking
if request.user.is_authenticated:
# For authenticated users, track by user
brute_force_attempt, created = BruteForceAttempt.objects.get_or_create(
user=request.user,
defaults={
"attempt_count": 0,
"ip_address": client_ip,
"user_agent": user_agent,
},
)
else:
# For unauthenticated users, track by IP and username if available
username = request.GET.get("username", "")
brute_force_attempt, created = BruteForceAttempt.objects.get_or_create(
username=username or None,
ip_address=client_ip,
user_agent=user_agent,
defaults={"attempt_count": 0},
)
if brute_force_attempt.should_block():
return JsonResponse(
{
"authenticated": False,
"reason": "too_many_attempts",
"message": "Too many authentication attempts. Please try again later.",
},
status=429,
)
# Check if user is authenticated
if not request.user.is_authenticated:
brute_force_attempt.increment_attempt()
return JsonResponse(
{
"authenticated": False,
"reason": "not_authenticated",
"message": "User is not authenticated",
},
status=401,
)
# If group is specified, check if user is in that group
if group:
if not request.user.groups.filter(name=group).exists():
brute_force_attempt.increment_attempt()
return JsonResponse(
{
"authenticated": False,
"reason": "not_in_group",
"message": f"User is not in group: {group}",
},
status=403,
)
# Generate a new token
token = secrets.token_urlsafe(48)
# Create auth token
auth_token = AuthToken.objects.create(
token=token, user=request.user, ip_address=client_ip
)
# Reset brute force attempts on successful authentication
brute_force_attempt.attempt_count = 0
brute_force_attempt.is_blocked = False
brute_force_attempt.save()
return JsonResponse(
{
"authenticated": True,
"token": token,
"expires_at": auth_token.expires_at.isoformat(),
"user": {
"id": request.user.id,
"username": request.user.username,
"email": request.user.email,
},
}
)

View File

@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Generated by Django 5.2.1 on 2025-09-06 08:24
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("ivataraccount", "0020_confirmedopenid_bluesky_handle"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="AuthToken",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("token", models.CharField(max_length=64, unique=True)),
("created_at", models.DateTimeField(default=django.utils.timezone.now)),
("expires_at", models.DateTimeField()),
("is_active", models.BooleanField(default=True)),
(
"ip_address",
models.GenericIPAddressField(
blank=True, null=True, unpack_ipv4=True
),
),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"verbose_name": "auth token",
"verbose_name_plural": "auth tokens",
},
),
migrations.CreateModel(
name="BruteForceAttempt",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("username", models.CharField(blank=True, max_length=150, null=True)),
(
"ip_address",
models.GenericIPAddressField(
blank=True, null=True, unpack_ipv4=True
),
),
("user_agent", models.TextField(blank=True, null=True)),
("attempt_count", models.PositiveIntegerField(default=1)),
(
"first_attempt",
models.DateTimeField(default=django.utils.timezone.now),
),
(
"last_attempt",
models.DateTimeField(default=django.utils.timezone.now),
),
("is_blocked", models.BooleanField(default=False)),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"verbose_name": "brute force attempt",
"verbose_name_plural": "brute force attempts",
"unique_together": {("user", "username", "ip_address")},
},
),
]

View File

@@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-
"""
Test authentication models for ivatar
"""
import os
import django
from django.test import TestCase
from django.contrib.auth.models import User
from django.utils import timezone
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt
class AuthTokenTestCase(TestCase):
"""
Test cases for AuthToken model
"""
def setUp(self):
"""Set up test data"""
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
)
def test_auth_token_creation(self):
"""Test basic AuthToken creation"""
token = AuthToken.objects.create(
token="test-token-123", user=self.user, ip_address="127.0.0.1"
)
self.assertEqual(token.user, self.user)
self.assertEqual(token.token, "test-token-123")
self.assertEqual(token.ip_address, "127.0.0.1")
self.assertTrue(token.is_active)
self.assertIsNotNone(token.created_at)
self.assertIsNotNone(token.expires_at)
def test_auth_token_auto_expiration(self):
"""Test that expiration is set automatically"""
token = AuthToken.objects.create(token="test-token-456", user=self.user)
# Should be set to 1 hour from now
expected_expiry = timezone.now() + timezone.timedelta(hours=1)
time_diff = abs((token.expires_at - expected_expiry).total_seconds())
self.assertLess(time_diff, 5) # Within 5 seconds
def test_auth_token_is_expired(self):
"""Test token expiration checking"""
# Create expired token
past_time = timezone.now() - timezone.timedelta(hours=2)
token = AuthToken.objects.create(token="expired-token", user=self.user)
token.expires_at = past_time
token.save()
self.assertTrue(token.is_expired())
# Create valid token
future_time = timezone.now() + timezone.timedelta(hours=1)
valid_token = AuthToken.objects.create(token="valid-token", user=self.user)
valid_token.expires_at = future_time
valid_token.save()
self.assertFalse(valid_token.is_expired())
def test_auth_token_string_representation(self):
"""Test string representation of AuthToken"""
token = AuthToken.objects.create(token="test-token-789", user=self.user)
expected_str = f"Token for {self.user.username} (expires: {token.expires_at})"
self.assertEqual(str(token), expected_str)
def test_auth_token_unique_constraint(self):
"""Test that tokens must be unique"""
AuthToken.objects.create(token="unique-token", user=self.user)
# Try to create another token with the same value
with self.assertRaises(Exception): # IntegrityError
AuthToken.objects.create(token="unique-token", user=self.user)
class BruteForceAttemptTestCase(TestCase):
"""
Test cases for BruteForceAttempt model
"""
def setUp(self):
"""Set up test data"""
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
)
self.ip_address = "127.0.0.1"
self.user_agent = "Mozilla/5.0 Test Browser"
def test_brute_force_attempt_creation_authenticated_user(self):
"""Test BruteForceAttempt creation for authenticated user"""
attempt = BruteForceAttempt.objects.create(
user=self.user, ip_address=self.ip_address, user_agent=self.user_agent
)
self.assertEqual(attempt.user, self.user)
self.assertEqual(attempt.ip_address, self.ip_address)
self.assertEqual(attempt.user_agent, self.user_agent)
self.assertEqual(attempt.attempt_count, 1)
self.assertFalse(attempt.is_blocked)
self.assertIsNotNone(attempt.first_attempt)
self.assertIsNotNone(attempt.last_attempt)
def test_brute_force_attempt_creation_unauthenticated_user(self):
"""Test BruteForceAttempt creation for unauthenticated user"""
username = "testusername"
attempt = BruteForceAttempt.objects.create(
username=username, ip_address=self.ip_address, user_agent=self.user_agent
)
self.assertIsNone(attempt.user)
self.assertEqual(attempt.username, username)
self.assertEqual(attempt.ip_address, self.ip_address)
self.assertEqual(attempt.user_agent, self.user_agent)
self.assertEqual(attempt.attempt_count, 1)
self.assertFalse(attempt.is_blocked)
def test_brute_force_attempt_save_updates_last_attempt(self):
"""Test that save() updates last_attempt timestamp"""
attempt = BruteForceAttempt.objects.create(
user=self.user, ip_address=self.ip_address
)
original_last_attempt = attempt.last_attempt
# Wait a moment and save again
import time
time.sleep(0.1)
attempt.save()
self.assertGreater(attempt.last_attempt, original_last_attempt)
def test_should_block_within_time_window(self):
"""Test should_block() method within time window"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3)
# Should not block with 3 attempts (below threshold of 5)
self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15))
# Should block with 6 attempts (above threshold)
attempt.attempt_count = 6
attempt.save()
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
def test_should_block_outside_time_window(self):
"""Test should_block() resets when outside time window"""
# Create attempt with old timestamp
old_time = timezone.now() - timezone.timedelta(minutes=20)
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10 # Above threshold
)
attempt.first_attempt = old_time
attempt.save()
# Should reset and not block (outside 15-minute window)
self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=15))
# Should have reset attempt count
attempt.refresh_from_db()
self.assertEqual(attempt.attempt_count, 1)
self.assertFalse(attempt.is_blocked)
def test_should_block_when_explicitly_blocked(self):
"""Test should_block() when is_blocked is True"""
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=2, is_blocked=True
)
# Should block regardless of attempt count
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
def test_increment_attempt(self):
"""Test increment_attempt() method"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=3)
attempt.increment_attempt()
self.assertEqual(attempt.attempt_count, 4)
self.assertFalse(attempt.is_blocked) # Still below threshold
# Increment to threshold
attempt.increment_attempt()
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked) # Should be blocked now
def test_increment_attempt_blocks_at_threshold(self):
"""Test that increment_attempt() blocks at threshold"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4)
attempt.increment_attempt()
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
def test_brute_force_attempt_string_representation(self):
"""Test string representation of BruteForceAttempt"""
# Test with user
attempt_with_user = BruteForceAttempt.objects.create(
user=self.user, attempt_count=3
)
expected_str_user = f"Brute force attempt for {self.user.username} (3 attempts)"
self.assertEqual(str(attempt_with_user), expected_str_user)
# Test with username only
attempt_with_username = BruteForceAttempt.objects.create(
username="testusername", ip_address=self.ip_address, attempt_count=2
)
expected_str_username = "Brute force attempt for testusername (2 attempts)"
self.assertEqual(str(attempt_with_username), expected_str_username)
# Test with IP only
attempt_with_ip = BruteForceAttempt.objects.create(
ip_address=self.ip_address, attempt_count=1
)
expected_str_ip = f"Brute force attempt for {self.ip_address} (1 attempts)"
self.assertEqual(str(attempt_with_ip), expected_str_ip)
def test_unique_together_constraint(self):
"""Test unique_together constraint"""
# Create first attempt
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
)
# Try to create duplicate (should fail)
with self.assertRaises(Exception): # IntegrityError
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
)
def test_get_or_create_behavior(self):
"""Test get_or_create behavior for brute force attempts"""
# First call should create
attempt, created = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertTrue(created)
self.assertEqual(attempt.attempt_count, 0)
# Second call should get existing
attempt2, created2 = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertFalse(created2)
self.assertEqual(attempt.id, attempt2.id)
self.assertEqual(attempt2.attempt_count, 0)
def test_custom_max_attempts_and_time_window(self):
"""Test custom max_attempts and time_window parameters"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2)
# Test with custom parameters
self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=30))
attempt.attempt_count = 3
attempt.save()
self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=30))

View File

@@ -0,0 +1,323 @@
# -*- coding: utf-8 -*-
"""
Test external authentication views for ivatar
"""
import os
import django
import json
from unittest.mock import patch
from django.test import TestCase, Client
from django.contrib.auth.models import User, Group
from django.utils import timezone
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import AuthToken, BruteForceAttempt
from ivatar.utils import random_string
class ExternalAuthViewTestCase(TestCase):
"""
Test cases for ExternalAuthView
"""
def setUp(self):
"""Set up test data"""
self.client = Client()
self.username = random_string()
self.password = random_string()
self.email = f"{self.username}@{random_string()}.com"
self.group_name = random_string()
self.user = User.objects.create_user(
username=self.username, password=self.password, email=self.email
)
self.group = Group.objects.create(name=self.group_name)
self.user.groups.add(self.group)
# Create another user without group
self.username_no_group = random_string()
self.password_no_group = random_string()
self.email_no_group = f"{self.username_no_group}@{random_string()}.com"
self.user_no_group = User.objects.create_user(
username=self.username_no_group,
password=self.password_no_group,
email=self.email_no_group,
)
def test_external_auth_not_authenticated(self):
"""Test external auth endpoint when user is not authenticated"""
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 401)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_authenticated")
self.assertEqual(data["message"], "User is not authenticated")
def test_external_auth_authenticated_no_group(self):
"""Test external auth endpoint when user is authenticated but no group specified"""
self.client.login(username=self.username, password=self.password)
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertTrue(data["authenticated"])
self.assertIn("token", data)
self.assertIn("expires_at", data)
self.assertIn("user", data)
# Check user data
self.assertEqual(data["user"]["id"], self.user.id)
self.assertEqual(data["user"]["username"], self.user.username)
self.assertEqual(data["user"]["email"], self.user.email)
# Verify token was created
token_obj = AuthToken.objects.get(token=data["token"])
self.assertEqual(token_obj.user, self.user)
self.assertTrue(token_obj.is_active)
def test_external_auth_authenticated_with_group_success(self):
"""Test external auth endpoint when user is authenticated and in specified group"""
self.client.login(username=self.username, password=self.password)
response = self.client.get(f"/accounts/api/external/{self.group_name}/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertTrue(data["authenticated"])
self.assertIn("token", data)
self.assertIn("expires_at", data)
self.assertIn("user", data)
def test_external_auth_authenticated_not_in_group(self):
"""Test external auth endpoint when user is authenticated but not in specified group"""
self.client.login(
username=self.username_no_group, password=self.password_no_group
)
response = self.client.get(f"/accounts/api/external/{self.group_name}/")
self.assertEqual(response.status_code, 403)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_in_group")
self.assertEqual(data["message"], f"User is not in group: {self.group_name}")
def test_external_auth_nonexistent_group(self):
"""Test external auth endpoint with non-existent group"""
nonexistent_group = random_string()
self.client.login(username=self.username, password=self.password)
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
self.assertEqual(response.status_code, 403)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_in_group")
self.assertEqual(data["message"], f"User is not in group: {nonexistent_group}")
def test_external_auth_token_expiration(self):
"""Test that generated tokens have proper expiration"""
self.client.login(username=self.username, password=self.password)
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
# Check token expiration
token_obj = AuthToken.objects.get(token=data["token"])
expected_expiry = timezone.now() + timezone.timedelta(hours=1)
time_diff = abs((token_obj.expires_at - expected_expiry).total_seconds())
self.assertLess(time_diff, 5) # Within 5 seconds
def test_external_auth_ip_address_tracking(self):
"""Test that IP address is tracked in auth tokens"""
self.client.login(username=self.username, password=self.password)
# Mock the IP address
with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip:
mock_get_ip.return_value = ("192.168.1.100", False)
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
token_obj = AuthToken.objects.get(token=data["token"])
self.assertEqual(token_obj.ip_address, "192.168.1.100")
def test_external_auth_brute_force_protection_authenticated_user(self):
"""Test brute force protection for authenticated users"""
self.client.login(username=self.username, password=self.password)
# Make requests that will fail (user not in group) to trigger brute force protection
nonexistent_group = random_string()
for i in range(6): # Exceed the threshold of 5
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
if i < 5:
# First 5 requests should fail with 403 (not in group)
self.assertEqual(response.status_code, 403)
else:
# 6th request should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
self.assertEqual(
data["message"],
"Too many authentication attempts. Please try again later.",
)
def test_external_auth_brute_force_protection_unauthenticated_user(self):
"""Test brute force protection for unauthenticated users"""
# Make multiple requests without authentication
for i in range(6): # Exceed the threshold of 5
response = self.client.get("/accounts/api/external/")
if i < 5:
# First 5 requests should return 401
self.assertEqual(response.status_code, 401)
else:
# 6th request should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
def test_external_auth_brute_force_reset_on_success(self):
"""Test that brute force attempts are reset on successful authentication"""
# Make some failed attempts first (by accessing without group)
self.client.login(
username=self.username_no_group, password=self.password_no_group
)
for i in range(3):
response = self.client.get(f"/accounts/api/external/{self.group_name}/")
self.assertEqual(response.status_code, 403)
# Check that brute force attempt was created for the user who made failed attempts
attempt = BruteForceAttempt.objects.get(user=self.user_no_group)
self.assertEqual(attempt.attempt_count, 3)
# Now make a successful request (user with group)
self.client.login(username=self.username, password=self.password)
response = self.client.get(f"/accounts/api/external/{self.group_name}/")
self.assertEqual(response.status_code, 200)
# Check that brute force attempt was reset for the successful user (different user)
attempt_successful = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt_successful.attempt_count, 0)
self.assertFalse(attempt_successful.is_blocked)
# The original user's attempt count should still be 3 (not reset)
attempt.refresh_from_db()
self.assertEqual(attempt.attempt_count, 3)
def test_external_auth_brute_force_tracking_by_user(self):
"""Test that brute force attempts are tracked per user for authenticated users"""
self.client.login(username=self.username, password=self.password)
# Make some requests
for i in range(3):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Check that attempt is tracked by user
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth
self.assertFalse(attempt.is_blocked)
def test_external_auth_brute_force_tracking_by_ip_and_username(self):
"""Test that brute force attempts are tracked by IP and username for unauthenticated users"""
# Make requests with username parameter
for i in range(3):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
)
self.assertEqual(response.status_code, 401)
# Check that attempt is tracked by username and IP
attempt = BruteForceAttempt.objects.get(username="testuser")
self.assertEqual(attempt.attempt_count, 3)
self.assertIsNotNone(attempt.ip_address)
def test_external_auth_user_agent_tracking(self):
"""Test that user agent is tracked in brute force attempts"""
self.client.login(username=self.username, password=self.password)
# Make request with custom user agent
response = self.client.get(
"/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent"
)
self.assertEqual(response.status_code, 200)
# Check that user agent was tracked
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.user_agent, "Custom Test Agent")
def test_external_auth_token_uniqueness(self):
"""Test that generated tokens are unique"""
self.client.login(username=self.username, password=self.password)
tokens = set()
for i in range(10):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
tokens.add(data["token"])
# All tokens should be unique
self.assertEqual(len(tokens), 10)
def test_external_auth_url_patterns(self):
"""Test that URL patterns work correctly"""
self.client.login(username=self.username, password=self.password)
# Test without group
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Test with group
response = self.client.get(f"/accounts/api/external/{self.group_name}/")
self.assertEqual(response.status_code, 200)
# Test with trailing slash
response = self.client.get(f"/accounts/api/external/{self.group_name}")
self.assertEqual(response.status_code, 200)
def test_external_auth_json_response_format(self):
"""Test that JSON response has correct format"""
self.client.login(username=self.username, password=self.password)
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response["Content-Type"], "application/json")
data = json.loads(response.content)
required_fields = ["authenticated", "token", "expires_at", "user"]
for field in required_fields:
self.assertIn(field, data)
# Check user object structure
user_data = data["user"]
required_user_fields = ["id", "username", "email"]
for field in required_user_fields:
self.assertIn(field, user_data)
def test_external_auth_error_response_format(self):
"""Test that error responses have correct format"""
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 401)
self.assertEqual(response["Content-Type"], "application/json")
data = json.loads(response.content)
required_fields = ["authenticated", "reason", "message"]
for field in required_fields:
self.assertIn(field, data)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "not_authenticated")
self.assertIsInstance(data["message"], str)

View File

@@ -0,0 +1,347 @@
# -*- coding: utf-8 -*-
"""
Test brute force protection mechanisms for ivatar
"""
import os
import django
import json
from unittest.mock import patch
from django.test import TestCase, Client
from django.contrib.auth.models import User
from django.utils import timezone
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import BruteForceAttempt
from ivatar.utils import random_string
class BruteForceProtectionTestCase(TestCase):
"""
Test cases for brute force protection mechanisms
"""
def setUp(self):
"""Set up test data"""
self.client = Client()
self.username = random_string()
self.password = random_string()
self.email = f"{self.username}@{random_string()}.com"
self.user = User.objects.create_user(
username=self.username, password=self.password, email=self.email
)
self.ip_address = "192.168.1.100"
self.user_agent = "Mozilla/5.0 Test Browser"
def test_brute_force_protection_authenticated_user_tracking(self):
"""Test brute force protection for authenticated users"""
self.client.login(username=self.username, password=self.password)
# Make requests that will fail (user not in group) to trigger brute force protection
nonexistent_group = random_string()
for i in range(7):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
if i < 5:
# First 5 requests should fail with 403 (not in group)
self.assertEqual(response.status_code, 403)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
# Verify brute force attempt was created and blocked
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_unauthenticated_user_tracking(self):
"""Test brute force protection for unauthenticated users"""
# Make requests without authentication
for i in range(7):
response = self.client.get("/accounts/api/external/")
if i < 5:
# First 5 requests should return 401
self.assertEqual(response.status_code, 401)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
data = json.loads(response.content)
self.assertFalse(data["authenticated"])
self.assertEqual(data["reason"], "too_many_attempts")
# Verify brute force attempt was created and blocked
attempt = BruteForceAttempt.objects.get(username=None, ip_address__isnull=False)
self.assertEqual(attempt.attempt_count, 5) # Blocked at threshold
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_with_username_parameter(self):
"""Test brute force protection with username parameter for unauthenticated users"""
# Make requests with username parameter
for i in range(7):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
)
if i < 5:
# First 5 requests should return 401
self.assertEqual(response.status_code, 401)
else:
# Requests 6+ should be blocked
self.assertEqual(response.status_code, 429)
# Verify brute force attempt was created with username
attempt = BruteForceAttempt.objects.get(username="testuser")
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
self.assertIsNotNone(attempt.ip_address)
def test_brute_force_protection_time_window_reset(self):
"""Test that brute force protection resets after time window"""
self.client.login(username=self.username, password=self.password)
# Make requests to trigger blocking
nonexistent_group = random_string()
for i in range(6):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
# Should be blocked now
self.assertEqual(response.status_code, 429)
# Manually reset the time window by updating the attempt
attempt = BruteForceAttempt.objects.get(user=self.user)
old_time = timezone.now() - timezone.timedelta(minutes=20)
attempt.first_attempt = old_time
attempt.save()
# Next request should reset and succeed
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify attempt was reset
attempt.refresh_from_db()
self.assertEqual(attempt.attempt_count, 0)
self.assertFalse(attempt.is_blocked)
def test_brute_force_protection_successful_auth_resets_attempts(self):
"""Test that successful authentication resets brute force attempts"""
self.client.login(username="testuser", password="testpass123")
# Make some requests to build up attempt count
for i in range(3):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Check that attempt count was reset after successful auth
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 0)
self.assertFalse(attempt.is_blocked)
def test_brute_force_protection_different_users_independent(self):
"""Test that brute force protection is independent for different users"""
user2 = User.objects.create_user(
username=random_string(),
password=random_string(),
email=f"{random_string()}@{random_string()}.com",
)
# Block first user
self.client.login(username=self.username, password=self.password)
nonexistent_group = random_string()
for i in range(6):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
self.assertEqual(response.status_code, 429)
# Second user should still be able to authenticate
self.client.login(username=user2.username, password=user2.password)
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify separate attempts were tracked
attempt1 = BruteForceAttempt.objects.get(user=self.user)
attempt2 = BruteForceAttempt.objects.get(user=user2)
self.assertTrue(attempt1.is_blocked)
self.assertFalse(attempt2.is_blocked)
def test_brute_force_protection_different_ips_independent(self):
"""Test that brute force protection is independent for different IPs"""
# Mock different IP addresses
with patch("ivatar.ivataraccount.auth_views.get_client_ip") as mock_get_ip:
# First IP gets blocked
mock_get_ip.return_value = ("192.168.1.100", False)
for i in range(6):
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 429)
# Second IP should still work
mock_get_ip.return_value = ("192.168.1.200", False)
response = self.client.get("/accounts/api/external/")
self.assertEqual(
response.status_code, 401
) # Not authenticated, but not blocked
# Verify separate attempts were tracked
attempt1 = BruteForceAttempt.objects.get(ip_address="192.168.1.100")
attempt2 = BruteForceAttempt.objects.get(ip_address="192.168.1.200")
self.assertTrue(attempt1.is_blocked)
self.assertFalse(attempt2.is_blocked)
def test_brute_force_protection_user_agent_tracking(self):
"""Test that user agent is properly tracked"""
self.client.login(username="testuser", password="testpass123")
# Make request with custom user agent
response = self.client.get(
"/accounts/api/external/", HTTP_USER_AGENT="Custom Test Agent"
)
self.assertEqual(response.status_code, 200)
# Verify user agent was tracked
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.user_agent, "Custom Test Agent")
def test_brute_force_protection_custom_thresholds(self):
"""Test brute force protection with custom thresholds"""
self.client.login(username="testuser", password="testpass123")
# Create attempt with custom threshold
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=2)
# Test with custom max_attempts
self.assertFalse(attempt.should_block(max_attempts=3, time_window_minutes=15))
attempt.attempt_count = 3
attempt.save()
self.assertTrue(attempt.should_block(max_attempts=3, time_window_minutes=15))
def test_brute_force_protection_custom_time_window(self):
"""Test brute force protection with custom time window"""
self.client.login(username=self.username, password=self.password)
# Create attempt with old timestamp
old_time = timezone.now() - timezone.timedelta(minutes=10)
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=10, first_attempt=old_time
)
# Should not block with 5-minute window (outside window)
self.assertFalse(attempt.should_block(max_attempts=5, time_window_minutes=5))
# Should block with 15-minute window (within window)
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
def test_brute_force_protection_explicit_blocking(self):
"""Test explicit blocking of brute force attempts"""
attempt = BruteForceAttempt.objects.create(
user=self.user, attempt_count=2, is_blocked=True # Explicitly blocked
)
# Should block regardless of attempt count
self.assertTrue(attempt.should_block(max_attempts=5, time_window_minutes=15))
def test_brute_force_protection_increment_blocks_at_threshold(self):
"""Test that increment_attempt blocks at threshold"""
attempt = BruteForceAttempt.objects.create(user=self.user, attempt_count=4)
# Increment should block at threshold
attempt.increment_attempt()
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_get_or_create_behavior(self):
"""Test get_or_create behavior for brute force attempts"""
# First call should create
attempt, created = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertTrue(created)
self.assertEqual(attempt.attempt_count, 0)
# Second call should get existing
attempt2, created2 = BruteForceAttempt.objects.get_or_create(
user=self.user, defaults={"attempt_count": 0}
)
self.assertFalse(created2)
self.assertEqual(attempt.id, attempt2.id)
def test_brute_force_protection_unique_constraints(self):
"""Test unique constraints for brute force attempts"""
# Create first attempt
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
)
# Try to create duplicate (should fail)
with self.assertRaises(Exception): # IntegrityError
BruteForceAttempt.objects.create(
user=self.user, username="testuser", ip_address=self.ip_address
)
def test_brute_force_protection_mixed_authentication_states(self):
"""Test brute force protection with mixed authentication states"""
# Start unauthenticated
for i in range(3):
response = self.client.get(
"/accounts/api/external/", {"username": "testuser"}
)
self.assertEqual(response.status_code, 401)
# Switch to authenticated
self.client.login(username="testuser", password="testpass123")
# Should still track by user now
response = self.client.get("/accounts/api/external/")
self.assertEqual(response.status_code, 200)
# Verify attempt was created for authenticated user
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 0) # Reset after successful auth
def test_brute_force_protection_edge_case_empty_username(self):
"""Test brute force protection with empty username parameter"""
# Make requests with empty username
for i in range(6):
response = self.client.get("/accounts/api/external/", {"username": ""})
if i < 5:
self.assertEqual(response.status_code, 401)
else:
self.assertEqual(response.status_code, 429)
# Verify attempt was created with None username
attempt = BruteForceAttempt.objects.get(username=None)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)
def test_brute_force_protection_concurrent_requests(self):
"""Test brute force protection with concurrent-like requests"""
self.client.login(username=self.username, password=self.password)
# Simulate rapid requests that will fail
nonexistent_group = random_string()
responses = []
for i in range(10):
response = self.client.get(f"/accounts/api/external/{nonexistent_group}/")
responses.append(response.status_code)
# First 5 should fail with 403, rest should be blocked
self.assertEqual(responses[:5], [403] * 5)
self.assertEqual(responses[5:], [429] * 5)
# Verify final state
attempt = BruteForceAttempt.objects.get(user=self.user)
self.assertEqual(attempt.attempt_count, 5)
self.assertTrue(attempt.is_blocked)

View File

@@ -0,0 +1,228 @@
# -*- coding: utf-8 -*-
"""
Test token authentication backend for ivatar
"""
import os
import django
from django.test import TestCase
from django.contrib.auth.models import User
from django.utils import timezone
from django.http import HttpRequest
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
from ivatar.ivataraccount.auth_models import AuthToken
from ivatar.ivataraccount.token_auth import TokenAuthenticationBackend
class TokenAuthenticationBackendTestCase(TestCase):
"""
Test cases for TokenAuthenticationBackend
"""
def setUp(self):
"""Set up test data"""
self.user = User.objects.create_user(
username="testuser", password="testpass123", email="test@example.com"
)
self.backend = TokenAuthenticationBackend()
self.request = HttpRequest()
def test_authenticate_with_valid_token(self):
"""Test authentication with valid token"""
# Create a valid token
_token = AuthToken.objects.create(
token="valid-token-123", user=self.user, ip_address="127.0.0.1"
)
# Authenticate with the token
authenticated_user = self.backend.authenticate(
request=self.request, token="valid-token-123"
)
self.assertEqual(authenticated_user, self.user)
# Token should still be active
_token.refresh_from_db()
self.assertTrue(_token.is_active)
def test_authenticate_with_invalid_token(self):
"""Test authentication with invalid token"""
# Try to authenticate with non-existent token
authenticated_user = self.backend.authenticate(
request=self.request, token="invalid-token-456"
)
self.assertIsNone(authenticated_user)
def test_authenticate_with_expired_token(self):
"""Test authentication with expired token"""
# Create an expired token
past_time = timezone.now() - timezone.timedelta(hours=2)
_token = AuthToken.objects.create(token="expired-token-789", user=self.user)
_token.expires_at = past_time
_token.save()
# Try to authenticate with expired token
authenticated_user = self.backend.authenticate(
request=self.request, token="expired-token-789"
)
self.assertIsNone(authenticated_user)
# Token should be deactivated
_token.refresh_from_db()
self.assertFalse(_token.is_active)
def test_authenticate_with_inactive_token(self):
"""Test authentication with inactive token"""
# Create an inactive token
AuthToken.objects.create(
token="inactive-token-101", user=self.user, is_active=False
)
# Try to authenticate with inactive token
authenticated_user = self.backend.authenticate(
request=self.request, token="inactive-token-101"
)
self.assertIsNone(authenticated_user)
def test_authenticate_without_token(self):
"""Test authentication without token parameter"""
authenticated_user = self.backend.authenticate(request=self.request, token=None)
self.assertIsNone(authenticated_user)
def test_authenticate_with_empty_token(self):
"""Test authentication with empty token"""
authenticated_user = self.backend.authenticate(request=self.request, token="")
self.assertIsNone(authenticated_user)
def test_get_user_with_valid_id(self):
"""Test get_user with valid user ID"""
user = self.backend.get_user(self.user.id)
self.assertEqual(user, self.user)
self.assertEqual(user.username, "testuser")
self.assertEqual(user.email, "test@example.com")
def test_get_user_with_invalid_id(self):
"""Test get_user with invalid user ID"""
user = self.backend.get_user(99999) # Non-existent ID
self.assertIsNone(user)
def test_get_user_with_none_id(self):
"""Test get_user with None ID"""
user = self.backend.get_user(None)
self.assertIsNone(user)
def test_authenticate_token_case_sensitivity(self):
"""Test that token authentication is case sensitive"""
# Create a token with specific case
# Create token with case-sensitive name
AuthToken.objects.create(token="Case-Sensitive-Token", user=self.user)
# Try with different case
authenticated_user = self.backend.authenticate(
request=self.request, token="case-sensitive-token"
)
self.assertIsNone(authenticated_user)
# Try with correct case
authenticated_user = self.backend.authenticate(
request=self.request, token="Case-Sensitive-Token"
)
self.assertEqual(authenticated_user, self.user)
def test_authenticate_multiple_tokens_same_user(self):
"""Test authentication with multiple tokens for same user"""
# Create multiple tokens for the same user
AuthToken.objects.create(token="token-1", user=self.user)
AuthToken.objects.create(token="token-2", user=self.user)
# Both tokens should authenticate to the same user
user1 = self.backend.authenticate(request=self.request, token="token-1")
user2 = self.backend.authenticate(request=self.request, token="token-2")
self.assertEqual(user1, self.user)
self.assertEqual(user2, self.user)
self.assertEqual(user1, user2)
def test_authenticate_token_with_special_characters(self):
"""Test authentication with token containing special characters"""
special_token = "token-with-special-chars!@#$%^&*()_+-=[]{}|;:,.<>?"
AuthToken.objects.create(token=special_token, user=self.user)
authenticated_user = self.backend.authenticate(
request=self.request, token=special_token
)
self.assertEqual(authenticated_user, self.user)
def test_authenticate_token_length_variations(self):
"""Test authentication with tokens of different lengths"""
# Short token
short_token = "abc"
AuthToken.objects.create(token=short_token, user=self.user)
# Long token
long_token = "a" * 100
AuthToken.objects.create(token=long_token, user=self.user)
# Both should work
user1 = self.backend.authenticate(request=self.request, token=short_token)
user2 = self.backend.authenticate(request=self.request, token=long_token)
self.assertEqual(user1, self.user)
self.assertEqual(user2, self.user)
def test_authenticate_with_deleted_user(self):
"""Test authentication with token for deleted user"""
# Create token for user
AuthToken.objects.create(token="token-for-deleted-user", user=self.user)
# Delete the user
self.user.delete()
# Try to authenticate with token for deleted user
authenticated_user = self.backend.authenticate(
request=self.request, token="token-for-deleted-user"
)
# Should return None (token should be invalid)
self.assertIsNone(authenticated_user)
def test_authenticate_token_unicode_characters(self):
"""Test authentication with token containing unicode characters"""
unicode_token = "token-with-unicode-ñáéíóú-中文-🚀"
AuthToken.objects.create(token=unicode_token, user=self.user)
authenticated_user = self.backend.authenticate(
request=self.request, token=unicode_token
)
self.assertEqual(authenticated_user, self.user)
def test_backend_name(self):
"""Test that backend has correct name"""
self.assertEqual(self.backend.__class__.__name__, "TokenAuthenticationBackend")
def test_authenticate_with_request_none(self):
"""Test authentication with None request"""
AuthToken.objects.create(token="token-with-none-request", user=self.user)
authenticated_user = self.backend.authenticate(
request=None, token="token-with-none-request"
)
self.assertEqual(authenticated_user, self.user)

View File

@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""
Token-based authentication backend for ivatar
"""
from typing import Optional
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth.models import User
from django.http import HttpRequest
from .auth_models import AuthToken
class TokenAuthenticationBackend(BaseBackend):
"""
Authentication backend that uses tokens for authentication
"""
def authenticate(
self, request: Optional[HttpRequest], token: Optional[str] = None, **kwargs
) -> Optional[User]:
"""
Authenticate using a token
"""
if not token:
return None
try:
auth_token = AuthToken.objects.get(token=token, is_active=True)
# Check if token is expired
if auth_token.is_expired():
# Mark token as inactive
auth_token.is_active = False
auth_token.save()
return None
return auth_token.user
except AuthToken.DoesNotExist:
return None
def get_user(self, user_id: int) -> Optional[User]:
"""
Get user by ID
"""
try:
return User.objects.get(pk=user_id)
except User.DoesNotExist:
return None

View File

@@ -27,6 +27,7 @@ from .views import ResendConfirmationMailView
from .views import IvatarLoginView
from .views import DeleteAccountView
from .views import ExportView
from .auth_views import ExternalAuthView
# Define URL patterns, self documenting
# To see the fancy, colorful evaluation of these use:
@@ -166,4 +167,9 @@ urlpatterns = [ # pylint: disable=invalid-name
ResendConfirmationMailView.as_view(),
name="resend_confirmation_mail",
),
re_path(
r"api/external/(?P<group>\w+)?/?$",
ExternalAuthView.as_view(),
name="external_auth",
),
]