Reformat with black and remove debug print statement

This commit is contained in:
Oliver Falk
2022-02-11 13:15:39 +01:00
parent 0c3686beef
commit c93d0eb86c
4 changed files with 158 additions and 140 deletions

View File

@@ -102,5 +102,4 @@ class CheckForm(forms.Form):
def clean_mail(self):
data = self.cleaned_data["mail"]
print(data)
return data.lower()

View File

@@ -1,57 +1,48 @@
'''
# -*- coding: utf-8 -*-
"""
Test our views in ivatar.ivataraccount.views and ivatar.views
'''
"""
# pylint: disable=too-many-lines
from urllib.parse import urlsplit
from io import BytesIO
import io
import os
import django
from django.test import TestCase
from django.test import Client
from django.urls import reverse
from django.contrib.auth.models import User
from django.contrib.auth import authenticate
import hashlib
from libravatar import libravatar_url
from PIL import Image
os.environ['DJANGO_SETTINGS_MODULE'] = 'ivatar.settings'
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
# pylint: disable=wrong-import-position
from ivatar import settings
from ivatar.ivataraccount.forms import MAX_NUM_UNCONFIRMED_EMAILS_DEFAULT
from ivatar.ivataraccount.models import Photo, ConfirmedOpenId
from ivatar.utils import random_string
# pylint: enable=wrong-import-position
class Tester(TestCase): # pylint: disable=too-many-public-methods
'''
"""
Main test class
'''
"""
client = Client()
user = None
username = random_string()
password = random_string()
email = '%s@%s.%s' % (username, random_string(), random_string(2))
email = "%s@%s.%s" % (username, random_string(), random_string(2))
# Dunno why random tld doesn't work, but I'm too lazy now to investigate
openid = 'http://%s.%s.%s/' % (username, random_string(), 'org')
openid = "http://%s.%s.%s/" % (username, random_string(), "org")
def login(self):
'''
"""
Login as user
'''
"""
self.client.login(username=self.username, password=self.password)
def setUp(self):
'''
"""
Prepare for tests.
- Create user
'''
"""
self.user = User.objects.create_user(
username=self.username,
password=self.password,
@@ -61,12 +52,12 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
"""
Test check page
"""
response = self.client.get(reverse('tools_check'))
self.assertEqual(response.status_code, 200, 'no 200 ok?')
response = self.client.get(reverse("tools_check"))
self.assertEqual(response.status_code, 200, "no 200 ok?")
def test_check_domain(self):
"""
Test check domain page
"""
response = self.client.get(reverse('tools_check_domain'))
self.assertEqual(response.status_code, 200, 'no 200 ok?')
response = self.client.get(reverse("tools_check_domain"))
self.assertEqual(response.status_code, 200, "no 200 ok?")

View File

@@ -1,12 +1,13 @@
'''
# -*- coding: utf-8 -*-
"""
ivatar/tools URL configuration
'''
"""
from django.conf.urls import url
from .views import CheckView, CheckDomainView
urlpatterns = [ # pylint: disable=invalid-name
url('check/', CheckView.as_view(), name='tools_check'),
url('check_domain/', CheckDomainView.as_view(), name='tools_check_domain'),
url('check_domain$', CheckDomainView.as_view(), name='tools_check_domain'),
url("check/", CheckView.as_view(), name="tools_check"),
url("check_domain/", CheckDomainView.as_view(), name="tools_check_domain"),
url("check_domain$", CheckDomainView.as_view(), name="tools_check_domain"),
]

View File

@@ -1,6 +1,7 @@
'''
# -*- coding: utf-8 -*-
"""
View classes for ivatar/tools/
'''
"""
from socket import inet_ntop, AF_INET6
import hashlib
import random
@@ -16,49 +17,59 @@ from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL
from libravatar import BASE_URL as LIBRAVATAR_BASE_URL
from ivatar.settings import SECURE_BASE_URL, BASE_URL
from .forms import CheckDomainForm, CheckForm # pylint: disable=relative-beyond-top-level
from .forms import (
CheckDomainForm,
CheckForm,
) # pylint: disable=relative-beyond-top-level
class CheckDomainView(FormView):
'''
"""
View class for checking a domain
'''
template_name = 'check_domain.html'
"""
template_name = "check_domain.html"
form_class = CheckDomainForm
success_url = reverse('tools_check_domain')
success_url = reverse("tools_check_domain")
def form_valid(self, form):
result = {}
super().form_valid(form)
domain = form.cleaned_data['domain']
result['avatar_server_http'] = lookup_avatar_server(domain, False)
if result['avatar_server_http']:
result['avatar_server_http_ipv4'] = lookup_ip_address(
result['avatar_server_http'],
False)
result['avatar_server_http_ipv6'] = lookup_ip_address(
result['avatar_server_http'],
True)
result['avatar_server_https'] = lookup_avatar_server(domain, True)
if result['avatar_server_https']:
result['avatar_server_https_ipv4'] = lookup_ip_address(
result['avatar_server_https'],
False)
result['avatar_server_https_ipv6'] = lookup_ip_address(
result['avatar_server_https'],
True)
return render(self.request, self.template_name, {
'form': form,
'result': result,
})
domain = form.cleaned_data["domain"]
result["avatar_server_http"] = lookup_avatar_server(domain, False)
if result["avatar_server_http"]:
result["avatar_server_http_ipv4"] = lookup_ip_address(
result["avatar_server_http"], False
)
result["avatar_server_http_ipv6"] = lookup_ip_address(
result["avatar_server_http"], True
)
result["avatar_server_https"] = lookup_avatar_server(domain, True)
if result["avatar_server_https"]:
result["avatar_server_https_ipv4"] = lookup_ip_address(
result["avatar_server_https"], False
)
result["avatar_server_https_ipv6"] = lookup_ip_address(
result["avatar_server_https"], True
)
return render(
self.request,
self.template_name,
{
"form": form,
"result": result,
},
)
class CheckView(FormView):
'''
"""
View class for checking an e-mail or openid address
'''
template_name = 'check.html'
"""
template_name = "check.html"
form_class = CheckForm
success_url = reverse('tools_check')
success_url = reverse("tools_check")
def form_valid(self, form):
mailurl = None
@@ -73,82 +84,88 @@ class CheckView(FormView):
super().form_valid(form)
if form.cleaned_data['default_url']:
default_url = form.cleaned_data['default_url']
elif form.cleaned_data['default_opt'] and form.cleaned_data['default_opt'] != 'none':
default_url = form.cleaned_data['default_opt']
if form.cleaned_data["default_url"]:
default_url = form.cleaned_data["default_url"]
elif (
form.cleaned_data["default_opt"]
and form.cleaned_data["default_opt"] != "none"
):
default_url = form.cleaned_data["default_opt"]
else:
default_url = None
if 'size' in form.cleaned_data:
size = form.cleaned_data['size']
if form.cleaned_data['mail']:
if "size" in form.cleaned_data:
size = form.cleaned_data["size"]
if form.cleaned_data["mail"]:
mailurl = libravatar_url(
email=form.cleaned_data['mail'],
size=size,
default=default_url)
email=form.cleaned_data["mail"], size=size, default=default_url
)
mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
mailurl_secure = libravatar_url(
email=form.cleaned_data['mail'],
email=form.cleaned_data["mail"],
size=size,
https=True,
default=default_url)
default=default_url,
)
mailurl_secure = mailurl_secure.replace(
LIBRAVATAR_SECURE_BASE_URL,
SECURE_BASE_URL)
LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
)
mail_hash = parse_user_identity(
email=form.cleaned_data['mail'],
openid=None)[0]
hash_obj = hashlib.new('sha256')
hash_obj.update(form.cleaned_data['mail'].encode('utf-8'))
email=form.cleaned_data["mail"], openid=None
)[0]
hash_obj = hashlib.new("sha256")
hash_obj.update(form.cleaned_data["mail"].encode("utf-8"))
mail_hash256 = hash_obj.hexdigest()
mailurl_secure_256 = mailurl_secure.replace(
mail_hash,
mail_hash256)
if form.cleaned_data['openid']:
if not form.cleaned_data['openid'].startswith('http://') and \
not form.cleaned_data['openid'].startswith('https://'):
form.cleaned_data['openid'] = 'http://%s' % form.cleaned_data['openid']
mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256)
if form.cleaned_data["openid"]:
if not form.cleaned_data["openid"].startswith(
"http://"
) and not form.cleaned_data["openid"].startswith("https://"):
form.cleaned_data["openid"] = "http://%s" % form.cleaned_data["openid"]
openidurl = libravatar_url(
openid=form.cleaned_data['openid'],
size=size,
default=default_url)
openid=form.cleaned_data["openid"], size=size, default=default_url
)
openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
openidurl_secure = libravatar_url(
openid=form.cleaned_data['openid'],
openid=form.cleaned_data["openid"],
size=size,
https=True,
default=default_url)
default=default_url,
)
openidurl_secure = openidurl_secure.replace(
LIBRAVATAR_SECURE_BASE_URL,
SECURE_BASE_URL)
LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
)
openid_hash = parse_user_identity(
openid=form.cleaned_data['openid'],
email=None)[0]
openid=form.cleaned_data["openid"], email=None
)[0]
return render(self.request, self.template_name, {
'form': form,
'mailurl': mailurl,
'openidurl': openidurl,
'mailurl_secure': mailurl_secure,
'mailurl_secure_256': mailurl_secure_256,
'openidurl_secure': openidurl_secure,
'mail_hash': mail_hash,
'mail_hash256': mail_hash256,
'openid_hash': openid_hash,
'size': size,
})
return render(
self.request,
self.template_name,
{
"form": form,
"mailurl": mailurl,
"openidurl": openidurl,
"mailurl_secure": mailurl_secure,
"mailurl_secure_256": mailurl_secure_256,
"openidurl_secure": openidurl_secure,
"mail_hash": mail_hash,
"mail_hash256": mail_hash256,
"openid_hash": openid_hash,
"size": size,
},
)
def lookup_avatar_server(domain, https):
'''
"""
Extract the avatar server from an SRV record in the DNS zone
The SRV records should look like this:
_avatars._tcp.example.com. IN SRV 0 0 80 avatars.example.com
_avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com
'''
"""
if domain and len(domain) > 60:
domain = domain[:60]
@@ -161,27 +178,35 @@ def lookup_avatar_server(domain, https):
DNS.DiscoverNameServers()
try:
dns_request = DNS.Request(name=service_name, qtype='SRV').req()
dns_request = DNS.Request(name=service_name, qtype="SRV").req()
except DNS.DNSError as message:
print("DNS Error: %s (%s)" % (message, domain))
return None
if dns_request.header['status'] == 'NXDOMAIN':
if dns_request.header["status"] == "NXDOMAIN":
# Not an error, but no point in going any further
return None
if dns_request.header['status'] != 'NOERROR':
print("DNS Error: status=%s (%s)" % (dns_request.header['status'], domain))
if dns_request.header["status"] != "NOERROR":
print("DNS Error: status=%s (%s)" % (dns_request.header["status"], domain))
return None
records = []
for answer in dns_request.answers:
if ('data' not in answer) or (not answer['data']) or \
(not answer['typename']) or (answer['typename'] != 'SRV'):
if (
("data" not in answer)
or (not answer["data"])
or (not answer["typename"])
or (answer["typename"] != "SRV")
):
continue
record = {'priority': int(answer['data'][0]), 'weight': int(answer['data'][1]),
'port': int(answer['data'][2]), 'target': answer['data'][3]}
record = {
"priority": int(answer["data"][0]),
"weight": int(answer["data"][1]),
"port": int(answer["data"][2]),
"target": answer["data"][3],
}
records.append(record)
@@ -194,38 +219,38 @@ def lookup_avatar_server(domain, https):
def srv_hostname(records):
'''
"""
Return the right (target, port) pair from a list of SRV records.
'''
"""
if len(records) < 1:
return (None, None)
if len(records) == 1:
ret = records[0]
return (ret['target'], ret['port'])
return (ret["target"], ret["port"])
# Keep only the servers in the top priority
priority_records = []
total_weight = 0
top_priority = records[0]['priority'] # highest priority = lowest number
top_priority = records[0]["priority"] # highest priority = lowest number
for ret in records:
if ret['priority'] > top_priority:
if ret["priority"] > top_priority:
# ignore the record (ret has lower priority)
continue
# Take care - this if is only a if, if the above if
# uses continue at the end. else it should be an elsif
if ret['priority'] < top_priority:
if ret["priority"] < top_priority:
# reset the aretay (ret has higher priority)
top_priority = ret['priority']
top_priority = ret["priority"]
total_weight = 0
priority_records = []
total_weight += ret['weight']
total_weight += ret["weight"]
if ret['weight'] > 0:
if ret["weight"] > 0:
priority_records.append((total_weight, ret))
else:
# zero-weigth elements must come first
@@ -233,7 +258,7 @@ def srv_hostname(records):
if len(priority_records) == 1:
unused, ret = priority_records[0] # pylint: disable=unused-variable
return (ret['target'], ret['port'])
return (ret["target"], ret["port"])
# Select first record according to RFC2782 weight ordering algorithm (page 3)
random_number = random.randint(0, total_weight)
@@ -242,9 +267,9 @@ def srv_hostname(records):
weighted_index, ret = record
if weighted_index >= random_number:
return (ret['target'], ret['port'])
return (ret["target"], ret["port"])
print('There is something wrong with our SRV weight ordering algorithm')
print("There is something wrong with our SRV weight ordering algorithm")
return (None, None)
@@ -263,19 +288,21 @@ def lookup_ip_address(hostname, ipv6):
print("DNS Error: %s (%s)" % (message, hostname))
return None
if dns_request.header['status'] != 'NOERROR':
print("DNS Error: status=%s (%s)" % (dns_request.header['status'], hostname))
if dns_request.header["status"] != "NOERROR":
print("DNS Error: status=%s (%s)" % (dns_request.header["status"], hostname))
return None
for answer in dns_request.answers:
if ('data' not in answer) or (not answer['data']):
if ("data" not in answer) or (not answer["data"]):
continue
if (ipv6 and answer['typename'] != 'AAAA') or (not ipv6 and answer['typename'] != 'A'):
if (ipv6 and answer["typename"] != "AAAA") or (
not ipv6 and answer["typename"] != "A"
):
continue # skip CNAME records
if ipv6:
return inet_ntop(AF_INET6, answer['data'])
return inet_ntop(AF_INET6, answer["data"])
return answer['data']
return answer["data"]
return None