Files
ivatar/ivatar/ivataraccount/models.py
2020-02-25 16:40:53 +01:00

663 lines
22 KiB
Python

'''
Our models for ivatar.ivataraccount
'''
import base64
import hashlib
import time
from io import BytesIO
from os import urandom
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from urllib.parse import urlsplit, urlunsplit
from PIL import Image
from django.contrib.auth.models import User
from django.contrib import messages
from django.db import models
from django.utils import timezone
from django.http import HttpResponseRedirect
from django.urls import reverse_lazy, reverse
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail
from django.template.loader import render_to_string
from openid.association import Association as OIDAssociation
from openid.store import nonce as oidnonce
from openid.store.interface import OpenIDStore
from simplecrypt import encrypt
from binascii import hexlify
from libravatar import libravatar_url
from ivatar.settings import MAX_LENGTH_EMAIL, logger
from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
from ivatar.settings import MAX_LENGTH_URL
from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
from ivatar.utils import openid_variations
from .gravatar import get_photo as get_gravatar_photo
from ivatar.utils import random_string
def file_format(image_type):
'''
Helper method returning a short image type
'''
if image_type == 'JPEG':
return 'jpg'
elif image_type == 'PNG':
return 'png'
elif image_type == 'GIF':
return 'gif'
return None
def random_api_key():
return random_string(32)
def pil_format(image_type):
'''
Helper method returning the 'encoder name' for PIL
'''
if image_type == 'jpg' or image_type == 'jpeg':
return 'JPEG'
elif image_type == 'png':
return 'PNG'
elif image_type == 'gif':
return 'GIF'
logger.info('Unsupported file format: %s', image_type)
return None
def get_key_from_args(public_key=None, secret_key=None):
encryption_key = None
if type(public_key) is str:
try:
key = APIKey.objects.get(public_key=public_key)
encryption_key = key.secret_key
except APIKey.DoesNotExist:
raise Exception(_('Key given, does not exist'))
if type(public_key) is APIKey:
encryption_key = public_key.secret_key
if type(secret_key) is str:
encryption_key = secret_key
if type(secret_key) is APIKey:
encryption_key = secret_key.secret_key
if not encryption_key:
raise Exception(_('No key given for encryption'))
return encryption_key
class APIKey(models.Model):
'''
Holds the users API keys (public and private/secret)
'''
public_key = models.CharField(max_length=32, default=random_api_key, unique=True, blank=False, null=False)
secret_key = models.CharField(max_length=32, default=random_api_key, unique=False, blank=False, null=False)
user = models.OneToOneField(
User,
on_delete=models.deletion.CASCADE,
primary_key=True,
)
class UserPreference(models.Model):
'''
Holds the user users preferences
'''
THEMES = (
('default', 'Default theme'),
('clime', 'climes theme'),
('green', 'green theme'),
('red', 'red theme'),
)
theme = models.CharField(
max_length=10,
choices=THEMES,
default='default',
)
user = models.OneToOneField(
User,
on_delete=models.deletion.CASCADE,
primary_key=True,
)
def __str__(self):
return 'Preference (%i) for %s' % (self.pk, self.user)
class BaseAccountModel(models.Model):
'''
Base, abstract model, holding fields we use in all cases
'''
user = models.ForeignKey(
User,
on_delete=models.deletion.CASCADE,
)
ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
add_date = models.DateTimeField(default=timezone.now)
class Meta: # pylint: disable=too-few-public-methods
'''
Class attributes
'''
abstract = True
class Photo(BaseAccountModel):
'''
Model holding the photos and information about them
'''
ip_address = models.GenericIPAddressField(unpack_ipv4=True)
data = models.BinaryField()
format = models.CharField(max_length=3)
access_count = models.BigIntegerField(default=0, editable=False)
class Meta: # pylint: disable=too-few-public-methods
'''
Class attributes
'''
verbose_name = _('photo')
verbose_name_plural = _('photos')
def import_image(self, service_name, email_address):
'''
Allow to import image from other (eg. Gravatar) service
'''
image_url = False
if service_name == 'Gravatar':
gravatar = get_gravatar_photo(email_address)
if gravatar:
image_url = gravatar['image_url']
if service_name == 'Libravatar':
image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)
if not image_url:
return False # pragma: no cover
try:
image = urlopen(image_url)
# No idea how to test this
# pragma: no cover
except HTTPError as exc:
print('%s import failed with an HTTP error: %s' %
(service_name, exc.code))
return False
# No idea how to test this
# pragma: no cover
except URLError as exc:
print('%s import failed: %s' % (service_name, exc.reason))
return False
data = image.read()
try:
img = Image.open(BytesIO(data))
# How am I supposed to test this?
except ValueError: # pragma: no cover
return False # pragma: no cover
self.format = file_format(img.format)
if not self.format:
print('Unable to determine format: %s' % img) # pragma: no cover
return False # pragma: no cover
self.data = data
super().save()
return True
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
'''
Override save from parent, taking care about the image
'''
# Use PIL to read the file format
try:
img = Image.open(BytesIO(self.data))
# Testing? Ideas anyone?
except Exception as exc: # pylint: disable=broad-except
# For debugging only
print('Exception caught in Photo.save(): %s' % exc)
return False
self.format = file_format(img.format)
if not self.format:
print('Format not recognized')
return False
return super().save(force_insert, force_update, using, update_fields)
def perform_crop(self, request, dimensions, email, openid):
'''
Helper to crop the image
'''
if request.user.photo_set.count() == 1:
# This is the first photo, assign to all confirmed addresses
for addr in request.user.confirmedemail_set.all():
addr.photo = self
addr.save()
for addr in request.user.confirmedopenid_set.all():
addr.photo = self
addr.save()
if email:
# Explicitly asked
email.photo = self
email.save()
if openid:
# Explicitly asked
openid.photo = self
openid.save()
# Do the real work cropping
img = Image.open(BytesIO(self.data))
# This should be anyway checked during save...
dimensions['a'], \
dimensions['b'] = img.size # pylint: disable=invalid-name
if dimensions['a'] > MAX_PIXELS or dimensions['b'] > MAX_PIXELS:
messages.error(
request,
_('Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s' % {
max_pixels: MAX_PIXELS,
}))
return HttpResponseRedirect(reverse_lazy('profile'))
if dimensions['w'] == 0 and dimensions['h'] == 0:
dimensions['w'], dimensions['h'] = dimensions['a'], dimensions['b']
min_from_w_h = min(dimensions['w'], dimensions['h'])
dimensions['w'], dimensions['h'] = min_from_w_h, min_from_w_h
elif ((dimensions['w'] < 0)
or ((dimensions['x'] + dimensions['w']) > dimensions['a'])
or (dimensions['h'] < 0)
or ((dimensions['y'] + dimensions['h']) > dimensions['b'])):
messages.error(
request,
_('Crop outside of original image bounding box'))
return HttpResponseRedirect(reverse_lazy('profile'))
cropped = img.crop((
dimensions['x'],
dimensions['y'],
dimensions['x'] + dimensions['w'],
dimensions['y'] + dimensions['h']))
# cropped.load()
# Resize the image only if it's larger than the specified max width.
cropped_w, cropped_h = cropped.size
max_w = AVATAR_MAX_SIZE
if cropped_w > max_w or cropped_h > max_w:
cropped = cropped.resize((max_w, max_w), Image.ANTIALIAS)
data = BytesIO()
cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
data.seek(0)
# Overwrite the existing image
self.data = data.read()
self.save()
return HttpResponseRedirect(reverse_lazy('profile'))
def __str__(self):
return '%s (%i) from %s' % (self.format, self.pk or 0, self.user)
# pylint: disable=too-few-public-methods
class ConfirmedEmailManager(models.Manager):
'''
Manager for our confirmed email addresses model
'''
@staticmethod
def create_confirmed_email(user, email_address, is_logged_in):
'''
Helper method to create confirmed email address
'''
confirmed = ConfirmedEmail()
confirmed.user = user
confirmed.ip_address = '0.0.0.0'
confirmed.email = email_address
confirmed.save()
external_photos = []
if is_logged_in:
gravatar = get_gravatar_photo(confirmed.email)
if gravatar:
external_photos.append(gravatar)
return (confirmed.pk, external_photos)
class ConfirmedEmail(BaseAccountModel):
'''
Model holding our confirmed email addresses, as well as the relation
to the assigned photo
'''
email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
photo = models.ForeignKey(
Photo,
related_name='emails',
blank=True,
null=True,
on_delete=models.deletion.SET_NULL,
)
digest = models.CharField(max_length=32)
digest_sha256 = models.CharField(max_length=64)
objects = ConfirmedEmailManager()
access_count = models.BigIntegerField(default=0, editable=False)
class Meta: # pylint: disable=too-few-public-methods
'''
Class attributes
'''
verbose_name = _('confirmed email')
verbose_name_plural = _('confirmed emails')
def set_photo(self, photo):
'''
Helper method to set photo
'''
self.photo = photo
self.save()
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
'''
Override save from parent, add digest
'''
self.digest = hashlib.md5(
self.email.strip().lower().encode('utf-8')
).hexdigest()
self.digest_sha256 = hashlib.sha256(
self.email.strip().lower().encode('utf-8')
).hexdigest()
return super().save(force_insert, force_update, using, update_fields)
def __str__(self):
return '%s (%i) from %s' % (self.email, self.pk, self.user)
def encrypted_digest_sha256(self, public_key=None, secret_key=None):
encryption_key = get_key_from_args(public_key=public_key, secret_key=secret_key)
return hexlify(encrypt(encryption_key, self.digest_sha256))
def encrypted_digest(self, public_key=None, secret_key=None):
encryption_key = get_key_from_args(public_key=public_key, secret_key=secret_key)
return hexlify(encrypt(encryption_key, self.digest))
class UnconfirmedEmail(BaseAccountModel):
'''
Model holding unconfirmed email addresses as well as the verification key
'''
email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
verification_key = models.CharField(max_length=64)
class Meta: # pylint: disable=too-few-public-methods
'''
Class attributes
'''
verbose_name = _('unconfirmed email')
verbose_name_plural = _('unconfirmed emails')
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
hash_object = hashlib.new('sha256')
hash_object.update(
urandom(1024) + self.user.username.encode('utf-8') # pylint: disable=no-member
) # pylint: disable=no-member
self.verification_key = hash_object.hexdigest()
super(UnconfirmedEmail, self).save(
force_insert,
force_update,
using,
update_fields)
def send_confirmation_mail(self, url=SECURE_BASE_URL):
'''
Send confirmation mail to that mail address
'''
link = url + \
reverse(
'confirm_email',
kwargs={'verification_key': self.verification_key})
email_subject = _('Confirm your email address on %s') % \
SITE_NAME
email_body = render_to_string('email_confirmation.txt', {
'verification_link': link,
'site_name': SITE_NAME,
})
# if settings.DEBUG:
# print('DEBUG: %s' % link)
send_mail(
email_subject, email_body, DEFAULT_FROM_EMAIL,
[self.email])
return True
def __str__(self):
return '%s (%i) from %s' % (self.email, self.pk, self.user)
class UnconfirmedOpenId(BaseAccountModel):
'''
Model holding unconfirmed OpenIDs
'''
openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)
class Meta: # pylint: disable=too-few-public-methods
'''
Meta class
'''
verbose_name = _('unconfirmed OpenID')
verbose_name_plural = ('unconfirmed_OpenIDs')
def __str__(self):
return '%s (%i) from %s' % (self.openid, self.pk, self.user)
class ConfirmedOpenId(BaseAccountModel):
'''
Model holding confirmed OpenIDs, as well as the relation to
the assigned photo
'''
openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
photo = models.ForeignKey(
Photo,
related_name='openids',
blank=True,
null=True,
on_delete=models.deletion.SET_NULL,
)
# http://<id>/ base version - http w/ trailing slash
digest = models.CharField(max_length=64)
# http://<id> - http w/o trailing slash
alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
# https://<id>/ - https w/ trailing slash
alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
# https://<id> - https w/o trailing slash
alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
access_count = models.BigIntegerField(default=0, editable=False)
class Meta: # pylint: disable=too-few-public-methods
'''
Meta class
'''
verbose_name = _('confirmed OpenID')
verbose_name_plural = _('confirmed OpenIDs')
def set_photo(self, photo):
'''
Helper method to save photo
'''
self.photo = photo
self.save()
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
url = urlsplit(self.openid)
if url.username: # pragma: no cover
password = url.password or ''
netloc = url.username + ':' + password + '@' + url.hostname
else:
netloc = url.hostname
lowercase_url = urlunsplit(
(url.scheme.lower(), netloc, url.path, url.query, url.fragment)
)
self.openid = lowercase_url
self.digest = hashlib.sha256(openid_variations(lowercase_url)[0].encode('utf-8')).hexdigest()
self.alt_digest1 = hashlib.sha256(openid_variations(lowercase_url)[1].encode('utf-8')).hexdigest()
self.alt_digest2 = hashlib.sha256(openid_variations(lowercase_url)[2].encode('utf-8')).hexdigest()
self.alt_digest3 = hashlib.sha256(openid_variations(lowercase_url)[3].encode('utf-8')).hexdigest()
return super().save(force_insert, force_update, using, update_fields)
def __str__(self):
return '%s (%i) (%s)' % (self.openid, self.pk, self.user)
class OpenIDNonce(models.Model):
'''
Model holding OpenID Nonces
See also: https://github.com/edx/django-openid-auth/
'''
server_url = models.CharField(max_length=255)
timestamp = models.IntegerField()
salt = models.CharField(max_length=128)
def __str__(self):
return '%s (%i) (timestamp: %i)' % (
self.server_url,
self.pk,
self.timestamp)
class OpenIDAssociation(models.Model):
'''
Model holding the relation/association about OpenIDs
'''
server_url = models.TextField(max_length=2047)
handle = models.CharField(max_length=255)
secret = models.TextField(max_length=255) # stored base64 encoded
issued = models.IntegerField()
lifetime = models.IntegerField()
assoc_type = models.TextField(max_length=64)
def __str__(self):
return '%s (%i) (%s, lifetime: %i)' % (
self.server_url,
self.pk,
self.assoc_type,
self.lifetime)
class DjangoOpenIDStore(OpenIDStore):
'''
The Python openid library needs an OpenIDStore subclass to persist data
related to OpenID authentications. This one uses our Django models.
'''
@staticmethod
def storeAssociation(server_url, association): # pragma: no cover
'''
Helper method to store associations
'''
assoc = OpenIDAssociation(
server_url=server_url,
handle=association.handle,
secret=base64.encodebytes(association.secret),
issued=association.issued,
lifetime=association.issued,
assoc_type=association.assoc_type)
assoc.save()
def getAssociation(self, server_url, handle=None): # pragma: no cover
'''
Helper method to get associations
'''
assocs = []
if handle is not None:
assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
server_url=server_url,
handle=handle)
else:
assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
server_url=server_url)
if not assocs:
return None
associations = []
for assoc in assocs:
if isinstance(assoc.secret, str):
assoc.secret = assoc.secret.split("b'")[1].split("'")[0]
assoc.secret = bytes(assoc.secret, 'utf-8')
association = OIDAssociation(assoc.handle,
base64.decodebytes(assoc.secret),
assoc.issued, assoc.lifetime,
assoc.assoc_type)
expires = 0
try:
# pylint: disable=no-member
expires = association.getExpiresIn()
except AttributeError:
expires = association.expiresIn
if expires == 0:
self.removeAssociation(server_url, assoc.handle)
else:
associations.append((association.issued, association))
if not associations:
return None
return associations[-1][1]
@staticmethod
def removeAssociation(server_url, handle): # pragma: no cover
'''
Helper method to remove associations
'''
assocs = list(
OpenIDAssociation.objects.filter( # pylint: disable=no-member
server_url=server_url, handle=handle))
assocs_exist = len(assocs) > 0
for assoc in assocs:
assoc.delete()
return assocs_exist
@staticmethod
def useNonce(server_url, timestamp, salt): # pragma: no cover
'''
Helper method to 'use' nonces
'''
# Has nonce expired?
if abs(timestamp - time.time()) > oidnonce.SKEW:
return False
try:
nonce = OpenIDNonce.objects.get( # pylint: disable=no-member
server_url__exact=server_url,
timestamp__exact=timestamp,
salt__exact=salt)
except ObjectDoesNotExist:
nonce = OpenIDNonce.objects.create( # pylint: disable=no-member
server_url=server_url, timestamp=timestamp, salt=salt)
return True
nonce.delete()
return False
@staticmethod
def cleanupNonces(): # pragma: no cover
'''
Helper method to cleanup nonces
'''
timestamp = int(time.time()) - oidnonce.SKEW
# pylint: disable=no-member
OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()
@staticmethod
def cleanupAssociations(): # pragma: no cover
'''
Helper method to cleanup associations
'''
OpenIDAssociation.objects.extra( # pylint: disable=no-member
where=['issued + lifetimeint < (%s)' % time.time()]).delete()