mirror of
https://git.linux-kernel.at/oliver/ivatar.git
synced 2025-11-19 14:38:02 +00:00
319 lines
10 KiB
Python
319 lines
10 KiB
Python
"""
|
|
Classes for our ivatar.ivataraccount.forms
|
|
"""
|
|
|
|
from urllib.parse import urlsplit, urlunsplit
|
|
|
|
from django import forms
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from ipware import get_client_ip
|
|
|
|
from ivatar import settings
|
|
from ivatar.settings import MIN_LENGTH_EMAIL, MAX_LENGTH_EMAIL
|
|
from ivatar.settings import MIN_LENGTH_URL, MAX_LENGTH_URL
|
|
from ivatar.settings import ENABLE_FILE_SECURITY_VALIDATION
|
|
from ivatar.file_security import validate_uploaded_file, FileUploadSecurityError
|
|
from .models import UnconfirmedEmail, ConfirmedEmail, Photo
|
|
from .models import UnconfirmedOpenId, ConfirmedOpenId
|
|
from .models import UserPreference
|
|
import logging
|
|
|
|
# Initialize logger
|
|
logger = logging.getLogger("ivatar.ivataraccount.forms")
|
|
|
|
|
|
MAX_NUM_UNCONFIRMED_EMAILS_DEFAULT = 5
|
|
|
|
|
|
class AddEmailForm(forms.Form):
|
|
"""
|
|
Form to handle adding email addresses
|
|
"""
|
|
|
|
email = forms.EmailField(
|
|
label=_("Email"),
|
|
min_length=MIN_LENGTH_EMAIL,
|
|
max_length=MAX_LENGTH_EMAIL,
|
|
)
|
|
|
|
def clean_email(self):
|
|
"""
|
|
Enforce lowercase email
|
|
"""
|
|
# TODO: Domain restriction as in libravatar?
|
|
return self.cleaned_data["email"].lower()
|
|
|
|
def save(self, request):
|
|
"""
|
|
Save the model, ensuring some safety
|
|
"""
|
|
user = request.user
|
|
# Enforce the maximum number of unconfirmed emails a user can have
|
|
num_unconfirmed = user.unconfirmedemail_set.count()
|
|
|
|
max_num_unconfirmed_emails = getattr(
|
|
settings, "MAX_NUM_UNCONFIRMED_EMAILS", MAX_NUM_UNCONFIRMED_EMAILS_DEFAULT
|
|
)
|
|
|
|
if num_unconfirmed >= max_num_unconfirmed_emails:
|
|
self.add_error(None, _("Too many unconfirmed mail addresses!"))
|
|
return False
|
|
|
|
# Check whether or not a confirmation email has been
|
|
# sent by this user already
|
|
if UnconfirmedEmail.objects.filter( # pylint: disable=no-member
|
|
user=user, email=self.cleaned_data["email"]
|
|
).exists():
|
|
self.add_error("email", _("Address already added, currently unconfirmed"))
|
|
return False
|
|
|
|
# Check whether or not the email is already confirmed (by someone)
|
|
check_mail = ConfirmedEmail.objects.filter(email=self.cleaned_data["email"])
|
|
if check_mail.exists():
|
|
msg = _("Address already confirmed (by someone else)")
|
|
if check_mail.first().user == request.user:
|
|
msg = _("Address already confirmed (by you)")
|
|
self.add_error("email", msg)
|
|
return False
|
|
|
|
unconfirmed = UnconfirmedEmail()
|
|
unconfirmed.email = self.cleaned_data["email"]
|
|
unconfirmed.user = user
|
|
unconfirmed.save()
|
|
unconfirmed.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1])
|
|
return True
|
|
|
|
|
|
class UploadPhotoForm(forms.Form):
|
|
"""
|
|
Form handling photo upload with enhanced security validation
|
|
"""
|
|
|
|
photo = forms.FileField(
|
|
label=_("Photo"),
|
|
error_messages={"required": _("You must choose an image to upload.")},
|
|
)
|
|
not_porn = forms.BooleanField(
|
|
label=_("suitable for all ages (i.e. no offensive content)"),
|
|
required=True,
|
|
error_messages={
|
|
"required": _(
|
|
'We only host "G-rated" images and so this field must be checked.'
|
|
)
|
|
},
|
|
)
|
|
can_distribute = forms.BooleanField(
|
|
label=_("can be freely copied"),
|
|
required=True,
|
|
error_messages={
|
|
"required": _(
|
|
"This field must be checked since we need to be able to distribute photos to third parties."
|
|
)
|
|
},
|
|
)
|
|
|
|
def clean_photo(self):
|
|
"""
|
|
Enhanced photo validation with security checks
|
|
"""
|
|
photo = self.cleaned_data.get("photo")
|
|
|
|
if not photo:
|
|
raise ValidationError(_("No file provided"))
|
|
|
|
# Read file data
|
|
try:
|
|
# Handle different file types
|
|
if hasattr(photo, "read"):
|
|
file_data = photo.read()
|
|
elif hasattr(photo, "file"):
|
|
file_data = photo.file.read()
|
|
else:
|
|
file_data = bytes(photo)
|
|
filename = photo.name
|
|
except Exception as e:
|
|
logger.error(f"Error reading uploaded file: {e}")
|
|
raise ValidationError(_("Error reading uploaded file"))
|
|
|
|
# Perform comprehensive security validation (if enabled)
|
|
if ENABLE_FILE_SECURITY_VALIDATION:
|
|
try:
|
|
is_valid, validation_results, sanitized_data = validate_uploaded_file(
|
|
file_data, filename
|
|
)
|
|
|
|
if not is_valid:
|
|
# Log security violation
|
|
logger.warning(
|
|
f"File upload security violation: {validation_results['errors']}"
|
|
)
|
|
|
|
# Only reject truly malicious files at the form level
|
|
# Allow basic format issues to pass through to Photo.save() for original error handling
|
|
if validation_results.get("security_score", 100) < 30:
|
|
raise ValidationError(
|
|
_("File appears to be malicious and cannot be uploaded")
|
|
)
|
|
else:
|
|
# For format issues, don't raise ValidationError - let Photo.save() handle it
|
|
# This preserves the original error handling behavior
|
|
logger.info(
|
|
f"File format issue detected, allowing Photo.save() to handle: {validation_results['errors']}"
|
|
)
|
|
# Store the validation results for potential use, but don't reject the form
|
|
self.validation_results = validation_results
|
|
self.file_data = file_data
|
|
else:
|
|
# Store sanitized data for later use
|
|
self.sanitized_data = sanitized_data
|
|
self.validation_results = validation_results
|
|
# Store original file data for fallback
|
|
self.file_data = file_data
|
|
|
|
# Log successful validation
|
|
logger.info(
|
|
f"File upload validated successfully: {filename}, security_score: {validation_results.get('security_score', 100)}"
|
|
)
|
|
|
|
except FileUploadSecurityError as e:
|
|
logger.error(f"File upload security error: {e}")
|
|
raise ValidationError(_("File security validation failed"))
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error during file validation: {e}")
|
|
raise ValidationError(_("File validation failed"))
|
|
else:
|
|
# Security validation disabled (e.g., in tests)
|
|
logger.debug(f"File upload security validation disabled for: {filename}")
|
|
self.file_data = file_data
|
|
|
|
return photo
|
|
|
|
def save(self, request, data):
|
|
"""
|
|
Save the model and assign it to the current user with enhanced security
|
|
"""
|
|
# Link this file to the user's profile
|
|
photo = Photo()
|
|
photo.user = request.user
|
|
photo.ip_address = get_client_ip(request)[0]
|
|
|
|
# Use sanitized data if available, otherwise use stored file data
|
|
if hasattr(self, "sanitized_data"):
|
|
photo.data = self.sanitized_data
|
|
elif hasattr(self, "file_data"):
|
|
photo.data = self.file_data
|
|
else:
|
|
# Fallback: try to read from the file object
|
|
try:
|
|
photo.data = data.read()
|
|
except Exception as e:
|
|
logger.error(f"Failed to read file data: {e}")
|
|
photo.data = b""
|
|
|
|
photo.save()
|
|
return photo if photo.pk else None
|
|
|
|
|
|
class AddOpenIDForm(forms.Form):
|
|
"""
|
|
Form to handle adding OpenID
|
|
"""
|
|
|
|
openid = forms.URLField(
|
|
label=_("OpenID"),
|
|
min_length=MIN_LENGTH_URL,
|
|
max_length=MAX_LENGTH_URL,
|
|
initial="http://",
|
|
)
|
|
|
|
def clean_openid(self):
|
|
"""
|
|
Enforce restrictions
|
|
"""
|
|
# Lowercase hostname port of the URL
|
|
url = urlsplit(self.cleaned_data["openid"])
|
|
return urlunsplit(
|
|
(
|
|
url.scheme.lower(),
|
|
url.netloc.lower(),
|
|
url.path,
|
|
url.query,
|
|
url.fragment,
|
|
)
|
|
)
|
|
|
|
def save(self, user):
|
|
"""
|
|
Save the model, ensuring some safety
|
|
"""
|
|
if ConfirmedOpenId.objects.filter( # pylint: disable=no-member
|
|
openid=self.cleaned_data["openid"]
|
|
).exists():
|
|
self.add_error("openid", _("OpenID already added and confirmed!"))
|
|
return False
|
|
|
|
if UnconfirmedOpenId.objects.filter( # pylint: disable=no-member
|
|
openid=self.cleaned_data["openid"]
|
|
).exists():
|
|
self.add_error("openid", _("OpenID already added, but not confirmed yet!"))
|
|
return False
|
|
|
|
unconfirmed = UnconfirmedOpenId()
|
|
unconfirmed.openid = self.cleaned_data["openid"]
|
|
unconfirmed.user = user
|
|
unconfirmed.save()
|
|
|
|
return unconfirmed.pk
|
|
|
|
|
|
class UpdatePreferenceForm(forms.ModelForm):
|
|
"""
|
|
Form for updating user preferences
|
|
"""
|
|
|
|
class Meta: # pylint: disable=too-few-public-methods
|
|
"""
|
|
Meta class for UpdatePreferenceForm
|
|
"""
|
|
|
|
model = UserPreference
|
|
fields = ["theme"]
|
|
|
|
|
|
class UploadLibravatarExportForm(forms.Form):
|
|
"""
|
|
Form handling libravatar user export upload
|
|
"""
|
|
|
|
export_file = forms.FileField(
|
|
label=_("Export file"),
|
|
error_messages={"required": _("You must choose an export file to upload.")},
|
|
)
|
|
not_porn = forms.BooleanField(
|
|
label=_("suitable for all ages (i.e. no offensive content)"),
|
|
required=True,
|
|
error_messages={
|
|
"required": _(
|
|
'We only host "G-rated" images and so this field must be checked.'
|
|
)
|
|
},
|
|
)
|
|
can_distribute = forms.BooleanField(
|
|
label=_("can be freely copied"),
|
|
required=True,
|
|
error_messages={
|
|
"required": _(
|
|
"This field must be checked since we need to be able to\
|
|
distribute photos to third parties."
|
|
)
|
|
},
|
|
)
|
|
|
|
|
|
class DeleteAccountForm(forms.Form):
|
|
password = forms.CharField(
|
|
label=_("Password"), required=False, widget=forms.PasswordInput()
|
|
)
|