"""
views under /
"""
import contextlib
from io import BytesIO
from os import path
import datetime
import hashlib
import logging
import threading
from ivatar.utils import urlopen, Bluesky
from urllib.error import HTTPError, URLError
from ssl import SSLError
from django.views.generic.base import TemplateView, View
from django.http import HttpResponse, HttpResponseRedirect
from django.http import HttpResponseNotFound, JsonResponse
from django.core.exceptions import ObjectDoesNotExist
from django.core.cache import cache, caches
from django.utils.translation import gettext_lazy as _
from django.urls import reverse_lazy
from django.db import connection
from django.db.models import Count, Q
from django.contrib.auth.models import User
from PIL import Image
from monsterid.id import build_monster as BuildMonster
import Identicon
from pydenticon5 import Pydenticon5
from .robohash import create_robohash
from .pagan_optimized import create_optimized_pagan
from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
from ivatar.settings import CACHE_RESPONSE
from ivatar.settings import CACHE_IMAGES_MAX_AGE
from ivatar.settings import TRUSTED_DEFAULT_URLS
from ivatar.settings import (
DEFAULT_GRAVATARPROXY,
DEFAULT_GRAVATARREDIRECT,
FORCEDEFAULT,
)
from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
from .ivataraccount.models import Photo
from .ivataraccount.models import pil_format, file_format
from .utils import is_trusted_url, mm_ng, resize_animated_gif
# Import OpenTelemetry (always enabled, export controlled by OTEL_EXPORT_ENABLED)
try:
from .opentelemetry_middleware import trace_avatar_operation, get_avatar_metrics
avatar_metrics = get_avatar_metrics()
except ImportError:
# OpenTelemetry packages not installed
    def trace_avatar_operation(operation_name):
        """No-op stand-in for the tracing decorator when OpenTelemetry is unavailable"""

        def decorator(func):
            return func

        return decorator

    class NoOpMetrics:
        """Fallback metrics recorder whose record_* methods do nothing"""
def record_avatar_generated(self, *args, **kwargs):
pass
def record_cache_hit(self, *args, **kwargs):
pass
def record_cache_miss(self, *args, **kwargs):
pass
def record_external_request(self, *args, **kwargs):
pass
def record_file_upload(self, *args, **kwargs):
pass
avatar_metrics = NoOpMetrics()
# Initialize loggers
logger = logging.getLogger("ivatar")
security_logger = logging.getLogger("ivatar.security")
def get_size(request, size=DEFAULT_AVATAR_SIZE):
"""
Get size from the URL arguments
"""
sizetemp = None
if "s" in request.GET:
sizetemp = request.GET["s"]
if "size" in request.GET:
sizetemp = request.GET["size"]
if sizetemp:
if sizetemp not in ["", "0"]:
with contextlib.suppress(ValueError):
if int(sizetemp) > 0:
size = int(sizetemp)
size = min(size, int(AVATAR_MAX_SIZE))
return size
class CachingHttpResponse(HttpResponse):
"""
Handle caching of response
"""
def __init__(
self,
uri,
content=b"",
content_type=None,
status=200, # pylint: disable=too-many-arguments
reason=None,
charset=None,
):
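        # Cache the response body under the full request URI, so identical
        # requests can later be served without regenerating the image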
if CACHE_RESPONSE:
caches["filesystem"].set(
uri,
{
"content": content,
"content_type": content_type,
"status": status,
"reason": reason,
"charset": charset,
},
)
super().__init__(content, content_type, status, reason, charset)
class AvatarImageView(TemplateView):
"""
View to return (binary) image, based on OpenID/Email (both by digest)
"""
    # TODO: Cache resized images! Memcached?
def options(self, request, *args, **kwargs):
response = HttpResponse("", content_type="text/plain")
response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
return response
def get(
self, request, *args, **kwargs
): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
"""
Override get from parent class
"""
model = ConfirmedEmail
size = get_size(request)
imgformat = "png"
obj = None
default = None
forcedefault = FORCEDEFAULT
gravatarredirect = DEFAULT_GRAVATARREDIRECT
gravatarproxy = DEFAULT_GRAVATARPROXY
uri = request.build_absolute_uri()
# Check the cache first
if CACHE_RESPONSE:
if centry := caches["filesystem"].get(uri):
# Record cache hit
avatar_metrics.record_cache_hit(size=str(size), format_type=imgformat)
# For DEBUG purpose only
# print('Cached entry for %s' % uri)
return HttpResponse(
centry["content"],
content_type=centry["content_type"],
status=centry["status"],
reason=centry["reason"],
charset=centry["charset"],
)
else:
# Record cache miss
avatar_metrics.record_cache_miss(size=str(size), format_type=imgformat)
# In case no digest at all is provided, return to home page
if "digest" not in kwargs:
return HttpResponseRedirect(reverse_lazy("home"))
if "d" in request.GET:
default = request.GET["d"]
if "default" in request.GET:
default = request.GET["default"]
if default is not None:
if TRUSTED_DEFAULT_URLS is None:
logger.warning("Query parameter `default` is disabled.")
default = None
elif default.find("://") > 0:
# Check if it's trusted, if not, reset to None
trusted_url = is_trusted_url(default, TRUSTED_DEFAULT_URLS)
if not trusted_url:
security_logger.warning(
f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
)
default = None
if "f" in request.GET:
if request.GET["f"] == "y":
forcedefault = True
if "forcedefault" in request.GET:
if request.GET["forcedefault"] == "y":
forcedefault = True
if "gravatarredirect" in request.GET:
if request.GET["gravatarredirect"] == "y":
gravatarredirect = True
if "gravatarproxy" in request.GET:
if request.GET["gravatarproxy"] == "n":
gravatarproxy = False
try:
obj = model.objects.get(digest=kwargs["digest"])
except ObjectDoesNotExist:
try:
obj = model.objects.get(digest_sha256=kwargs["digest"])
except ObjectDoesNotExist:
model = ConfirmedOpenId
with contextlib.suppress(Exception):
d = kwargs["digest"] # pylint: disable=invalid-name
                # OpenID is tricky: http vs. https, with or without trailing slash.
                # Some users have already added such variations themselves,
                # so we need to use filter() and first()
obj = model.objects.filter(
Q(digest=d)
| Q(alt_digest1=d)
| Q(alt_digest2=d)
| Q(alt_digest3=d)
).first()
# Handle the special case of Bluesky
        if obj and obj.bluesky_handle:
            return HttpResponseRedirect(
                reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
            )
# If that mail/openid doesn't exist, or has no photo linked to it
if not obj or not obj.photo or forcedefault:
gravatar_url = (
"https://secure.gravatar.com/avatar/"
+ kwargs["digest"]
+ "?s=%i" % size
)
# If we have redirection to Gravatar enabled, this overrides all
# default= settings, except forcedefault!
if gravatarredirect and not forcedefault:
return HttpResponseRedirect(gravatar_url)
# Request to proxy Gravatar image - only if not forcedefault
if gravatarproxy and not forcedefault:
url = (
reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
+ "?s=%i" % size
)
# Ensure we do not convert None to string 'None'
if default:
url += f"&default={default}"
return HttpResponseRedirect(url)
# Return the default URL, as specified, or 404 Not Found, if default=404
if default:
# Proxy to gravatar to generate wavatar - lazy me
if str(default) == "wavatar":
url = (
reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
+ "?s=%i" % size
+ f"&default={default}&f=y"
)
return HttpResponseRedirect(url)
if str(default) == str(404):
return HttpResponseNotFound(_("
Image not found
"))
if str(default) == "monsterid":
monsterdata = BuildMonster(seed=kwargs["digest"], size=(size, size))
data = BytesIO()
return self._return_cached_png(monsterdata, data, uri)
if str(default) == "robohash":
roboset = request.GET.get("robohash") or "any"
data = create_robohash(kwargs["digest"], size, roboset)
return self._return_cached_response(data, uri)
if str(default) == "retro":
identicon = Identicon.render(kwargs["digest"])
data = BytesIO()
img = Image.open(BytesIO(identicon))
img = img.resize((size, size), Image.LANCZOS)
return self._return_cached_png(img, data, uri)
if str(default) == "pagan":
data = create_optimized_pagan(kwargs["digest"], size)
return self._return_cached_response(data, uri)
if str(default) == "identicon":
p = Pydenticon5() # pylint: disable=invalid-name
                    # To make use of the whole 32-byte digest, we need to re-digest it
newdigest = hashlib.md5(
bytes(kwargs["digest"], "utf-8")
).hexdigest()
img = p.draw(newdigest, size, 0)
data = BytesIO()
return self._return_cached_png(img, data, uri)
if str(default) == "mmng":
mmngimg = mm_ng(idhash=kwargs["digest"], size=size)
data = BytesIO()
return self._return_cached_png(mmngimg, data, uri)
if str(default) in {"mm", "mp"}:
return self._redirect_static_w_size("mm", size)
return HttpResponseRedirect(default)
return self._redirect_static_w_size("nobody", size)
imgformat = obj.photo.format
photodata = Image.open(BytesIO(obj.photo.data))
data = BytesIO()
# Animated GIFs need additional handling
if imgformat == "gif" and photodata.is_animated:
# Debug only
# print("Object is animated and has %i frames" % photodata.n_frames)
data = resize_animated_gif(photodata, (size, size))
else:
            # If the image is smaller than requested, we must scale it up with
            # resize(), since thumbnail() only ever shrinks an image
if photodata.size[0] < size or photodata.size[1] < size:
photodata = photodata.resize((size, size), Image.LANCZOS)
else:
photodata.thumbnail((size, size), Image.LANCZOS)
photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)
data.seek(0)
obj.photo.access_count += 1
obj.photo.save()
obj.access_count += 1
obj.save()
if imgformat == "jpg":
imgformat = "jpeg"
# Record avatar generation metrics
avatar_metrics.record_avatar_generated(
size=str(size),
format_type=imgformat,
source="uploaded" if obj else "generated",
)
response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
# Remove Vary header for images since language doesn't matter
response["Vary"] = ""
return response
def _redirect_static_w_size(self, arg0, size):
"""
        Helper method to redirect to a static image with the requested size, if available
"""
# If mm is explicitly given, we need to catch that
static_img = path.join("static", "img", arg0, f"{str(size)}.png")
if not path.isfile(static_img):
# We trust this exists!!!
static_img = path.join("static", "img", arg0, "512.png")
# We trust static/ is mapped to /static/
return HttpResponseRedirect(f"/{static_img}")
def _return_cached_response(self, data, uri):
data.seek(0)
response = CachingHttpResponse(uri, data, content_type="image/png")
response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
# Remove Vary header for images since language doesn't matter
response["Vary"] = ""
return response
@trace_avatar_operation("generate_png")
    def _return_cached_png(self, img, data, uri):
        img.save(data, "PNG", quality=JPEG_QUALITY)
return self._return_cached_response(data, uri)
class GravatarProxyView(View):
"""
Proxy request to Gravatar and return the image from there
"""
# TODO: Do cache images!! Memcached?
@trace_avatar_operation("gravatar_proxy")
def get(
self, request, *args, **kwargs
): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
"""
Override get from parent class
"""
def redir_default(default=None):
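            # Fall back to our own avatar view with forcedefault set, e.g.
            # /avatar/<digest>?s=80&forcedefault=y&default=retro (illustrative)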
url = (
reverse_lazy("avatar_view", args=[kwargs["digest"]])
+ "?s=%i" % size
+ "&forcedefault=y"
)
if default is not None:
url += f"&default={default}"
return HttpResponseRedirect(url)
size = get_size(request)
gravatarimagedata = None
default = None
with contextlib.suppress(Exception):
if str(request.GET["default"]) != "None":
default = request.GET["default"]
if str(default) != "wavatar":
# This part is special/hackish
# Check if the image returned by Gravatar is their default image, if so,
# redirect to our default instead.
gravatar_test_url = (
"https://secure.gravatar.com/avatar/"
+ kwargs["digest"]
+ "?s=%i&d=%i" % (50, 404)
)
if cache.get(gravatar_test_url) == "default":
# DEBUG only
# print("Cached Gravatar response: Default.")
return redir_default(default)
try:
urlopen(gravatar_test_url)
except HTTPError as exc:
if exc.code == 404:
cache.set(gravatar_test_url, "default", 60)
else:
logger.warning(f"Gravatar test url fetch failed: {exc}")
return redir_default(default)
gravatar_url = (
"https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
)
if default:
gravatar_url += f"&d={default}"
try:
if cache.get(gravatar_url) == "err":
logger.warning(
f"Cached Gravatar fetch failed with URL error: {gravatar_url}"
)
return redir_default(default)
gravatarimagedata = urlopen(gravatar_url)
except HTTPError as exc:
if exc.code not in [404, 503]:
logger.warning(
f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
)
cache.set(gravatar_url, "err", 30)
return redir_default(default)
except URLError as exc:
logger.warning(f"Gravatar fetch failed with URL error: {exc.reason}")
cache.set(gravatar_url, "err", 30)
return redir_default(default)
except SSLError as exc:
logger.warning(f"Gravatar fetch failed with SSL error: {exc.reason}")
cache.set(gravatar_url, "err", 30)
return redir_default(default)
try:
data = BytesIO(gravatarimagedata.read())
img = Image.open(data)
data.seek(0)
response = HttpResponse(
data.read(), content_type=f"image/{file_format(img.format)}"
)
response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
# Remove Vary header for images since language doesn't matter
response["Vary"] = ""
return response
except ValueError as exc:
logger.error(f"Value error: {exc}")
return redir_default(default)
# We shouldn't reach this point... But make sure we do something
return redir_default(default)
class BlueskyProxyView(View):
"""
Proxy request to Bluesky and return the image from there
"""
def get(
self, request, *args, **kwargs
): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
"""
Override get from parent class
"""
def redir_default(default=None):
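            # Fall back to our own avatar view with forcedefault set, just
            # like in GravatarProxyView above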
url = (
reverse_lazy("avatar_view", args=[kwargs["digest"]])
+ "?s=%i" % size
+ "&forcedefault=y"
)
if default is not None:
url += f"&default={default}"
return HttpResponseRedirect(url)
size = get_size(request)
logger.debug(f"Bluesky avatar size requested: {size}")
blueskyimagedata = None
default = None
with contextlib.suppress(Exception):
if str(request.GET["default"]) != "None":
default = request.GET["default"]
identity = None
        # First check for an email, as this is the most common case
try:
identity = ConfirmedEmail.objects.filter(
Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
).first()
except Exception as exc:
logger.warning(f"Exception: {exc}")
# If no identity is found in the email table, try the openid table
if not identity:
try:
identity = ConfirmedOpenId.objects.filter(
Q(digest=kwargs["digest"])
| Q(alt_digest1=kwargs["digest"])
| Q(alt_digest2=kwargs["digest"])
| Q(alt_digest3=kwargs["digest"])
).first()
except Exception as exc:
logger.warning(f"Exception: {exc}")
# If still no identity is found, redirect to the default
if not identity:
return redir_default(default)
bs = Bluesky()
bluesky_url = None
        # Try the cache first
        with contextlib.suppress(Exception):
            if cached_url := cache.get(identity.bluesky_handle):
                bluesky_url = cached_url
if not bluesky_url:
try:
bluesky_url = bs.get_avatar(identity.bluesky_handle)
cache.set(identity.bluesky_handle, bluesky_url)
            except Exception:  # pylint: disable=broad-except
return redir_default(default)
try:
if cache.get(bluesky_url) == "err":
logger.warning(
f"Cached Bluesky fetch failed with URL error: {bluesky_url}"
)
return redir_default(default)
blueskyimagedata = urlopen(bluesky_url)
except HTTPError as exc:
if exc.code not in [404, 503]:
                logger.warning(
                    f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
                )
cache.set(bluesky_url, "err", 30)
return redir_default(default)
except URLError as exc:
logger.warning(f"Bluesky fetch failed with URL error: {exc.reason}")
cache.set(bluesky_url, "err", 30)
return redir_default(default)
except SSLError as exc:
logger.warning(f"Bluesky fetch failed with SSL error: {exc.reason}")
cache.set(bluesky_url, "err", 30)
return redir_default(default)
try:
data = BytesIO(blueskyimagedata.read())
img = Image.open(data)
img_format = img.format
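            # Downscale preserving the aspect ratio, e.g. a 200x100 source
            # requested at size=80 becomes 80x40 (illustrative)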
if max(img.size) > size:
aspect = img.size[0] / float(img.size[1])
if aspect > 1:
new_size = (size, int(size / aspect))
else:
new_size = (int(size * aspect), size)
img = img.resize(new_size)
data = BytesIO()
img.save(data, format=img_format)
data.seek(0)
response = HttpResponse(
data.read(), content_type=f"image/{file_format(format)}"
)
response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
# Remove Vary header for images since language doesn't matter
response["Vary"] = ""
return response
except ValueError as exc:
logger.error(f"Value error: {exc}")
return redir_default(default)
# We shouldn't reach this point... But make sure we do something
return redir_default(default)
class StatsView(TemplateView):
"""
Return stats
"""
def get(
self, request, *args, **kwargs
): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
retval = {
"users": User.objects.count(),
"mails": ConfirmedEmail.objects.count(),
"openids": ConfirmedOpenId.objects.count(), # pylint: disable=no-member
"unconfirmed_mails": UnconfirmedEmail.objects.count(), # pylint: disable=no-member
"unconfirmed_openids": UnconfirmedOpenId.objects.count(), # pylint: disable=no-member
"avatars": Photo.objects.count(), # pylint: disable=no-member
}
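        # The response is a single JSON document, roughly (illustrative):
        # {"users": ..., "mails": ..., "top_viewed_avatars": [...],
        #  "photo_format_distribution": [...], "photo_size_stats": {...}, ...}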
# Top 10 viewed avatars
top_photos = Photo.objects.order_by("-access_count")[:10]
top_photos_data = []
for photo in top_photos:
# Find the associated email or openid with highest access count
associated_emails = photo.emails.all().order_by("-access_count")
associated_openids = photo.openids.all().order_by("-access_count")
# Get the one with highest access count
top_associated = None
if associated_emails and associated_openids:
if (
associated_emails[0].access_count
>= associated_openids[0].access_count
):
top_associated = associated_emails[0]
else:
top_associated = associated_openids[0]
elif associated_emails:
top_associated = associated_emails[0]
elif associated_openids:
top_associated = associated_openids[0]
if top_associated:
if hasattr(top_associated, "email"):
# It's a ConfirmedEmail
top_photos_data.append(
{
"access_count": top_associated.access_count,
"avatar_url": f"https://libravatar.org/avatar/{top_associated.digest_sha256}",
}
)
else:
# It's a ConfirmedOpenId
top_photos_data.append(
{
"access_count": top_associated.access_count,
"avatar_url": f"https://libravatar.org/avatar/{top_associated.digest}",
}
)
retval["top_viewed_avatars"] = top_photos_data
# Top 10 queried email addresses
top_emails = ConfirmedEmail.objects.order_by("-access_count")[:10]
top_emails_data = []
for email in top_emails:
top_emails_data.append(
{
"access_count": email.access_count,
"avatar_url": f"https://libravatar.org/avatar/{email.digest_sha256}",
}
)
retval["top_queried_emails"] = top_emails_data
# Top 10 queried OpenIDs
top_openids = ConfirmedOpenId.objects.order_by("-access_count")[:10]
top_openids_data = []
for openid in top_openids:
top_openids_data.append(
{
"access_count": openid.access_count,
"avatar_url": f"https://libravatar.org/avatar/{openid.digest}",
}
)
retval["top_queried_openids"] = top_openids_data
# Photo format distribution
format_distribution = (
Photo.objects.values("format")
.annotate(count=Count("format"))
.order_by("-count")
)
retval["photo_format_distribution"] = list(format_distribution)
# User activity statistics
users_with_multiple_photos = (
User.objects.annotate(photo_count=Count("photo"))
.filter(photo_count__gt=1)
.count()
)
users_with_both_email_and_openid = (
User.objects.filter(
confirmedemail__isnull=False, confirmedopenid__isnull=False
)
.distinct()
.count()
)
# Calculate average photos per user
total_photos = Photo.objects.count()
total_users = User.objects.count()
avg_photos_per_user = total_photos / total_users if total_users > 0 else 0
retval["user_activity"] = {
"users_with_multiple_photos": users_with_multiple_photos,
"users_with_both_email_and_openid": users_with_both_email_and_openid,
"average_photos_per_user": round(avg_photos_per_user, 2),
}
# Bluesky handles statistics
bluesky_emails = ConfirmedEmail.objects.filter(
bluesky_handle__isnull=False
).count()
bluesky_openids = ConfirmedOpenId.objects.filter(
bluesky_handle__isnull=False
).count()
total_bluesky_handles = bluesky_emails + bluesky_openids
        # Summary of Bluesky handle counts
retval["bluesky_handles"] = {
"total_bluesky_handles": total_bluesky_handles,
"bluesky_emails": bluesky_emails,
"bluesky_openids": bluesky_openids,
}
# Average photo size statistics using raw SQL
with connection.cursor() as cursor:
# SQL to calculate average photo size
cursor.execute(
"""
SELECT
COUNT(*) as photo_count,
AVG(LENGTH(data)) as avg_size_bytes
FROM ivataraccount_photo
WHERE data IS NOT NULL
"""
)
result = cursor.fetchone()
if result and result[0] > 0:
photo_count, avg_size_bytes = result
# Convert to float in case database returns string
avg_size_bytes = float(avg_size_bytes) if avg_size_bytes else 0
avg_size_kb = round(avg_size_bytes / 1024, 2) if avg_size_bytes else 0
avg_size_mb = (
round(avg_size_bytes / (1024 * 1024), 2) if avg_size_bytes else 0
)
retval["photo_size_stats"] = {
"average_size_bytes": (
round(avg_size_bytes, 2) if avg_size_bytes else 0
),
"average_size_kb": avg_size_kb,
"average_size_mb": avg_size_mb,
"total_photos_analyzed": photo_count,
}
else:
retval["photo_size_stats"] = {
"average_size_bytes": 0,
"average_size_kb": 0,
"average_size_mb": 0,
"total_photos_analyzed": 0,
}
# For potential duplicate photos, we'll check for photos with the same format and size
# Note: This is not definitive - different images can have the same format and size
# but it's a good indicator of potential duplicates that might warrant investigation
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT
format,
LENGTH(data) as file_size,
COUNT(*) as count
FROM ivataraccount_photo
WHERE data IS NOT NULL
GROUP BY format, LENGTH(data)
HAVING COUNT(*) > 1
ORDER BY count DESC
LIMIT 10
"""
)
duplicate_groups = cursor.fetchall()
total_potential_duplicate_photos = sum(
group[2] for group in duplicate_groups
)
# Convert to list of dictionaries for JSON serialization
duplicate_groups_detail = [
{"format": group[0], "file_size": group[1], "count": group[2]}
for group in duplicate_groups
]
retval["potential_duplicate_photos"] = {
"potential_duplicate_groups": len(duplicate_groups),
"total_potential_duplicate_photos": total_potential_duplicate_photos,
"potential_duplicate_groups_detail": duplicate_groups_detail,
"note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
}
return JsonResponse(retval)
# Thread-safe version cache - cached indefinitely since container restarts on changes
_version_cache = None
_version_cache_lock = threading.Lock()
def _get_git_info_from_files():
"""
Safely extract git information from .git files without subprocess calls
"""
try:
# Get the project root directory
project_root = path.dirname(path.dirname(path.abspath(__file__)))
git_dir = path.join(project_root, ".git")
if not path.exists(git_dir):
return None
# Read HEAD to get current branch/commit
head_file = path.join(git_dir, "HEAD")
if not path.exists(head_file):
return None
with open(head_file) as f:
head_content = f.read().strip()
# Parse HEAD content
if head_content.startswith("ref: "):
# We're on a branch
branch_ref = head_content[5:] # Remove 'ref: '
branch_name = path.basename(branch_ref)
# Read the commit hash from the ref
ref_file = path.join(git_dir, branch_ref)
if path.exists(ref_file):
with open(ref_file) as f:
commit_hash = f.read().strip()
else:
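                # Note: refs packed into .git/packed-refs are not handled here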
return None
else:
# Detached HEAD state
commit_hash = head_content
branch_name = "detached"
# Try to get commit date from git log file (if available)
# Optimize: read only the last line instead of entire file
commit_date = None
log_file = path.join(git_dir, "logs", "HEAD")
if path.exists(log_file):
try:
with open(log_file, "rb") as f:
# Seek to end and read backwards to find last line
f.seek(0, 2) # Seek to end
file_size = f.tell()
# Read backwards in chunks to find the last line
chunk_size = min(1024, file_size)
f.seek(max(0, file_size - chunk_size))
chunk = f.read().decode("utf-8", errors="ignore")
# Find the last non-empty line
lines = chunk.split("\n")
last_line = None
for line in reversed(lines):
if line.strip():
last_line = line.strip()
break
if last_line:
                        # Git reflog line format (fields are space-separated,
                        # the message follows after a tab):
                        # <old-sha> <new-sha> <author> <email> <timestamp> <tz>\t<message>
                        parts = last_line.split()
                        if len(parts) >= 6:
                            # Find the Unix timestamp (it comes after the
                            # author email) and convert it to a readable date
                            for part in parts:
                                if part.isdigit() and len(part) == 10:  # Unix timestamp
                                    timestamp = int(part)
                                    commit_date = datetime.datetime.fromtimestamp(
                                        timestamp
                                    ).strftime("%Y-%m-%d %H:%M:%S %z")
                                    break
except (ValueError, IndexError, UnicodeDecodeError):
pass
# Fallback: try to get date from commit object if available
if not commit_date and len(commit_hash) == 40:
try:
commit_dir = path.join(git_dir, "objects", commit_hash[:2])
commit_file = path.join(commit_dir, commit_hash[2:])
if path.exists(commit_file):
# This would require decompressing the git object, which is complex
# For now, we'll use a placeholder
commit_date = "unknown"
except Exception:
commit_date = "unknown"
# Get deployment date from file modification time
# Use manage.py as it's always updated during deployment
deployment_date = None
manage_py_path = path.join(project_root, "manage.py")
if path.exists(manage_py_path):
try:
                mtime = path.getmtime(manage_py_path)
deployment_date = datetime.datetime.fromtimestamp(mtime).strftime(
"%Y-%m-%d %H:%M:%S %z"
)
except Exception:
deployment_date = "unknown"
return {
"commit_hash": commit_hash,
"short_hash": commit_hash[:7] if len(commit_hash) >= 7 else commit_hash,
"branch": branch_name,
"commit_date": commit_date or "unknown",
"deployment_date": deployment_date or "unknown",
"deployment_status": "active",
"version": f"{branch_name}-{commit_hash[:7] if len(commit_hash) >= 7 else commit_hash}",
}
except Exception as exc:
logger.warning(f"Failed to read git info from files: {exc}")
return None
def _get_cached_version_info():
"""
Get cached version information, loading it if not available
Since containers restart on content changes, cache indefinitely
"""
global _version_cache
with _version_cache_lock:
if _version_cache is None:
# Get version info from git files
_version_cache = _get_git_info_from_files()
# If that fails, return error
if _version_cache is None:
_version_cache = {
"error": "Unable to determine version - .git directory not found",
"deployment_status": "unknown",
}
return _version_cache
class DeploymentVersionView(View):
"""
View to return deployment version information for CI/CD verification
Uses cached version info to prevent DDoS attacks and improve performance
"""
def get(self, request, *args, **kwargs):
"""
Return cached deployment version information
"""
version_info = _get_cached_version_info()
if "error" in version_info:
return JsonResponse(version_info, status=500)
return JsonResponse(version_info)