-# Generated by Django 6.0.4 on 2026-04-24 11:37
+# Generated by Django 6.0.4 on 2026-04-24 13:34
import django.db.models.deletion
from django.db import migrations, models
-# Generated by Django 6.0.4 on 2026-04-24 11:37
+# Generated by Django 6.0.4 on 2026-04-24 13:34
import django.db.models.deletion
import pgtrigger.compiler
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="dee2cb1106ec730d60f5af5665fee20ddc1db488",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"posted_ts":\' || OLD."posted_ts" || \',"edited_ts":\' || OLD."edited_ts" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \',"text":\' || OLD."text" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="4d9a92efa5e09a6f1b6133b0058532a81f154a19",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_e3727",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="24860285f07bfec230d8fe91e1e5f1e27dbf4ab5",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"posted_ts":\' || NEW."posted_ts" || \',"edited_ts":\' || NEW."edited_ts" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \',"text":\' || NEW."text" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="53d66e09ee2f16df9a76df7c84d04f03c3899ecb",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_a5e85",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="2fe0eac7d74107a4b22294668b0a01c0fd4b9c5b",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"added_ts":\' || OLD."added_ts" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="bdcd7bfc868f23b48db6af8414d7bfb0b607149d",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_d1dad",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="5122e638478843f6f34a2c7af368ee2f7734efdd",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"added_ts":\' || NEW."added_ts" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="2c178a72a40e11e6ea1f2fca4b4e94fe0b590277",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4abda",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"name":\' || OLD."name" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="dd1c4a527a14046c19cd2d260336f61c954091bc",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"created_ts":\' || OLD."created_ts" || \',"name":\' || OLD."name" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="d5bc46053d7e825d228f68ce90aa068a3bf33458",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_71f70",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"name":\' || NEW."name" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="80f6d071b8557fbda0cc1bc2483b8876186fbe94",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"created_ts":\' || NEW."created_ts" || \',"name":\' || NEW."name" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="0ca4fae492403b7b12267933aadc6c8f8cc325ef",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_0f4bc",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="54f02ee6495116653d2d634032e42e3ac3d01712",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"posted_ts":\' || OLD."posted_ts" || \',"edited_ts":\' || OLD."edited_ts" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \',"text":\' || OLD."text" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="80d30fe7e136d5e9fb35eb4a4d8f8e05080d7868",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_5cdfa",
table="channel_privatemessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="c49996d1cd9f313f657e55f87b3588d2b68ba81b",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"posted_ts":\' || NEW."posted_ts" || \',"edited_ts":\' || NEW."edited_ts" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \',"text":\' || NEW."text" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="168788f9d00ca7895225db105540b91b8919ad66",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4c85f",
table="channel_privatemessage",
text = TextField()
class Meta:
- triggers = [*chat_channel(("id", "sender_id", "recipient_id"))]
+ triggers = [
+ *chat_channel(
+ ("id", "posted_ts", "edited_ts", "sender_id", "recipient_id", "text")
+ )
+ ]
class Channel(Model):
users = ManyToManyField(User, related_name="channels", through="ChannelUser")
class Meta:
- triggers = [*chat_channel(("id", "name"))]
+ triggers = [*chat_channel(("id", "created_ts", "name"))]
class ChannelUser(Model):
channel = ForeignKey(Channel, on_delete=CASCADE, editable=False)
class Meta:
- triggers = [*chat_channel(("id", "user_id", "channel_id"))]
+ triggers = [*chat_channel(("id", "added_ts", "user_id", "channel_id"))]
class ChannelMessage(Model):
text = TextField()
class Meta:
- triggers = [*chat_channel(("id", "user_id", "channel_id"))]
+ triggers = [
+ *chat_channel(
+ ("id", "posted_ts", "edited_ts", "user_id", "channel_id", "text")
+ )
+ ]
--- /dev/null
+from rest.serializers import ModelSerializer
+from .models import Channel, PrivateMessage
+
+
+class PrivateMessageSerializer(ModelSerializer):
+ model = PrivateMessage
+ fields = [
+ "id",
+ "url",
+ "posted_ts",
+ "edited_ts",
+ "sender_id",
+ "recipient_id",
+ "text",
+ ]
+
+ def from_json(self, data, instance=None):
+ if instance is None:
+ data.update({"sender_id": self.request.user.pk})
+ data.pop("posted_ts", None)
+ data.pop("edited_ts", None)
+ return super().from_json(data, instance)
+
+
+class ChannelSerializer(ModelSerializer):
+ model = Channel
+ fields = ["id", "url", "created_ts", "name", "users"]
+
+ def from_json(self, data, instance=None):
+ instance, m2m = super().from_json(data, instance)
+ data.pop("created_ts", None)
+ if instance.pk is None:
+ if "users" not in m2m:
+ m2m["users"] = []
+ if self.request.user.pk not in m2m["users"]:
+ m2m["users"].append(self.request.user.pk)
+ return instance, m2m
--- /dev/null
+body {
+ display: grid;
+ grid-template-columns: 150px 1fr;
+}
+
+main {
+ flex: 1;
+ display: flex;
+ flex-direction: column;
+}
+
+div.messages {
+ flex: 1;
+ overflow-y: auto;
+ padding: 20px;
+}
+
+div.messages-footer {
+ display: flex;
+ gap: 1em;
+ align-items: flex-start;
+ height: 10dvh;
+}
+
+div.messages-footer textarea {
+ flex: 1;
+ resize: none;
+ height: calc(100% - 4px);
+}
{% extends "chat/base.html" %}
-{% block main %}
- <h1>{{ title }}</h1>
+{% load static %}
+{% block head %}
+ {{ block.super }}
+ <link rel="stylesheet" href="{% static 'channel/styles.css' %}">
+ <script src="{% static 'chat/chatutils.js' %}"></script>
+{% endblock head %}
+{% block header %}
{% if messages %}
<ul class="messages">
{% for message in messages %}
{% endfor %}
</ul>
{% endif %}
- <form action="{% url 'user-logout' %}">
- <button>Log Out</button>
- </form>
+ <nav>
+ <div class="profile">Profile</div>
+ <div class="channel-list">
+ <h3>Channel list</h3>
+ </div>
+ <div class="user-list">
+ <h3>User list</h3>
+ </div>
+ </nav>
+{% endblock header %}
+{% block main %}
+ <div class="messages-header">
+ <form action="{% url 'user-logout' %}">
+ <button>Log Out</button>
+ </form>
+ </div>
+ <div class="messages"></div>
+ <div class="messages-footer">
+ <textarea></textarea>
+ <button>Send</button>
+ </div>
{% endblock main %}
async_to_sync(lambda: func_or_coro)()
else:
func_or_coro(*args, **kwargs)
+ elif settings.TESTING:
+ func_or_coro(*args, **kwargs)
else:
raise RuntimeError(
"AsyncBridge loop not initialized (Uvicorn not running)."
BASE_DIR = Path(__file__).resolve().parents[1]
SECRET_KEY = os.environ["SECRET_KEY"]
-DEBUG = bool(int(os.environ.get("DEBUG", "1")))
-ALLOWED_HOSTS = [os.environ.get("ALLOWED_HOST") or "localhost"]
-ADMINS = [os.environ["ADMIN_EMAIL"]]
+DEBUG = os.getenv("DEBUG", "1").lower() in ("1", "true", "t")
+ALLOWED_HOSTS = os.getenv("ALLOWED_HOST", "localhost").split(",")
+ADMINS = os.environ["ADMIN_EMAIL"].split(",")
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.messages",
"django.contrib.staticfiles",
"pgtrigger",
- "channel",
"chat",
+ "rest",
+ "channel",
"user",
]
MIDDLEWARE = [
"django.contrib.auth.password_validation.UserAttributeSimilarityValidator"
),
},
- {
- "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
- },
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
"OPTIONS": {"min_length": 9},
},
+ {
+ "NAME": "user.password_validation.ComplexityValidator",
+ },
]
LANGUAGE_CODE = "en-us"
},
}
+DEFAULT_PAGE_SIZE = 20
+
try:
from .settings_local import * # noqa: F403
except ImportError:
--- /dev/null
+from .settings import * # noqa
+
+TESTING = True
+PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
--- /dev/null
+* {
+ margin: 0;
+ padding: 0;
+}
+
+html, body, main {
+ height: 100%;
+}
<meta name="description" content="{{ meta_description }}">
<meta name="keywords" content="{{ meta_keywords }}">
{% endblock meta %}
- {# <link rel="stylesheet" href="{% static 'chat/style.css' %}"> #}
+ <link rel="stylesheet" href="{% static 'chat/styles.css' %}">
<title>
{% block title %}
- Title
+ {{ title|default:'Untitled' }}
{% endblock title %}
</title>
{% endblock head %}
--- /dev/null
+from asyncio import new_event_loop, set_event_loop
+from tempfile import SpooledTemporaryFile
+from threading import Thread
+from time import sleep
+
+from django.test import TestCase
+from .bridge import bridge
+
+
+class AsyncBridgeTest(TestCase):
+ @staticmethod
+ def write_add(fh, a, b):
+ fh.write(str(a + b))
+
+ @staticmethod
+ async def async_write_add(fh, a, b):
+ fh.write(str(a + b))
+
+ @staticmethod
+ def run_loop(loop):
+ set_event_loop(loop)
+ loop.run_forever()
+
+ def setUp(self):
+ bridge.loop = new_event_loop()
+ self._loop_thread = Thread(
+ target=self.run_loop, args=(bridge.loop,), daemon=True
+ )
+ self._loop_thread.start()
+
+ _loop_thread = None
+
+ def tearDown(self):
+ bridge.loop.call_soon_threadsafe(bridge.loop.stop)
+ self._loop_thread.join()
+ bridge.loop.close()
+ bridge.loop = None
+
+ def test_async_bridge(self):
+ with SpooledTemporaryFile(mode="wt") as fh:
+ for callback in (self.write_add, self.async_write_add):
+ bridge.call_async(callback, fh, 5, 7)
+ while True:
+ fh.seek(0)
+ data = fh.read()
+ if data:
+ break
+ sleep(1 / 32)
+ self.assertEqual(int(data), 5 + 7)
+ fh.seek(0)
+ fh.truncate()
from django.conf import settings
from django.contrib import admin
from django.contrib.auth.decorators import login_not_required
+from django.contrib.staticfiles.views import serve
from django.contrib.staticfiles.urls import static
from django.urls import include, path
from django.views.generic import RedirectView
path("user/", include("user.urls")),
path("channel/", include("channel.urls")),
path("admin/", admin.site.urls),
+ path("api/", include("rest.urls")),
+ # rely on DEBUG checks both in static and serve
+ *static(settings.STATIC_URL, login_not_required(serve)),
]
-
-if settings.DEBUG:
- urlpatterns.extend(static(settings.STATIC_URL))
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2026-04-22 11:54+0000\n"
+"POT-Creation-Date: 2026-05-01 09:44+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-#: user/email.py:26
-msgid "Password reset request"
+#: user/email.py:29
+#, python-brace-format
+msgid ""
+"Hello {username}\n"
+"\n"
+"Your user has been created. Please reset your password:\n"
+"\n"
+"{url}\n"
+"\n"
+"yours,\n"
+"{host}"
msgstr ""
-#: user/email.py:28
+#: user/email.py:35
+#, python-brace-format
msgid ""
-"Hello\n"
+"Hello {username}\n"
"\n"
-"Someone that is hopefully you, has requested a password request link:\n"
+"Someone that is hopefully you, has requested a password reset link:\n"
"\n"
+"{url}\n"
+"\n"
+"yours,\n"
+"{host}"
+msgstr ""
+
+#: user/email.py:40
+#, python-brace-format
+msgid "{host}: Password reset request"
msgstr ""
-#: user/email.py:47
-msgid "Chat Registration"
+#: user/email.py:58
+#, python-brace-format
+msgid "{host}: Confirm your email address"
msgstr ""
-#: user/email.py:48
+#: user/email.py:60
+#, python-brace-format
+msgid ""
+"Hello {username}\n"
+"\n"
+"Someone that is hopefully you, has filled in the registration form.\n"
+"Please click this link to confirm your email address:\n"
+"\n"
+"{url}\n"
+"\n"
+"yours,\n"
+"{host}"
+msgstr ""
+
+#: user/email.py:82
+msgid "Chat Registration Request"
+msgstr ""
+
+#: user/email.py:83
#, python-brace-format
msgid ""
"username: {username}\n"
+"email: {email}\n"
"message: {message}\n"
"\n"
"{url}"
msgid "Message to admins"
msgstr ""
-#: user/views.py:60
-msgid "Registration email submitted."
+#: user/password_validation.py:9
+msgid "The password must contain at least 1 uppercase letter."
msgstr ""
-#: user/views.py:76
-msgid "If this user exists, an email is currently being sent."
+#: user/password_validation.py:14
+msgid "The password must contain at least 1 lowercase letter."
msgstr ""
-#: user/views.py:122
-msgid "User was already created."
+#: user/password_validation.py:19
+msgid "The password must contain at least 1 digit."
+msgstr ""
+
+#: user/password_validation.py:25
+msgid ""
+"Your password must contain at least 1 uppercase letter, 1 lowercase letter, "
+"and 1 digit."
+msgstr ""
+
+#: user/views.py:45
+msgid "Register"
+msgstr ""
+
+#: user/views.py:70
+msgid ""
+"Please confirm your email address using the link that's been sent to your "
+"email address."
+msgstr ""
+
+#: user/views.py:84
+msgid "Link expired"
+msgstr ""
+
+#: user/views.py:86
+msgid "Invalid link"
+msgstr ""
+
+#: user/views.py:89
+msgid "Registration was filed with the admins."
+msgstr ""
+
+#: user/views.py:100
+#, python-brace-format
+msgid "Username ({username}) or email ({email}) already exists."
+msgstr ""
+
+#: user/views.py:106
+msgid "User has been successfully created."
+msgstr ""
+
+#: user/views.py:115
+msgid "Request Password Reset"
msgstr ""
#: user/views.py:125
-msgid "User was successfully created."
+msgid "If this user exists, an email is currently being sent."
+msgstr ""
+
+#: user/views.py:155 user/views.py:161
+msgid "Token validation failed"
msgstr ""
def main():
"""Run administrative tasks."""
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chat.settings")
+ if "test" in sys.argv:
+ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chat.settings_test")
+ else:
+ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "chat.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
[tool.djlint]
max_line_length = 88
+
+[tool.ruff.lint]
+select = ["E", "F", "INT"]
+
+[tool.ruff.lint.per-file-ignores]
+"**/migrations/*.py" = ["E501", "D101"]
--- /dev/null
+from django.apps import AppConfig
+
+
+class RestConfig(AppConfig):
+ name = "rest"
--- /dev/null
+from datetime import datetime
+from typing import Type
+
+from django.db.models import (
+ DateTimeField,
+ ForeignKey,
+ ManyToManyField,
+ ManyToManyRel,
+ Model,
+)
+from django.urls import reverse
+
+
+class ModelSerializer:
+ model: Type[Model]
+ fields = ["id", "url"]
+
+ def __init__(self, request):
+ self.request = request
+
+ def field_to_json(self, field_name, instance):
+ if field_name == "url":
+ name = self.model._meta.verbose_name.lower().replace(" ", "")
+ value = reverse(f"{name}-detail", args=[instance.pk])
+ if self.request:
+ value = self.request.build_absolute_uri(value)
+ return value
+ field = self.model._meta.get_field(field_name)
+ value = getattr(instance, field_name)
+ if isinstance(field, DateTimeField):
+ value = value.isoformat(timespec="milliseconds")
+ elif isinstance(field, (ManyToManyField, ManyToManyRel)):
+ value = [v.pk for v in value.all()]
+ return value
+
+ def to_json(self, instance):
+ return {key: self.field_to_json(key, instance) for key in self.fields}
+
+ def from_json(self, data, instance=None):
+ if instance is None:
+ instance = self.model()
+ m2m = {}
+ for field_name, value in data.items():
+ if field_name not in self.fields or field_name in ("id", "url"):
+ continue
+ field = self.model._meta.get_field(field_name)
+ if isinstance(field, DateTimeField):
+ value = datetime.fromisoformat(value)
+ if isinstance(field, (ManyToManyField, ManyToManyRel)):
+ m2m[field_name] = value
+ continue
+ if isinstance(field, ForeignKey):
+ # disallow reassigning FKs on update
+ if instance.pk is not None:
+ raise ValueError(f"Not allowed to update: {field_name}")
+ if not field_name.endswith("_id") or not isinstance(value, int):
+ raise KeyError(f"Use fk_id fields with an id: {field_name}")
+ setattr(instance, field_name, value)
+ return instance, m2m
--- /dev/null
+from django.urls import path
+
+
+urlpatterns = []
+
+
+def get_urls(view_class, name, *extra_patterns):
+ urlpatterns.extend(
+ (
+ path(
+ f"{name}/",
+ view_class.as_view(method_map={"get": "list", "post": "create"}),
+ name=f"api-{name}-list",
+ ),
+ path(
+ f"{name}/<int:id>/",
+ view_class.as_view(
+ method_map={"get": "detail", "put": "update", "delete": "delete"}
+ ),
+ name=f"api-{name}-detail",
+ ),
+ *extra_patterns,
+ )
+ )
--- /dev/null
+import json
+from functools import cached_property
+from typing import Type
+from urllib.parse import urlunsplit
+
+from django.conf import settings
+from django.contrib.auth import logout
+from django.db.models import QuerySet
+from django.http import HttpResponse
+from django.urls import reverse
+from django.utils.decorators import method_decorator
+from django.views import View
+from django.views.decorators.csrf import csrf_exempt
+
+from .serializers import ModelSerializer
+
+
+@method_decorator(csrf_exempt, name="dispatch")
+class ModelRestView(View):
+ http_method_names = []
+ list_all = False
+ method_map = {}
+
+ serializer: Type[ModelSerializer]
+
+ def get_queryset(self):
+ return QuerySet(self.serializer.model).all()
+
+ safe_chars = "/#%[]=:;$&()+,!?*@'~"
+
+ def paginate(self, queryset, to_json):
+ query = self.request.GET.copy()
+ if "before" in query:
+ queryset = queryset.filter(pk__lt=int(query["before"]))
+ count = queryset.count()
+ if "since" in query:
+ queryset = queryset.filter(pk__gte=int(query["since"]))
+ page_size = queryset.count()
+ elif not "list_all":
+ page_size = settings.DEFAULT_PAGE_SIZE
+ queryset = queryset[max(count - page_size, 0) :]
+ else:
+ page_size = count
+ if count > page_size:
+ query["before"] = queryset[0].pk
+ prev_url = self.request.build_absolute_uri(
+ urlunsplit(
+ ("", "", self.request.path, query.urlencode(self.safe_chars), "")
+ ),
+ )
+ else:
+ prev_url = None
+
+ return {
+ "previous": prev_url,
+ "result": [to_json(instance) for instance in queryset],
+ }
+
+ def get_object(self):
+ return self.get_queryset().get(pk=self.kwargs["id"])
+
+ @staticmethod
+ def get_json_dump_kwargs():
+ if settings.DEBUG: # pragma: no cover
+ return {"indent": 4}
+ return {}
+
+ def list(self):
+ serializer = self.serializer(self.request)
+ return HttpResponse(
+ json.dumps(
+ self.paginate(self.get_queryset(), serializer.to_json),
+ **self.get_json_dump_kwargs(),
+ ),
+ content_type="application/json",
+ )
+
+ def create(self):
+ return self.create_or_update(status=201)
+
+ def detail(self):
+ return HttpResponse(
+ json.dumps(
+ self.serializer(self.request).to_json(self.get_object()),
+ **self.get_json_dump_kwargs(),
+ ),
+ content_type="application/json",
+ )
+
+ def update(self):
+ return self.create_or_update(self.get_object())
+
+ def create_or_update(self, *args, **kwargs):
+ serializer = self.serializer(self.request)
+ instance, m2m = serializer.from_json(json.load(self.request), *args)
+ instance.save()
+ for field_name, value in m2m.items():
+ getattr(instance, field_name).set(value)
+ if settings.REST_CREATE_UPDATE_RETURN_RESULT:
+ return HttpResponse(
+ json.dumps(serializer.to_json(instance)),
+ content_type="application/json",
+ **kwargs,
+ )
+ return HttpResponse(status=204) # pragma: no cover
+
+ def delete(self):
+ self.get_object().delete()
+ return HttpResponse(status=204)
+
+ @cached_property
+ def action(self):
+ return self.method_map[self.request.method.lower()]
+
+ def dispatch(self, request, *args, **kwargs):
+ if not request.user.is_authenticated:
+ return self.handle_no_permission()
+ try:
+ return getattr(self, self.action)()
+ except Exception as e:
+ return HttpResponse(
+ json.dumps({type(e).__name__: e.args}),
+ content_type="application/json",
+ status=500,
+ )
+
+ def handle_no_permission(self):
+ logout(self.request)
+ return HttpResponse(
+ json.dumps({"Location": reverse("login")}),
+ content_type="application/json",
+ status=401,
+ )
)
parser.add_argument("files", nargs="+")
- print("p", sys.argv, file=sys.stderr)
args = parser.parse_args()
- print("q", args, file=sys.stderr)
returncode = 0
with (
get_jslint_js(args.jslint, args.upgrade) as jslint,
#!/usr/bin/env bash
-export PYTHON="${PYTHON:-python}"
+set -a
+PYTHON="${PYTHON:-python}"
"${PYTHON}" -m venv .venv
. .venv/bin/activate
if [[ -r .env ]]; then
- set -a
. .env
- set +a
fi
-export CFLAGS="$(
+CFLAGS="$(
python -c 'import sysconfig; print(sysconfig.get_config_var("CFLAGS"))'
) -O2"
-"${PYTHON}" -m pip install -U --no-binary \
+set +a
+pip_args=(-U --no-binary :all:)
+# avoid attempting network connections when the network is measurably down
+if [[ -z "$(ip route show default)" ]]; then
+ pip_args+=(--no-index --no-build-isolation)
+fi
+PIP_RETRIES=2 PIP_TIMEOUT=2 "${PYTHON}" -m pip install "${pip_args[@]}" \
beautysh coverage django-pgtrigger django-stubs djlint gunicorn \
pip pre-commit 'psycopg[c]' ruff 'uvicorn[standard]'
from django.conf import settings
from django.contrib.auth.tokens import default_token_generator
-from django.core.mail import EmailMultiAlternatives, send_mail, get_connection
+from django.core.mail import EmailMultiAlternatives, get_connection, send_mail
+from django.core import signing
from django.urls import reverse
from django.utils.encoding import force_bytes
from django.utils.translation import gettext_lazy as _
from .models import User
-def send_password_reset_email(request, username_or_email):
+def send_password_reset_email(request, username_or_email, created=False):
try:
user = User.objects.get_by_natural_key(username_or_email)
except User.DoesNotExist:
return
+ host = request.get_host()
relative_url = reverse(
"user-reset-password",
query={
"uidb64": urlsafe_b64encode(force_bytes(user.pk)).decode(),
},
)
+ if created:
+ msg = _(
+ "Hello {username}\n\n"
+ "Your user has been created. Please reset your password:\n\n"
+ "{url}\n\nyours,\n{host}"
+ )
+ else:
+ msg = _(
+ "Hello {username}\n\n"
+ "Someone that is hopefully you, has requested a password reset link:\n\n"
+ "{url}\n\nyours,\n{host}"
+ )
send_mail(
- _("Password reset request"),
- _(
- "Hello\n\n"
- "Someone that is hopefully you, has requested a password request link:\n\n"
- f"{request.build_absolute_uri(relative_url)}\n\n"
- "yours,\n"
- f"{settings.ALLOWED_HOSTS[0]}"
+ _("{host}: Password reset request").format(host=host),
+ msg.format(
+ host=host,
+ url=request.build_absolute_uri(relative_url),
+ username=user.username,
),
None,
[user.email],
)
-def send_register_email(request, data):
+def send_register_confirmation_email(request, form_data):
+ relative_url = reverse(
+ "user-confirm-registration",
+ query={"token": signing.dumps(form_data)},
+ )
+ host = request.get_host()
+ send_mail(
+ _("{host}: Confirm your email address").format(host=host),
+ _(
+ "Hello {username}\n\n"
+ "Someone that is hopefully you, has filled in the registration form.\n"
+ "Please click this link to confirm your email address:\n\n"
+ "{url}\n\nyours,\n{host}"
+ ).format(
+ host=host,
+ url=request.build_absolute_uri(relative_url),
+ username=form_data["username"],
+ ),
+ None,
+ [form_data["email"]],
+ )
+
+
+def send_registration_email(request, form_data):
url = request.build_absolute_uri(
reverse(
"user-create",
- query={key: value for key, value in data.items() if key != "message"},
+ query={key: value for key, value in form_data.items() if key != "message"},
)
)
return EmailMultiAlternatives(
- _("Chat Registration"),
- _("username: {username}\nmessage: {message}\n\n{url}").format(**data, url=url),
+ _("Chat Registration Request"),
+ _("username: {username}\nemail: {email}\nmessage: {message}\n\n{url}").format(
+ **form_data, url=url
+ ),
None,
settings.ADMINS,
connection=get_connection(),
- reply_to=[data["email"]],
+ reply_to=[form_data["email"]],
).send()
-# Generated by Django 6.0.4 on 2026-04-24 11:37
+# Generated by Django 6.0.4 on 2026-04-24 13:34
import django.contrib.auth.validators
import django.utils.timezone
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"username":\' || OLD."username" || \',"email":\' || OLD."email" || \',"first_name":\' || OLD."first_name" || \',"last_name":\' || OLD."last_name" || \',"date_joined":\' || OLD."date_joined" || \',"channels":\' || OLD."channels" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="028118469da41b01d89a7189ba91aab95db1c53f",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"username":\' || OLD."username" || \',"email":\' || OLD."email" || \',"first_name":\' || OLD."first_name" || \',"last_name":\' || OLD."last_name" || \',"date_joined":\' || OLD."date_joined" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="d9fa857efc39e0e22a104e69363f4cfaecada94a",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_a66e5",
table="user_user",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"username":\' || NEW."username" || \',"email":\' || NEW."email" || \',"first_name":\' || NEW."first_name" || \',"last_name":\' || NEW."last_name" || \',"date_joined":\' || NEW."date_joined" || \',"channels":\' || NEW."channels" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="225c79a6c8ca8556fcad9b1bd9826fd90172f63a",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"username":\' || NEW."username" || \',"email":\' || NEW."email" || \',"first_name":\' || NEW."first_name" || \',"last_name":\' || NEW."last_name" || \',"date_joined":\' || NEW."date_joined" || \'}}\'\n );\n RETURN NULL;\n ',
+ hash="bc33ad5f18bff543d5524d33e48f447ab5b9e997",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_72ed2",
table="user_user",
--- /dev/null
+# Generated by Django 6.0.4 on 2026-04-29 07:10
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("user", "0001_initial"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="user",
+ name="last_password_change",
+ field=models.DateTimeField(blank=True, null=True),
+ ),
+ ]
+from datetime import datetime, timedelta, UTC
+
from django.contrib.auth.models import AbstractUser, UserManager as DjangoUserManager
-from django.db.models import Q, QuerySet
+from django.db.models import DateTimeField, QuerySet
+from django.utils.http import base36_to_int
+from django.utils.timezone import now
from chat.triggers import chat_channel
class UserManager(DjangoUserManager):
def _get_by_natural_key(self, username, queryset_method):
- return queryset_method(
- self.filter(is_active=True),
- Q(**{self.model.USERNAME_FIELD: username})
- | Q(**{self.model.EMAIL_FIELD: username}),
- )
+ queryset = self.filter(is_active=True)
+ try:
+ return queryset_method(queryset, **{self.model.USERNAME_FIELD: username})
+ except self.model.DoesNotExist:
+ pass
+ return queryset_method(queryset, **{self.model.EMAIL_FIELD: username})
def get_by_natural_key(self, username):
return self._get_by_natural_key(username, QuerySet.get)
class User(AbstractUser):
+ last_password_change = DateTimeField(null=True, blank=True)
objects = UserManager()
class Meta:
triggers = [
*chat_channel(
- (
- "id",
- "username",
- "email",
- "first_name",
- "last_name",
- "date_joined",
- "channels",
- )
+ ("id", "username", "email", "first_name", "last_name", "date_joined")
)
]
+ def set_password(self, raw_password):
+ self.last_password_change = now()
+ return super().set_password(raw_password)
+
def is_privileged(self):
return self.is_staff or self.is_superuser
+
+ def get_full_email(self):
+ if self.first_name or self.last_name:
+ return " ".join(
+ filter(None, (self.first_name, self.last_name, f"<{self.email}>"))
+ )
+ return self.email
+
+ def check_token(self, token):
+ if self.last_password_change is None:
+ return True
+ return (pos := token.find("-")) > 0 and (
+ datetime(2001, 1, 1, tzinfo=UTC)
+ + timedelta(seconds=base36_to_int(token[:pos]))
+ > self.last_password_change
+ )
--- /dev/null
+from django.core.exceptions import ValidationError
+from django.utils.translation import gettext_lazy as _
+
+
+class ComplexityValidator:
+ def validate(self, password, user=None):
+ if not any(char.isupper() for char in password):
+ raise ValidationError(
+ _("The password must contain at least 1 uppercase letter."),
+ code="password_no_upper",
+ )
+ if not any(char.islower() for char in password):
+ raise ValidationError(
+ _("The password must contain at least 1 lowercase letter."),
+ code="password_no_lower",
+ )
+ if not any(char.isdigit() for char in password):
+ raise ValidationError(
+ _("The password must contain at least 1 digit."),
+ code="password_no_number",
+ )
+
+ def get_help_text(self):
+ return _(
+ "Your password must contain at least 1 uppercase letter, "
+ "1 lowercase letter, and 1 digit."
+ )
--- /dev/null
+from rest.serializers import ModelSerializer
+
+from .models import User
+
+
+class UserSerializer(ModelSerializer):
+ model = User
+ fields = [
+ "id",
+ "url",
+ "username",
+ "email",
+ "first_name",
+ "last_name",
+ "date_joined",
+ "channels",
+ ]
+
+ def to_json(self, instance):
+ result = super().to_json(instance)
+ result["is_authenticated"] = result["id"] == self.request.user.pk
+ return result
--- /dev/null
+form {
+ display: grid;
+ grid-template-columns: [label-col] max-content [input-col] 1fr;
+ gap: 1em;
+ width: 50%;
+}
+
+form p {
+ display: grid;
+ grid-template-columns: subgrid;
+ grid-column: 1 / -1;
+}
+
+form p label {
+ grid-column: label-col;
+}
+
+form p input {
+ grid-column: input-col;
+}
+
+form p span {
+ font-size: 0.85rem;
+ color: #666;
+ grid-column: input-col;
+}
{% extends "chat/base.html" %}
-{% block title %}
- {{ title }}
-{% endblock title %}
+{% load static %}
+{% block head %}
+ {{ block.super }}
+ <link rel="stylesheet" href="{% static 'user/styles.css' %}">
+{% endblock head %}
{% block main %}
<h1>{{ title }}</h1>
{% if messages %}
+from base64 import b64encode
+from urllib.parse import parse_qsl, urlsplit
+
+from django.conf import settings
+from django.contrib.auth.tokens import default_token_generator
+from django.contrib.messages import get_messages, INFO
+from django.core import mail
+from django.db.models import Q
+from django.http import HttpResponseRedirect
+from django.test import TestCase, override_settings
+from django.urls import reverse
+
+from .models import User
+
+# test all user-related stuff here
+# - login
+# - user rest view
+
+
+@override_settings(DEFAULT_FROM_EMAIL="noreply@testserver.com")
+class PasswordResetTest(TestCase):
+ @staticmethod
+ def messages(request):
+ return [(message.level, message.message) for message in get_messages(request)]
+
+ def check_reset_password_response(self, response, expected_outbox, username=None):
+ request = response.wsgi_request
+ self.assertEqual(
+ self.messages(request),
+ [(INFO, "If this user exists, an email is currently being sent.")],
+ )
+ self.client.get(response.headers["Location"])
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.headers["Location"], "/user/login")
+ self.assertEqual(response.headers["Content-Length"], "0")
+ self.assertEqual(response.content, b"")
+ self.assertEqual(len(mail.outbox), len(expected_outbox))
+ if not (outbox := mail.outbox.copy()):
+ return
+ mail.outbox.clear()
+ prefix = (
+ f"Hello {username}\n\n"
+ "Someone that is hopefully you, has requested a password reset link:\n\n"
+ )
+ suffix = "\n\nyours,\ntestserver"
+ for out_mail, expected_mail in zip(outbox, expected_outbox):
+ self.assertEqual(out_mail.subject, expected_mail["subject"])
+ self.assertEqual(out_mail.from_email, settings.DEFAULT_FROM_EMAIL)
+ self.assertEqual(out_mail.to, [expected_mail["user"].get_full_email()])
+ self.assertTrue(out_mail.body.startswith(prefix))
+ self.assertTrue(out_mail.body.endswith(suffix))
+ parts = urlsplit(out_mail.body[len(prefix) : -len(suffix)])
+ self.assertEqual(
+ parts[:3],
+ (request.scheme, request.get_host(), reverse("user-reset-password")),
+ )
+ self.assertEqual(parts[4], "")
+ query_params = parse_qsl(parts[3])
+ self.assertEqual(len(query_params), 2)
+ self.assertEqual(query_params[0][0], "token")
+ self.assertTrue(
+ default_token_generator.check_token(
+ expected_mail["user"], query_params[0][1]
+ )
+ )
+ self.assertEqual(
+ query_params[1],
+ ("uidb64", b64encode(str(expected_mail["user"].pk).encode()).decode()),
+ )
+
+ def test_request_password_reset(self):
+ self.assertEqual(len(mail.outbox), 0)
+ user = User.objects.create(username="herp", email="herp@derp.com")
+ url = reverse("user-request-password-reset")
+ response = self.client.get(url)
+ self.assertIn(b' name="username"', response.content)
+ response = self.client.post(url, {"username": user.username})
+ host = response.wsgi_request.get_host()
+ expected_outbox = [
+ {
+ "subject": f"{host}: Password reset request",
+ "user": user,
+ }
+ ]
+ self.check_reset_password_response(response, expected_outbox, user.username)
+ response = self.client.post(url, {"username": user.email})
+ self.check_reset_password_response(response, expected_outbox, user.username)
+ # test non-existing user
+ response = self.client.post(url, {"username": "herpderp"})
+ self.check_reset_password_response(response, [])
+
+ TEST_PASSWORDS = [
+ ("new_pw", [b" must contain at least ", b" uppercase letter."]),
+ ("newPassword", [b" too common.", b" at least 1 digit."]),
+ ("newPassword123", None),
+ ]
+
+ def test_reset_password(self):
+ user = User.objects.create(username="herp", email="herp@derp.com")
+ query_params = {
+ "token": default_token_generator.make_token(user),
+ "uidb64": b64encode(str(user.pk).encode()),
+ }
+ url = reverse("user-reset-password")
+ response = self.client.get(url, query_params)
+ self.assertIn(b' name="new_password1"', response.content)
+ self.assertIn(b' name="new_password2"', response.content)
+ response = self.client.post(
+ url, data={"new_password1": "new_password1"}, query_params=query_params
+ )
+ self.assertIn(b' name="new_password1"', response.content)
+ self.assertIn(b' name="new_password2"', response.content)
+ self.assertIn(b' id="id_new_password2_error"', response.content)
+ response = self.client.post(
+ url, data={"new_password2": "new_password2"}, query_params=query_params
+ )
+ self.assertIn(b' name="new_password1"', response.content)
+ self.assertIn(b' name="new_password2"', response.content)
+ self.assertIn(b' id="id_new_password1_error"', response.content)
+ response = self.client.post(
+ url,
+ data={"new_password1": "new_password1", "new_password2": "new_password2"},
+ query_params=query_params,
+ )
+ self.assertIn(b' name="new_password1"', response.content)
+ self.assertIn(b' name="new_password2"', response.content)
+ self.assertIn(b' id="id_new_password2_error"', response.content)
+ for password, expected_results in self.TEST_PASSWORDS:
+ response = self.client.post(
+ url,
+ data={"new_password1": password, "new_password2": password},
+ query_params=query_params,
+ )
+ if expected_results is not None:
+ for expected_result in expected_results:
+ self.assertIn(expected_result, response.content)
+ else:
+ self.assertIsInstance(response, HttpResponseRedirect)
+ self.assertEqual(
+ self.messages(response.wsgi_request),
+ [(INFO, "Password reset complete.")],
+ )
+ self.client.get(reverse("user-login"))
+ password = "YetAnotherValidPassword1234"
+ response = self.client.post(
+ url,
+ data={"new_password1": password, "new_password2": password},
+ query_params=query_params,
+ )
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.content, b"")
+ self.assertEqual(
+ self.messages(response.wsgi_request),
+ [(INFO, "Token validation failed")],
+ )
+
+ @override_settings(ADMINS=["admin@testserver.com"])
+ def test_register(self):
+ self.assertEqual(len(mail.outbox), 0)
+ url = reverse("user-register")
+ response = self.client.get(url)
+ self.assertIn(b' name="username"', response.content)
+ self.assertIn(b' name="email"', response.content)
+ self.assertIn(b' name="message"', response.content)
+ payload = {
+ "username": "herpderpy",
+ "email": "test@testserver.com",
+ "message": "hi",
+ }
+ response = self.client.post(url, payload)
+ self.assertEqual(
+ self.messages(response.wsgi_request),
+ [
+ (
+ INFO,
+ (
+ "Please confirm your email address using the link "
+ "that's been sent to your email address."
+ ),
+ )
+ ],
+ )
+ self.assertEqual(len(mail.outbox), 1)
+ host = response.wsgi_request.get_host()
+ self.assertEqual(mail.outbox[0].subject, f"{host}: Confirm your email address")
+ prefix = (
+ f"Hello {payload['username']}\n\n"
+ "Someone that is hopefully you, has filled in the registration form.\n"
+ "Please click this link to confirm your email address:\n\n"
+ )
+ suffix = f"\n\nyours,\n{host}"
+ self.assertTrue(mail.outbox[0].body.startswith(prefix))
+ self.assertTrue(mail.outbox[0].body.endswith(suffix))
+ self.client.get(reverse("user-login"))
+ response = self.client.get(mail.outbox[0].body[len(prefix) : -len(suffix)])
+ self.assertEqual(
+ self.messages(response.wsgi_request),
+ [(INFO, "Registration was filed with the admins.")],
+ )
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.content, b"")
+ self.client.get(reverse("user-login"))
+ self.assertEqual(len(mail.outbox), 2)
+ prefix = (
+ f"username: {payload['username']}\nemail: {payload['email']}\n"
+ f"message: {payload['message']}\n\n"
+ )
+ self.assertTrue(mail.outbox[1].subject, "Chat Registration Request")
+ self.assertTrue(mail.outbox[1].body.startswith(prefix))
+ self.assertEqual(mail.outbox[1].to, settings.ADMINS)
+ admin = User.objects.create(
+ username="admin",
+ email=settings.ADMINS[0],
+ is_superuser=True,
+ )
+ response = self.client.get(mail.outbox[1].body[len(prefix) :])
+ self.assertEqual(len(tuple(get_messages(response.wsgi_request))), 0)
+ self.assertEqual(response.status_code, 302)
+ self.assertTrue(
+ response.headers["Location"].startswith(f"{reverse('user-login')}?")
+ )
+ self.assertFalse(
+ User.objects.filter(
+ Q(email=payload["email"]) | Q(username=payload["username"])
+ ).exists()
+ )
+ self.client.force_login(admin)
+ response = self.client.get(mail.outbox[1].body[len(prefix) :])
+ self.assertEqual(
+ self.messages(response.wsgi_request),
+ [(INFO, "User has been successfully created.")],
+ )
+ user = User.objects.get(email=payload["email"])
+ self.assertEqual(
+ (user.username, user.email), (payload["username"], payload["email"])
+ )
+
+ def test_get_full_email(self):
+ user = User.objects.create(email="test@testserver.com")
+ self.assertEqual(user.get_full_email(), user.email)
+ user.first_name = "herp"
+ self.assertEqual(user.get_full_email(), f"{user.first_name} <{user.email}>")
+ user.last_name = "derpington"
+ self.assertEqual(
+ user.get_full_email(), f"{user.first_name} {user.last_name} <{user.email}>"
+ )
+ del user.first_name
+ self.assertEqual(user.get_full_email(), f"{user.last_name} <{user.email}>")
+
+ def is_redirect(self, response, expected_location=None):
+ self.assertEqual(response.status_code, 302)
+ if expected_location is not None:
+ self.assertEqual(response.headers["Location"], expected_location)
+
+ def test_login_logout(self):
+ user = User.objects.create(
+ email="testuser@testserver.com",
+ username="testobesto",
+ )
+ password = "Password1337"
+ user.set_password(password)
+ user.save()
+ url = reverse("user-login")
+ chat_url = reverse("channel-main")
+ self.is_redirect(self.client.get(chat_url), url)
+ response = self.client.post(
+ url,
+ {
+ "username": user.email,
+ "password": "derp",
+ },
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertIn(b"Please enter a correct Username (or email)", response.content)
+ self.is_redirect(self.client.get(chat_url), url)
+ for username in (user.email, user.username):
+ self.is_redirect(
+ self.client.post(
+ url,
+ {
+ "username": username,
+ "password": password,
+ },
+ ),
+ chat_url,
+ )
+ self.assertEqual(self.client.get(chat_url).status_code, 200)
+ self.assertEqual(self.client.get(reverse("user-logout")).status_code, 302)
+ self.is_redirect(self.client.get(chat_url), url)
from django.urls import path
-from .views import LoginView, LogoutView, RegisterView, ResetPasswordView, create_user
+from rest.urls import get_urls
+from .views import (
+ LoginView,
+ LogoutView,
+ RegisterView,
+ RequestPasswordResetView,
+ ResetPasswordView,
+ UserRestView,
+ confirm_registration_email,
+ create_user,
+)
urlpatterns = [
path("login", LoginView.as_view(), name="user-login"),
path("logout", LogoutView.as_view(), name="user-logout"),
+ path(
+ "request-password-reset",
+ RequestPasswordResetView.as_view(),
+ name="user-request-password-reset",
+ ),
path("reset-password", ResetPasswordView.as_view(), name="user-reset-password"),
path("register", RegisterView.as_view(), name="user-register"),
+ path("confirm", confirm_registration_email, name="user-confirm-registration"),
path("create", create_user, name="user-create"),
]
+
+get_urls(
+ UserRestView,
+ "user",
+ path(
+ "user/current/",
+ UserRestView.as_view(
+ method_map={"get": "detail", "put": "update", "delete": "delete"}
+ ),
+ name="api-user-current",
+ ),
+)
from django.contrib.auth.decorators import login_not_required
from django.contrib.auth.forms import SetPasswordForm
from django.contrib.auth.tokens import default_token_generator
-from django.contrib.auth.views import (
- INTERNAL_RESET_SESSION_TOKEN,
- LoginView as DjangoLoginView,
-)
+from django.contrib.auth.views import LoginView as DjangoLoginView
+from django.core import signing
+from django.db.models import Q
from django.shortcuts import redirect
from django.urls import reverse_lazy
from django.utils.decorators import method_decorator
from django.views.generic import FormView, RedirectView
from chat.bridge import bridge
-from .email import send_password_reset_email, send_register_email
+from rest.views import ModelRestView
+from .email import (
+ send_password_reset_email,
+ send_register_confirmation_email,
+ send_registration_email,
+)
from .forms import (
AuthenticationForm,
RegisterForm,
django_gettext_lazy,
)
from .models import User
+from .serializers import UserSerializer
class LoginView(DjangoLoginView):
extra_context = {
"title": "Login",
"links": {
- "Reset Password": reverse_lazy("user-reset-password"),
- "Register": reverse_lazy("user-register"),
+ django_gettext_lazy("Reset Password"): reverse_lazy(
+ "user-request-password-reset"
+ ),
+ _("Register"): reverse_lazy("user-register"),
},
}
success_url = reverse_lazy("user-login")
def form_valid(self, form):
- bridge.call_async(send_register_email, self.request, form.cleaned_data)
- messages.info(self.request, _("Registration email submitted."))
- return redirect("user-login")
+ send_register_confirmation_email(self.request, form.cleaned_data)
+ messages.info(
+ self.request,
+ _(
+ "Please confirm your email address using the link "
+ "that's been sent to your email address."
+ ),
+ )
+ return super().form_valid(form)
+
+
+@require_GET
+@login_not_required
+def confirm_registration_email(request):
+ try:
+ # max_age is in seconds (e.g., 3 hours = 10800)
+ form_data = signing.loads(request.GET["token"], max_age=10800)
+ except signing.SignatureExpired:
+ messages.error(request, _("Link expired"))
+ except signing.BadSignature:
+ messages.error(request, _("Invalid link"))
+ else:
+ send_registration_email(request, form_data)
+ messages.info(request, _("Registration was filed with the admins."))
+ return redirect("user-login")
+
+
+@require_GET
+def create_user(request):
+ if not request.user.is_privileged():
+ raise
+ if User.objects.filter(
+ Q(username=request.GET["username"]) | Q(email=request.GET["email"])
+ ).exists():
+ msg = _("Username ({username}) or email ({email}) already exists.").format(
+ username=request.GET["username"], email=request.GET["email"]
+ )
+ else:
+ User.objects.create(**request.GET.dict(), is_active=True)
+ send_password_reset_email(request, request.GET["email"], True)
+ msg = _("User has been successfully created.")
+ messages.info(request, msg)
+ return redirect("channel-main")
@method_decorator(login_not_required, name="dispatch")
-class ResetPasswordView(FormView):
+class RequestPasswordResetView(FormView):
form_class = ResetPasswordForm
template_name = "user/user.html"
- extra_context = {"title": "Reset Password"}
+ extra_context = {"title": _("Request Password Reset")}
success_url = reverse_lazy("user-login")
- def send_password_reset_email(self, form):
- bridge.call_async(
- send_password_reset_email, self.request, form.cleaned_data["username"]
- )
- messages.info(
- self.request, _("If this user exists, an email is currently being sent.")
+ def form_valid(self, form):
+ if isinstance(form, ResetPasswordForm) and "username" in form.cleaned_data:
+ bridge.call_async(
+ send_password_reset_email, self.request, form.cleaned_data["username"]
+ )
+ messages.info(
+ self.request,
+ _("If this user exists, an email is currently being sent."),
+ )
+ return super().form_valid(form)
+
+
+@method_decorator(login_not_required, name="dispatch")
+class ResetPasswordView(FormView):
+ form_class = SetPasswordForm
+ template_name = "user/user.html"
+ extra_context = {"title": django_gettext_lazy("Reset Password")}
+ success_url = reverse_lazy("user-login")
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.user = None
+
+ def setup_user_and_validate_request(self, request):
+ try:
+ self.user = User.objects.get(
+ pk=int(urlsafe_b64decode(request.GET["uidb64"]))
+ )
+ token = request.GET["token"]
+ except (User.DoesNotExist, KeyError, ValueError):
+ return False
+ return self.user.check_token(token) and default_token_generator.check_token(
+ self.user, token
)
- def save_new_password(self, form):
- form.save()
- del self.request.session[INTERNAL_RESET_SESSION_TOKEN]
- messages.info(self.request, django_gettext_lazy("Password reset complete"))
+ def get(self, request, *args, **kwargs):
+ if not self.setup_user_and_validate_request(request):
+ messages.info(self.request, _("Token validation failed"))
+ return redirect("user-login")
+ return super().get(request, *args, **kwargs)
+
+ def post(self, request, *args, **kwargs):
+ if not self.setup_user_and_validate_request(request):
+ messages.info(self.request, _("Token validation failed"))
+ return redirect("user-login")
+ return super().post(request, *args, **kwargs)
+
+ def get_form_kwargs(self):
+ return {**super().get_form_kwargs(), "user": self.user}
def form_valid(self, form):
- if "username" in form.cleaned_data:
- self.send_password_reset_email(form)
- elif (
- "new_password1" in form.cleaned_data
- and "new_password2" in form.cleaned_data
- ):
- self.save_new_password(form)
+ if all(key in form.cleaned_data for key in form.fields.keys()):
+ form.save()
+ messages.info(self.request, django_gettext_lazy("Password reset complete."))
return super().form_valid(form)
- def get_form(self, form_class=None):
- request = self.request
- if (
- request.method == "GET"
- and "token" in request.GET
- and "uidb64" in request.GET
- ):
- user = User.objects.get(pk=urlsafe_b64decode(request.GET["uidb64"]))
- default_token_generator.check_token(user, request.GET["token"])
- request.session[INTERNAL_RESET_SESSION_TOKEN] = user.pk
- elif (
- request.method == "POST"
- and "new_password1" in request.POST
- and "new_password2" in request.POST
- and "token" in request.GET
- ):
- user = User.objects.get(pk=request.session[INTERNAL_RESET_SESSION_TOKEN])
- default_token_generator.check_token(user, request.GET["token"])
- else:
- return super().get_form(form_class)
- return SetPasswordForm(user, **self.get_form_kwargs())
+class UserRestView(ModelRestView):
+ serializer = UserSerializer
-@require_GET
-def create_user(request):
- if User.objects.filter(
- username=request.GET["username"], email=request.GET["email"]
- ):
- messages.info(request, _("User was already created."))
- else:
- User.objects.create(**request.GET.dict(), is_active=True)
- messages.info(request, _("User was successfully created."))
- return redirect("channel-main")
+ def get_queryset(self):
+ return super().get_queryset().filter(is_active=True, last_login__isnull=False)
+
+ def dispatch(self, request, *args, **kwargs):
+ user_id = self.request.user.pk
+ if self.request.resolver_match.view_name == "api-user-current":
+ self.kwargs["id"] = user_id
+ if (
+ self.action in ("create", "delete") and not request.user.is_privileged()
+ ) or (
+ self.action == "update"
+ and not request.user.is_privileged()
+ and self.kwargs["id"] != user_id
+ ):
+ return self.handle_no_permission()
+ return super().dispatch(request, *args, **kwargs)