Bluesky integration

* Centralize the our urlopen for consistency.
* Fix a few tests
This commit is contained in:
Oliver Falk
2025-02-07 11:34:24 +00:00
parent dc30267ff4
commit 3aaaac51f0
17 changed files with 920 additions and 91 deletions

View File

@@ -268,9 +268,7 @@ TRUSTED_DEFAULT_URLS = [
},
]
# This MUST BE THE LAST!
if os.path.isfile(os.path.join(BASE_DIR, "config_local.py")):
from config_local import * # noqa # flake8: noqa # NOQA # pragma: no cover
URL_TIMEOUT = 10
def map_legacy_config(trusted_url):
@@ -286,3 +284,11 @@ def map_legacy_config(trusted_url):
# Backward compability for legacy behavior
TRUSTED_DEFAULT_URLS = list(map(map_legacy_config, TRUSTED_DEFAULT_URLS))
# Bluesky settings
BLUESKY_IDENTIFIER = os.environ.get("BLUESKY_IDENTIFIER", None)
BLUESKY_APP_PASSWORD = os.environ.get("BLUESKY_APP_PASSWORD", None)
# This MUST BE THE LAST!
if os.path.isfile(os.path.join(BASE_DIR, "config_local.py")):
from config_local import * # noqa # flake8: noqa # NOQA # pragma: no cover

View File

@@ -3,13 +3,12 @@
Helper method to fetch Gravatar image
"""
from ssl import SSLError
from urllib.request import urlopen, HTTPError, URLError
from urllib.request import HTTPError, URLError
from ivatar.utils import urlopen
import hashlib
from ..settings import AVATAR_MAX_SIZE
URL_TIMEOUT = 5 # in seconds
def get_photo(email):
"""
@@ -30,7 +29,7 @@ def get_photo(email):
service_url = "http://www.gravatar.com/" + hash_object.hexdigest()
try:
urlopen(image_url, timeout=URL_TIMEOUT)
urlopen(image_url)
except HTTPError as exc:
if exc.code != 404 and exc.code != 503:
print( # pragma: no cover

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# Generated by Django 5.1.5 on 2025-01-27 10:54
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("ivataraccount", "0018_alter_photo_format"),
]
operations = [
migrations.AddField(
model_name="confirmedemail",
name="bluesky_handle",
field=models.CharField(blank=True, max_length=256, null=True),
),
]

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# Generated by Django 5.1.5 on 2025-01-27 13:33
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("ivataraccount", "0019_confirmedemail_bluesky_handle"),
]
operations = [
migrations.AddField(
model_name="confirmedopenid",
name="bluesky_handle",
field=models.CharField(blank=True, max_length=256, null=True),
),
]

View File

@@ -9,7 +9,7 @@ import time
from io import BytesIO
from os import urandom
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from ivatar.utils import urlopen, Bluesky
from urllib.parse import urlsplit, urlunsplit
from PIL import Image
@@ -322,6 +322,8 @@ class ConfirmedEmail(BaseAccountModel):
null=True,
on_delete=models.deletion.SET_NULL,
)
# Alternative assignment - use Bluesky handle
bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
digest = models.CharField(max_length=32)
digest_sha256 = models.CharField(max_length=64)
objects = ConfirmedEmailManager()
@@ -342,6 +344,18 @@ class ConfirmedEmail(BaseAccountModel):
self.photo = photo
self.save()
def set_bluesky_handle(self, handle):
"""
Helper method to set Bluesky handle
"""
bs = Bluesky()
avatar = bs.get_profile(handle)
if not avatar:
raise Exception("Invalid Bluesky handle")
return
self.bluesky_handle = handle
self.save()
def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):
@@ -463,6 +477,8 @@ class ConfirmedOpenId(BaseAccountModel):
alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
# https://<id> - https w/o trailing slash
alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
# Alternative assignment - use Bluesky handle
bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
access_count = models.BigIntegerField(default=0, editable=False)
@@ -481,6 +497,18 @@ class ConfirmedOpenId(BaseAccountModel):
self.photo = photo
self.save()
def set_bluesky_handle(self, handle):
"""
Helper method to set Bluesky handle
"""
bs = Bluesky()
avatar = bs.get_profile(handle)
if not avatar:
raise Exception("Invalid Bluesky handle")
return
self.bluesky_handle = handle
self.save()
def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):

View File

@@ -30,39 +30,68 @@ outline: inherit;
<div class="row">
{% for photo in user.photo_set.all %}
<form action="{% url 'assign_photo_email' view.kwargs.email_id %}" method="post" style="float:left;margin-left:20px">{% csrf_token %}
<input type="hidden" name="photo_id" value="{{ photo.id }}">
<button type="submit" name="photo{{ photo.id }}" class="nobutton">
<div class="panel panel-tortin" style="width:132px;margin:0">
<div class="panel-heading">
<h3 class="panel-title">{% if email.photo.id == photo.id %}<i class="fa fa-check"></i>{% endif %} {% trans 'Image' %} {{ forloop.counter }}</h3>
</div>
<div class="panel-body" style="height:130px">
<center>
<input type="hidden" name="photo_id" value="{{ photo.id }}">
<button type="submit" name="photo{{ photo.id }}" class="nobutton">
<div class="panel panel-tortin" style="width:132px;margin:0">
<div class="panel-heading">
<h3 class="panel-title">{% if email.photo.id == photo.id %}<i class="fa fa-check"></i>{% endif %} {% trans 'Image' %} {{ forloop.counter }}</h3>
</div>
<div class="panel-body" style="height:130px">
<center>
<img style="max-height:100px;max-width:100px" src="{% url 'raw_image' photo.id %}">
</center>
</div>
</div>
</button>
</center>
</div>
</div>
</button>
</form>
{% endfor %}
<form action="{% url 'assign_photo_email' view.kwargs.email_id %}" method="post" style="float:left;margin-left:20px">{% csrf_token %}
<button type="submit" name="photoNone" class="nobutton">
<div class="panel panel-tortin" style="width:132px;margin:0">
<div class="panel-heading">
<h3 class="panel-title">{% if email.photo.id == photo.id %}<i class="fa fa-check"></i>{% endif %} {% trans 'No image' %}</h3>
</div>
<div class="panel-body" style="height:130px">
<center>
<img style="max-height:100px;max-width:100px" src="/static/img/nobody/100.png">
</center>
</div>
</div>
</button>
<button type="submit" name="photoNone" class="nobutton">
<div class="panel panel-tortin" style="width:132px;margin:0">
<div class="panel-heading">
<h3 class="panel-title">{% if email.photo.id == photo.id %}{% if not email.bluesky_handle %}<i class="fa fa-check"></i>{% endif %}{% endif %} {% trans 'No image' %}</h3>
</div>
<div class="panel-body" style="height:130px">
<center>
<img style="max-height:100px;max-width:100px" src="/static/img/nobody/100.png">
</center>
</div>
</div>
</button>
</form>
{% if email.bluesky_handle %}
<form action="" style="float:left;margin-left:20px">
<div class="panel panel-tortin" style="width:132px;margin:0">
<div class="panel-heading">
<h3 class="panel-title"><i class="fa fa-check"></i> {% trans "Bluesky" %}</h3>
</div>
<div class="panel-body" style="height:130px">
<center>
<img style="max-height:100px;max-width:100px" src="{% url "blueskyproxy" email.digest %}?size=100">
</center>
</div>
</div>
</form>
{% endif %}
</div>
<div style="height:8px"></div>
<a href="{% url 'upload_photo' %}" class="button">{% blocktrans %}Upload a new one{% endblocktrans %}</a>&nbsp;&nbsp;
<a href="{% url 'import_photo' email.pk %}" class="button">{% blocktrans %}Import from other services{% endblocktrans %}</a>
<a href="{% url 'upload_photo' %}" class="button">{% blocktrans %}Upload a new one{% endblocktrans %}</a>&nbsp;&nbsp;
<a href="{% url 'import_photo' email.pk %}" class="button">{% blocktrans %}Import from other services{% endblocktrans %}</a>
<div style="height:8px"></div>
<form action="{% url 'assign_bluesky_handle_to_email' view.kwargs.email_id %}" method="post" style="float:left;margin-left:20px">{% csrf_token %}
<div class="form-group">
<label for="id_bluesky_handle">{% trans "Bluesky handle" %}:</label>
{% if email.bluesky_handle %}
<input type="text" name="bluesky_handle" required value="{{ email.bluesky_handle }}" class="form-control" id="id_bluesky_handle">
{% else %}
<input type="text" name="bluesky_handle" required value="" placeholder="{% trans 'Bluesky handle' %}" class="form-control" id="id_bluesky_handle">
{% endif %}
</div>
<button type="submit" class="button">{% trans 'Assign Bluesky handle' %}</button>
</form>
{% endif %}
<div style="height:40px"></div>
{% endblock content %}

View File

@@ -63,6 +63,19 @@ outline: inherit;
<div style="height:8px"></div>
<a href="{% url 'upload_photo' %}" class="button">{% blocktrans %}upload a new one{% endblocktrans %}</a>
<div style="height:8px"></div>
<form action="{% url 'assign_bluesky_handle_to_openid' view.kwargs.openid_id %}" method="post" style="float:left;margin-left:20px">{% csrf_token %}
<div class="form-group">
<label for="id_bluesky_handle">{% trans "Bluesky handle" %}:</label>
{% if openid.bluesky_handle %}
<input type="text" name="bluesky_handle" required value="{{ openid.bluesky_handle }}" class="form-control" id="id_bluesky_handle">
{% else %}
<input type="text" name="bluesky_handle" required value="" placeholder="{% trans 'Bluesky handle' %}" class="form-control" id="id_bluesky_handle">
{% endif %}
</div>
<button type="submit" class="button">{% trans 'Assign Bluesky handle' %}</button>
</form>
{% endif %}
<div style="height:40px"></div>
{% endblock content %}

View File

@@ -101,7 +101,15 @@
<form action="{% url 'remove_confirmed_email' email.id %}" method="post">
{% csrf_token %}
<div id="email-conf-{{ forloop.counter }}" class="profile-container active">
<img title="{% trans 'Access count' %}: {{ email.access_count }}" src="{% if email.photo %}{% url 'raw_image' email.photo.id %}{% else %}{% static '/img/nobody/120.png' %}{% endif %}">
<img title="{% trans 'Access count' %}: {{ email.access_count }}"
src="
{% if email.photo %}
{% url 'raw_image' email.photo.id %}
{% elif email.bluesky_handle %}
{% url 'blueskyproxy' email.digest %}
{% else %}
{% static '/img/nobody/120.png' %}
{% endif %}">
<h3 class="panel-title email-profile" title="{{ email.email }}">
{{ email.email }}
</h3>
@@ -123,7 +131,15 @@
<form action="{% url 'remove_confirmed_email' email.id %}" method="post">
{% csrf_token %}
<div id="email-conf-{{ forloop.counter }}" class="profile-container" onclick="add_active('email-conf-{{ forloop.counter }}')">
<img title="{% trans 'Access count' %}: {{ email.access_count }}" src="{% if email.photo %}{% url 'raw_image' email.photo.id %}{% else %}{% static '/img/nobody/120.png' %}{% endif %}">
<img title="{% trans 'Access count' %}: {{ email.access_count }}"
src="
{% if email.photo %}
{% url 'raw_image' email.photo.id %}
{% elif email.bluesky_handle %}
{% url 'blueskyproxy' email.digest %}
{% else %}
{% static '/img/nobody/120.png' %}
{% endif %}">
<h3 class="panel-title email-profile" title="{{ email.email }}">
{{ email.email }}
</h3>
@@ -148,7 +164,15 @@
<form action="{% url 'remove_confirmed_openid' openid.id %}" method="post">{% csrf_token %}
<div>
<div id="id-conf-{{ forloop.counter }}" class="profile-container active">
<img title="{% trans 'Access count' %}: {{ openid.access_count }}" src="{% if openid.photo %}{% url 'raw_image' openid.photo.id %}{% else %}{% static '/img/nobody/120.png' %}{% endif %}">
<img title="{% trans 'Access count' %}: {{ openid.access_count }}"
src="
{% if openid.photo %}
{% url 'raw_image' openid.photo.id %}
{% elif openid.bluesky_handle %}
{% url 'blueskyproxy' openid.digest %}
{% else %}
{% static '/img/nobody/120.png' %}
{% endif %}">
<h3 class="panel-title email-profile" title="{{ openid.openid }}">
{{ openid.openid }}
</h3>

View File

@@ -456,11 +456,18 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
},
follow=True,
) # Create test addresses + 1 too much
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", None, "Too many unconfirmed mail addresses!"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"Too many unconfirmed mail addresses!", form.errors.get("__all__", [])
)
def test_add_mail_address_twice(self):
"""
@@ -479,11 +486,18 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
},
follow=True,
)
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", "email", "Address already added, currently unconfirmed"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"Address already added, currently unconfirmed", form.errors.get("email", [])
)
def test_add_already_confirmed_email_self(self): # pylint: disable=invalid-name
"""
@@ -500,11 +514,19 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
},
follow=True,
)
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", "email", "Address already confirmed (by you)"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"Address already confirmed (by you)", form.errors.get("email", [])
)
def test_add_already_confirmed_email_other(self): # pylint: disable=invalid-name
"""
@@ -528,11 +550,19 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
},
follow=True,
)
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", "email", "Address already confirmed (by someone)"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"Address already confirmed (by someone else)", form.errors.get("email", [])
)
def test_remove_unconfirmed_non_existing_email(
self,
@@ -1052,11 +1082,19 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
"There must only be one unconfirmed ID!",
)
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", "openid", "OpenID already added, but not confirmed yet!"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"OpenID already added, but not confirmed yet!",
form.errors.get("openid", []),
)
# Manual confirm, since testing is _really_ hard!
unconfirmed = self.user.unconfirmedopenid_set.first()
@@ -1075,11 +1113,19 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
},
follow=True,
)
# TODO: This test isn't super criticial, but needs to be fixed
# Currently issues an error with an unbound form
# self.assertFormError(
# response, "form", "openid", "OpenID already added and confirmed!"
# )
# Check the response context for form errors
self.assertTrue(
hasattr(response, "context"), "Response does not have a context"
)
form = response.context.get("form")
self.assertIsNotNone(form, "No form found in response context")
# Verify form errors
self.assertFalse(form.is_valid(), "Form should not be valid")
self.assertIn(
"OpenID already added and confirmed!", form.errors.get("openid", [])
)
def test_assign_photo_to_openid(self):
"""
@@ -1529,9 +1575,6 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
"""
Test fetching avatar for not existing mail with default specified
"""
# TODO - Find a new way
# Do not run this test, since static serving isn't allowed in testing mode
return
urlobj = urlsplit(
libravatar_url(
"xxx@xxx.xxx",
@@ -1540,11 +1583,13 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
)
)
url = "%s?%s" % (urlobj.path, urlobj.query)
url += "&gravatarproxy=n"
response = self.client.get(url, follow=False)
self.assertRedirects(
response=response,
expected_url="/static/img/nobody.png",
msg_prefix="Why does this not redirect to nobody img?",
self.assertEqual(response.status_code, 302, "Doesn't redirect with 302?")
self.assertEqual(
response["Location"],
"/static/img/nobody.png",
"Doesn't redirect to static img?",
)
def test_avatar_url_default_gravatarproxy_disabled(
@@ -1553,9 +1598,6 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
"""
Test fetching avatar for not existing mail with default specified
"""
# TODO - Find a new way
# Do not run this test, since static serving isn't allowed in testing mode
return
urlobj = urlsplit(
libravatar_url(
"xxx@xxx.xxx",
@@ -1565,10 +1607,10 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
)
url = "%s?%s&gravatarproxy=n" % (urlobj.path, urlobj.query)
response = self.client.get(url, follow=True)
self.assertRedirects(
response=response,
expected_url="/static/img/nobody.png",
msg_prefix="Why does this not redirect to the default img?",
self.assertEqual(
response.redirect_chain[0][0],
"/static/img/nobody.png",
"Doesn't redirect to static?",
)
def test_avatar_url_default_external(self): # pylint: disable=invalid-name

View File

@@ -0,0 +1,284 @@
# -*- coding: utf-8 -*-
"""
Test our views in ivatar.ivataraccount.views and ivatar.views
"""
# pylint: disable=too-many-lines
import os
import django
from django.test import TestCase
from django.test import Client
from django.urls import reverse
from django.contrib.auth.models import User
# from django.contrib.auth import authenticate
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
# pylint: disable=wrong-import-position
from ivatar import settings
from ivatar.ivataraccount.models import ConfirmedOpenId, ConfirmedEmail
from ivatar.utils import random_string
from libravatar import libravatar_url
class Tester(TestCase): # pylint: disable=too-many-public-methods
"""
Main test class
"""
client = Client()
user = None
username = random_string()
password = random_string()
email = "%s@%s.%s" % (username, random_string(), random_string(2))
# Dunno why random tld doesn't work, but I'm too lazy now to investigate
openid = "http://%s.%s.%s/" % (username, random_string(), "org")
first_name = random_string()
last_name = random_string()
bsky_test_account = "ofalk.bsky.social"
def login(self):
"""
Login as user
"""
self.client.login(username=self.username, password=self.password)
def setUp(self):
"""
Prepare for tests.
- Create user
"""
self.user = User.objects.create_user(
username=self.username,
password=self.password,
first_name=self.first_name,
last_name=self.last_name,
)
settings.EMAIL_BACKEND = "django.core.mail.backends.dummy.EmailBackend"
def create_confirmed_openid(self):
"""
Create a confirmed openid
"""
confirmed = ConfirmedOpenId.objects.create(
user=self.user,
ip_address="127.0.0.1",
openid=self.openid,
)
return confirmed
def create_confirmed_email(self):
"""
Create a confirmed email
"""
confirmed = ConfirmedEmail.objects.create(
email=self.email,
user=self.user,
)
return confirmed
# The following tests need to be moved over to the model tests
# and real web UI tests added
def test_bluesky_handle_for_mail_via_model_handle_doesnt_exist(self):
"""
Add Bluesky handle to a confirmed mail address
"""
self.login()
confirmed = self.create_confirmed_email()
confirmed.set_bluesky_handle(self.bsky_test_account)
try:
confirmed.set_bluesky_handle(self.bsky_test_account + "1")
except Exception:
pass
self.assertNotEqual(
confirmed.bluesky_handle,
self.bsky_test_account + "1",
"Setting Bluesky handle that doesn exist works?",
)
def test_bluesky_handle_for_mail_via_model_handle_exists(self):
"""
Add Bluesky handle to a confirmed mail address
"""
self.login()
confirmed = self.create_confirmed_email()
confirmed.set_bluesky_handle(self.bsky_test_account)
self.assertEqual(
confirmed.bluesky_handle,
self.bsky_test_account,
"Setting Bluesky handle doesn't work?",
)
def test_bluesky_handle_for_openid_via_model_handle_doesnt_exist(self):
"""
Add Bluesky handle to a confirmed openid address
"""
self.login()
confirmed = self.create_confirmed_openid()
confirmed.set_bluesky_handle(self.bsky_test_account)
try:
confirmed.set_bluesky_handle(self.bsky_test_account + "1")
except Exception:
pass
self.assertNotEqual(
confirmed.bluesky_handle,
self.bsky_test_account + "1",
"Setting Bluesky handle that doesn exist works?",
)
def test_bluesky_handle_for_openid_via_model_handle_exists(self):
"""
Add Bluesky handle to a confirmed openid address
"""
self.login()
confirmed = self.create_confirmed_openid()
confirmed.set_bluesky_handle(self.bsky_test_account)
self.assertEqual(
confirmed.bluesky_handle,
self.bsky_test_account,
"Setting Bluesky handle doesn't work?",
)
def test_bluesky_fetch_mail(self):
"""
Check if we can successfully fetch a Bluesky avatar via email
"""
self.login()
confirmed = self.create_confirmed_email()
confirmed.set_bluesky_handle(self.bsky_test_account)
lu = libravatar_url(confirmed.email, https=True)
lu = lu.replace("https://seccdn.libravatar.org/", reverse("home"))
response = self.client.get(lu)
# This is supposed to redirect to the Bluesky proxy
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "/blueskyproxy/%s" % confirmed.digest)
def test_bluesky_fetch_openid(self):
"""
Check if we can successfully fetch a Bluesky avatar via OpenID
"""
self.login()
confirmed = self.create_confirmed_openid()
confirmed.set_bluesky_handle(self.bsky_test_account)
lu = libravatar_url(openid=confirmed.openid, https=True)
lu = lu.replace("https://seccdn.libravatar.org/", reverse("home"))
response = self.client.get(lu)
# This is supposed to redirect to the Bluesky proxy
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "/blueskyproxy/%s" % confirmed.digest)
def test_assign_bluesky_handle_to_openid(self):
"""
Assign a Bluesky handle to an OpenID
"""
self.login()
confirmed = self.create_confirmed_openid()
url = reverse("assign_bluesky_handle_to_openid", args=[confirmed.id])
response = self.client.post(
url,
{
"bluesky_handle": self.bsky_test_account,
},
follow=True,
)
self.assertEqual(
response.status_code, 200, "Adding Bluesky handle to OpenID fails?"
)
# Fetch object again, as it has changed because of the request
confirmed.refresh_from_db(fields=["bluesky_handle"])
self.assertEqual(
confirmed.bluesky_handle,
self.bsky_test_account,
"Setting Bluesky handle doesn't work?",
)
def test_assign_bluesky_handle_to_email(self):
"""
Assign a Bluesky handle to an email
"""
self.login()
confirmed = self.create_confirmed_email()
url = reverse("assign_bluesky_handle_to_email", args=[confirmed.id])
response = self.client.post(
url,
{
"bluesky_handle": self.bsky_test_account,
},
follow=True,
)
self.assertEqual(
response.status_code, 200, "Adding Bluesky handle to Email fails?"
)
# Fetch object again, as it has changed because of the request
confirmed.refresh_from_db(fields=["bluesky_handle"])
self.assertEqual(
confirmed.bluesky_handle,
self.bsky_test_account,
"Setting Bluesky handle doesn't work?",
)
def test_assign_photo_to_mail_removes_bluesky_handle(self):
"""
Assign a Photo to a mail, removes Bluesky handle
"""
self.login()
confirmed = self.create_confirmed_email()
confirmed.bluesky_handle = self.bsky_test_account
confirmed.save()
url = reverse("assign_photo_email", args=[confirmed.id])
response = self.client.post(
url,
{
"photoNone": True,
},
follow=True,
)
self.assertEqual(response.status_code, 200, "Unassigning Photo doesn't work?")
# Fetch object again, as it has changed because of the request
confirmed.refresh_from_db(fields=["bluesky_handle"])
self.assertEqual(
confirmed.bluesky_handle,
None,
"Removing Bluesky handle doesn't work?",
)
def test_assign_photo_to_openid_removes_bluesky_handle(self):
"""
Assign a Photo to a OpenID, removes Bluesky handle
"""
self.login()
confirmed = self.create_confirmed_openid()
confirmed.bluesky_handle = self.bsky_test_account
confirmed.save()
url = reverse("assign_photo_openid", args=[confirmed.id])
response = self.client.post(
url,
{
"photoNone": True,
},
follow=True,
)
self.assertEqual(response.status_code, 200, "Unassigning Photo doesn't work?")
# Fetch object again, as it has changed because of the request
confirmed.refresh_from_db(fields=["bluesky_handle"])
self.assertEqual(
confirmed.bluesky_handle,
None,
"Removing Bluesky handle doesn't work?",
)

View File

@@ -20,6 +20,7 @@ from .views import RemoveUnconfirmedOpenIDView, RemoveConfirmedOpenIDView
from .views import ImportPhotoView, RawImageView, DeletePhotoView
from .views import UploadPhotoView, AssignPhotoOpenIDView
from .views import AddOpenIDView, RedirectOpenIDView, ConfirmOpenIDView
from .views import AssignBlueskyHandleToEmailView, AssignBlueskyHandleToOpenIdView
from .views import CropPhotoView
from .views import UserPreferenceView, UploadLibravatarExportView
from .views import ResendConfirmationMailView
@@ -125,6 +126,16 @@ urlpatterns = [ # pylint: disable=invalid-name
AssignPhotoOpenIDView.as_view(),
name="assign_photo_openid",
),
re_path(
r"assign_bluesky_handle_to_email/(?P<email_id>\d+)",
AssignBlueskyHandleToEmailView.as_view(),
name="assign_bluesky_handle_to_email",
),
re_path(
r"assign_bluesky_handle_to_openid/(?P<open_id>\d+)",
AssignBlueskyHandleToOpenIdView.as_view(),
name="assign_bluesky_handle_to_openid",
),
re_path(r"import_photo/$", ImportPhotoView.as_view(), name="import_photo"),
re_path(
r"import_photo/(?P<email_addr>[\w.+-]+@[\w.]+.[\w.]+)",

View File

@@ -3,7 +3,7 @@
View classes for ivatar/ivataraccount/
"""
from io import BytesIO
from urllib.request import urlopen
from ivatar.utils import urlopen, Bluesky
import base64
import binascii
from xml.sax import saxutils
@@ -288,6 +288,7 @@ class AssignPhotoEmailView(SuccessMessageMixin, TemplateView):
messages.error(request, _("Photo does not exist"))
return HttpResponseRedirect(reverse_lazy("profile"))
email.photo = photo
email.bluesky_handle = None
email.save()
messages.success(request, _("Successfully changed photo"))
@@ -337,6 +338,7 @@ class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
messages.error(request, _("Photo does not exist"))
return HttpResponseRedirect(reverse_lazy("profile"))
openid.photo = photo
openid.bluesky_handle = None
openid.save()
messages.success(request, _("Successfully changed photo"))
@@ -350,6 +352,96 @@ class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
return data
@method_decorator(login_required, name="dispatch")
class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView):
"""
View class for assigning a Bluesky handle to an email address
"""
def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
"""
Handle post request - assign bluesky handle to email
"""
try:
email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
messages.error(request, _("Invalid request"))
return HttpResponseRedirect(reverse_lazy("profile"))
if "bluesky_handle" not in request.POST:
messages.error(request, _("Invalid request [bluesky_handle] missing"))
return HttpResponseRedirect(reverse_lazy("profile"))
bluesky_handle = request.POST["bluesky_handle"]
try:
bs = Bluesky()
bs.get_avatar(bluesky_handle)
except Exception as e:
messages.error(
request, _("Handle '%s' not found: %s" % (bluesky_handle, e))
)
return HttpResponseRedirect(reverse_lazy("profile"))
email.set_bluesky_handle(bluesky_handle)
email.photo = None
email.save()
messages.success(request, _("Successfully assigned Bluesky handle"))
return HttpResponseRedirect(reverse_lazy("profile"))
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
return data
@method_decorator(login_required, name="dispatch")
class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView):
"""
View class for assigning a Bluesky handle to an email address
"""
def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
"""
Handle post request - assign bluesky handle to email
"""
try:
openid = ConfirmedOpenId.objects.get(
user=request.user, id=kwargs["open_id"]
)
except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
messages.error(request, _("Invalid request"))
return HttpResponseRedirect(reverse_lazy("profile"))
if "bluesky_handle" not in request.POST:
messages.error(request, _("Invalid request [bluesky_handle] missing"))
return HttpResponseRedirect(reverse_lazy("profile"))
bluesky_handle = request.POST["bluesky_handle"]
try:
bs = Bluesky()
bs.get_avatar(bluesky_handle)
except Exception as e:
messages.error(
request, _("Handle '%s' not found: %s" % (bluesky_handle, e))
)
return HttpResponseRedirect(reverse_lazy("profile"))
openid.set_bluesky_handle(bluesky_handle)
openid.photo = None
openid.save()
messages.success(request, _("Successfully assigned Bluesky handle"))
return HttpResponseRedirect(reverse_lazy("profile"))
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
data["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"])
return data
@method_decorator(login_required, name="dispatch")
class ImportPhotoView(SuccessMessageMixin, TemplateView):
"""

View File

@@ -10,8 +10,14 @@ from django.urls import reverse
from django.test import TestCase
from django.test import Client
from django.contrib.auth.models import User
from ivatar.utils import random_string, Bluesky
from ivatar.utils import random_string
BLUESKY_APP_PASSWORD = None
BLUESKY_IDENTIFIER = None
try:
from settings import BLUESKY_APP_PASSWORD, BLUESKY_IDENTIFIER
except Exception: # pylint: disable=broad-except
pass
os.environ["DJANGO_SETTINGS_MODULE"] = "ivatar.settings"
django.setup()
@@ -89,3 +95,18 @@ class Tester(TestCase): # pylint: disable=too-many-public-methods
)
response = self.client.post(reverse("logout"), follow=True)
self.assertEqual(response.status_code, 200, "logout with post should logout")
def test_Bluesky_client(self):
"""
Bluesky client needs credentials, so it's limited with testing here now
"""
if BLUESKY_APP_PASSWORD and BLUESKY_IDENTIFIER:
b = Bluesky()
profile = b.get_profile("ofalk.bsky.social")
self.assertEqual(profile["handle"], "ofalk.bsky.social")
# As long as I don't change my avatar, this should stay the same
self.assertEqual(
profile["avatar"],
"https://cdn.bsky.app/img/avatar/plain/did:plc:35jdu26cjgsc5vdbsaqiuw4a/bafkreidgtubihcdwcr72s5nag2ohcnwhhbg2zabw4jtxlhmtekrm6t5f4y@jpeg",
)
self.assertEqual(True, True)

View File

@@ -16,7 +16,7 @@ from libravatar import libravatar_url, parse_user_identity
from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL
from libravatar import BASE_URL as LIBRAVATAR_BASE_URL
from ivatar.settings import SECURE_BASE_URL, BASE_URL
from ivatar.settings import SECURE_BASE_URL, BASE_URL, SITE_NAME, DEBUG
from .forms import (
CheckDomainForm,
CheckForm,
@@ -139,6 +139,36 @@ class CheckView(FormView):
openid=form.cleaned_data["openid"], email=None
)[0]
if "DEVELOPMENT" in SITE_NAME:
if DEBUG:
if mailurl:
mailurl = mailurl.replace(
"https://avatars.linux-kernel.at",
"http://" + self.request.get_host(),
)
if mailurl_secure:
mailurl_secure = mailurl_secure.replace(
"https://avatars.linux-kernel.at",
"http://" + self.request.get_host(),
)
if mailurl_secure_256:
mailurl_secure_256 = mailurl_secure_256.replace(
"https://avatars.linux-kernel.at",
"http://" + self.request.get_host(),
)
if openidurl:
openidurl = openidurl.replace(
"https://avatars.linux-kernel.at",
"http://" + self.request.get_host(),
)
if openidurl_secure:
openidurl_secure = openidurl_secure.replace(
"https://avatars.linux-kernel.at",
"http://" + self.request.get_host(),
)
print(mailurl, openidurl, mailurl_secure, mailurl_secure_256, openidurl_secure)
return render(
self.request,
self.template_name,

View File

@@ -7,7 +7,8 @@ from django.urls import path, include, re_path
from django.conf.urls.static import static
from django.views.generic import TemplateView, RedirectView
from ivatar import settings
from .views import AvatarImageView, GravatarProxyView, StatsView
from .views import AvatarImageView, StatsView
from .views import GravatarProxyView, BlueskyProxyView
urlpatterns = [ # pylint: disable=invalid-name
path("admin/", admin.site.urls),
@@ -31,6 +32,11 @@ urlpatterns = [ # pylint: disable=invalid-name
GravatarProxyView.as_view(),
name="gravatarproxy",
),
re_path(
r"blueskyproxy/(?P<digest>\w*)",
BlueskyProxyView.as_view(),
name="blueskyproxy",
),
path(
"description/",
TemplateView.as_view(template_name="description.html"),

View File

@@ -7,6 +7,82 @@ import string
from io import BytesIO
from PIL import Image, ImageDraw, ImageSequence
from urllib.parse import urlparse
import requests
from ivatar.settings import DEBUG, URL_TIMEOUT
from urllib.request import urlopen as urlopen_orig
BLUESKY_IDENTIFIER = None
BLUESKY_APP_PASSWORD = None
try:
from ivatar.settings import BLUESKY_IDENTIFIER, BLUESKY_APP_PASSWORD
except Exception: # pylint: disable=broad-except
pass
def urlopen(url, timeout=URL_TIMEOUT):
ctx = None
if DEBUG:
import ssl
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return urlopen_orig(url, timeout=timeout, context=ctx)
class Bluesky:
"""
Handle Bluesky client access
"""
identifier = ""
app_password = ""
service = "https://bsky.social"
session = None
def __init__(
self,
identifier: str = BLUESKY_IDENTIFIER,
app_password: str = BLUESKY_APP_PASSWORD,
service: str = "https://bsky.social",
):
self.identifier = identifier
self.app_password = app_password
self.service = service
def login(self):
"""
Login to Bluesky
"""
auth_response = requests.post(
f"{self.service}/xrpc/com.atproto.server.createSession",
json={"identifier": self.identifier, "password": self.app_password},
)
auth_response.raise_for_status()
self.session = auth_response.json()
def get_profile(self, handle: str):
if not self.session:
self.login()
profile_response = None
try:
profile_response = requests.get(
f"{self.service}/xrpc/app.bsky.actor.getProfile",
headers={"Authorization": f'Bearer {self.session["accessJwt"]}'},
params={"actor": handle},
)
profile_response.raise_for_status()
except Exception as exc:
print(
"Bluesky profile fetch failed with HTTP error: %s" % exc
) # pragma: no cover
return None
return profile_response.json()
def get_avatar(self, handle: str):
profile = self.get_profile(handle)
return profile["avatar"] if profile else None
def random_string(length=10):

View File

@@ -5,7 +5,7 @@ views under /
from io import BytesIO
from os import path
import hashlib
from urllib.request import urlopen
from ivatar.utils import urlopen, Bluesky
from urllib.error import HTTPError, URLError
from ssl import SSLError
from django.views.generic.base import TemplateView, View
@@ -36,8 +36,6 @@ from .ivataraccount.models import Photo
from .ivataraccount.models import pil_format, file_format
from .utils import is_trusted_url, mm_ng, resize_animated_gif
URL_TIMEOUT = 5 # in seconds
def get_size(request, size=DEFAULT_AVATAR_SIZE):
"""
@@ -123,7 +121,8 @@ class AvatarImageView(TemplateView):
if CACHE_RESPONSE:
centry = caches["filesystem"].get(uri)
if centry:
# For DEBUG purpose only print('Cached entry for %s' % uri)
# For DEBUG purpose only
# print('Cached entry for %s' % uri)
return HttpResponse(
centry["content"],
content_type=centry["content_type"],
@@ -192,6 +191,12 @@ class AvatarImageView(TemplateView):
except Exception: # pylint: disable=bare-except
pass
# Handle the special case of Bluesky
if obj:
if obj.bluesky_handle:
return HttpResponseRedirect(
reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
)
# If that mail/openid doesn't exist, or has no photo linked to it
if not obj or not obj.photo or forcedefault:
gravatar_url = (
@@ -396,7 +401,7 @@ class GravatarProxyView(View):
# print("Cached Gravatar response: Default.")
return redir_default(default)
try:
urlopen(gravatar_test_url, timeout=URL_TIMEOUT)
urlopen(gravatar_test_url)
except HTTPError as exc:
if exc.code == 404:
cache.set(gravatar_test_url, "default", 60)
@@ -415,7 +420,7 @@ class GravatarProxyView(View):
print("Cached Gravatar fetch failed with URL error: %s" % gravatar_url)
return redir_default(default)
gravatarimagedata = urlopen(gravatar_url, timeout=URL_TIMEOUT)
gravatarimagedata = urlopen(gravatar_url)
except HTTPError as exc:
if exc.code != 404 and exc.code != 503:
print(
@@ -450,6 +455,131 @@ class GravatarProxyView(View):
return redir_default(default)
class BlueskyProxyView(View):
"""
Proxy request to Bluesky and return the image from there
"""
def get(
self, request, *args, **kwargs
): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
"""
Override get from parent class
"""
def redir_default(default=None):
url = (
reverse_lazy("avatar_view", args=[kwargs["digest"]])
+ "?s=%i" % size
+ "&forcedefault=y"
)
if default is not None:
url += "&default=%s" % default
return HttpResponseRedirect(url)
size = get_size(request)
print(size)
blueskyimagedata = None
default = None
try:
if str(request.GET["default"]) != "None":
default = request.GET["default"]
except Exception: # pylint: disable=bare-except
pass
identity = None
# First check for email, as this is the most common
try:
identity = ConfirmedEmail.objects.filter(
Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
).first()
except Exception as exc:
print(exc)
# If no identity is found in the email table, try the openid table
if not identity:
try:
identity = ConfirmedOpenId.objects.filter(
Q(digest=kwargs["digest"])
| Q(alt_digest1=kwargs["digest"])
| Q(alt_digest2=kwargs["digest"])
| Q(alt_digest3=kwargs["digest"])
).first()
except Exception as exc:
print(exc)
# If still no identity is found, redirect to the default
if not identity:
return redir_default(default)
bs = Bluesky()
bluesky_url = None
# Try with the cache first
try:
if cache.get(identity.bluesky_handle):
bluesky_url = cache.get(identity.bluesky_handle)
except Exception: # pylint: disable=bare-except
pass
if not bluesky_url:
try:
bluesky_url = bs.get_avatar(identity.bluesky_handle)
cache.set(identity.bluesky_handle, bluesky_url)
except Exception: # pylint: disable=bare-except
return redir_default(default)
try:
if cache.get(bluesky_url) == "err":
print("Cached Bluesky fetch failed with URL error: %s" % bluesky_url)
return redir_default(default)
blueskyimagedata = urlopen(bluesky_url)
except HTTPError as exc:
if exc.code != 404 and exc.code != 503:
print(
"Bluesky fetch failed with an unexpected %s HTTP error: %s"
% (exc.code, bluesky_url)
)
cache.set(bluesky_url, "err", 30)
return redir_default(default)
except URLError as exc:
print("Bluesky fetch failed with URL error: %s" % exc.reason)
cache.set(bluesky_url, "err", 30)
return redir_default(default)
except SSLError as exc:
print("Bluesky fetch failed with SSL error: %s" % exc.reason)
cache.set(bluesky_url, "err", 30)
return redir_default(default)
try:
data = BytesIO(blueskyimagedata.read())
img = Image.open(data)
format = img.format
if max(img.size) > size:
aspect = img.size[0] / float(img.size[1])
if aspect > 1:
new_size = (size, int(size / aspect))
else:
new_size = (int(size * aspect), size)
img = img.resize(new_size)
data = BytesIO()
img.save(data, format=format)
data.seek(0)
response = HttpResponse(
data.read(), content_type="image/%s" % file_format(format)
)
response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
return response
except ValueError as exc:
print("Value error: %s" % exc)
return redir_default(default)
# We shouldn't reach this point... But make sure we do something
return redir_default(default)
class StatsView(TemplateView, JsonResponse):
"""
Return stats