-# Generated by Django 6.0.4 on 2026-04-24 13:34
+# Generated by Django 6.0.4 on 2026-05-10 15:04
import django.db.models.deletion
from django.db import migrations, models
-# Generated by Django 6.0.4 on 2026-04-24 13:34
+# Generated by Django 6.0.4 on 2026-05-10 15:04
import django.db.models.deletion
import pgtrigger.compiler
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"posted_ts":\' || OLD."posted_ts" || \',"edited_ts":\' || OLD."edited_ts" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \',"text":\' || OLD."text" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="4d9a92efa5e09a6f1b6133b0058532a81f154a19",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."user_id", \'field\', "OLD"."channel_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ',
+ hash="07fe162a2e795edf3c9f3a558a07a129c0d932a7",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_e3727",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"posted_ts":\' || NEW."posted_ts" || \',"edited_ts":\' || NEW."edited_ts" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \',"text":\' || NEW."text" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="53d66e09ee2f16df9a76df7c84d04f03c3899ecb",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."user_id", \'field\', "NEW"."channel_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ',
+ hash="0d1f1559c08c23f871043630f4808a9e20716850",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_a5e85",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
- hash="2a95850c0c6614d80da1343422459dcf122d13b0",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
+ hash="c79a373af4522bc4774ae554e5e93789aea06d05",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_ab388",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"added_ts":\' || OLD."added_ts" || \',"user_id":\' || OLD."user_id" || \',"channel_id":\' || OLD."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="bdcd7bfc868f23b48db6af8414d7bfb0b607149d",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"added_ts\", 'field', \"OLD\".\"user_id\", 'field', \"OLD\".\"channel_id\")\n )\n );\n RETURN NULL;\n ",
+ hash="60bd75faca7769c12f0a42219e5abafcf46de6ca",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_d1dad",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"added_ts":\' || NEW."added_ts" || \',"user_id":\' || NEW."user_id" || \',"channel_id":\' || NEW."channel_id" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="2c178a72a40e11e6ea1f2fca4b4e94fe0b590277",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"added_ts\", 'field', \"NEW\".\"user_id\", 'field', \"NEW\".\"channel_id\")\n )\n );\n RETURN NULL;\n ",
+ hash="94006ba6a11d60a0c08e9e9812a1dd4dc2be4671",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4abda",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
- hash="38f70e9cefd5f5687f702110ced17fc7d875eb1b",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
+ hash="24b3dfe313c8d9fb49df15607acae940d611dd8a",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_be185",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"created_ts":\' || OLD."created_ts" || \',"name":\' || OLD."name" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="d5bc46053d7e825d228f68ce90aa068a3bf33458",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"created_ts\", 'field', \"OLD\".\"name\")\n )\n );\n RETURN NULL;\n ",
+ hash="e4beddaa6aeea6939a68be72310dbf204993946d",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_71f70",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"created_ts":\' || NEW."created_ts" || \',"name":\' || NEW."name" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="0ca4fae492403b7b12267933aadc6c8f8cc325ef",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"created_ts\", 'field', \"NEW\".\"name\")\n )\n );\n RETURN NULL;\n ",
+ hash="4215198b8261bd097742e9227732675a3d0bdc1c",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_0f4bc",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
- hash="570f8232ef884343064eb3240fdc36e61052297e",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
+ hash="80b7a9088bce0dcb1e3e7adf6b54e5473272f203",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_2f496",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"posted_ts":\' || OLD."posted_ts" || \',"edited_ts":\' || OLD."edited_ts" || \',"sender_id":\' || OLD."sender_id" || \',"recipient_id":\' || OLD."recipient_id" || \',"text":\' || OLD."text" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="80d30fe7e136d5e9fb35eb4a4d8f8e05080d7868",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."sender_id", \'field\', "OLD"."recipient_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ',
+ hash="04d3d16eba42ed3fcd7bae3e0c45b9fdcd0ec05f",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_5cdfa",
table="channel_privatemessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"posted_ts":\' || NEW."posted_ts" || \',"edited_ts":\' || NEW."edited_ts" || \',"sender_id":\' || NEW."sender_id" || \',"recipient_id":\' || NEW."recipient_id" || \',"text":\' || NEW."text" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="168788f9d00ca7895225db105540b91b8919ad66",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."sender_id", \'field\', "NEW"."recipient_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ',
+ hash="078224b936ff9395c5ea4cae1729671d25b0b1f5",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4c85f",
table="channel_privatemessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
- hash="5adc21d8be79bfa8f0253ad28f0cef04b28c4752",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
+ hash="f329902c9edc996be6f7b56c8a3e318aed50f1c3",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_d9c27",
(function () {
+ var data_methods = ["post", "put", "patch"];
function foreach(obj, callback, start) {
var i;
var o;
out.push(s);
return out.join("");
},
- "xhr": function (method, url, callbacks, data) {
+ "xhr": function (method, url, data, load_cb, abort_cb, error_cb) {
var request = new XMLHttpRequest();
- if (!callbacks.hasOwnProperty("error")) {
- request.addEventListener(
- "error",
- function (msg) {
- console.log("xhr_error", msg);
- }
- );
- }
- if (!callbacks.hasOwnProperty("abort")) {
- request.addEventListener(
- "abort",
- function (msg) {
- console.log("xhr_abort", msg);
- }
- );
- }
- foreach(
- callbacks,
- function (event_name, callback) {
- request.addEventListener(event_name, callback);
+ request.addEventListener(
+ "load",
+ load_cb || function (event) {
+ console.log(
+ "xhr_load",
+ event.target.status,
+ event.target.response
+ );
+ }
+ );
+ request.addEventListener(
+ "abort",
+ abort_cb || function (event) {
+ console.log("xhr_abort", event.target.response);
+ }
+ );
+ request.addEventListener(
+ "error",
+ error_cb || function (event) {
+ console.log("xhr_error", event.target.response);
}
);
request.open(method, url);
- if (
- method.toLowerCase() === "put"
- || method.toLowerCase() === "post"
- ) {
- request.send(data);
+ if (data_methods.includes(method.toLowerCase())) {
+ request.setRequestHeader("Content-Type", "application/json");
+ if (typeof data === "object") {
+ data = JSON.stringify(data);
+ }
} else {
- request.send();
+ data = null;
}
+ request.send(data);
return request;
}
};
--- /dev/null
+(function () {
+ /*
+ - make it as simple as possible to connect and reconnect the websocket
+ - let's use fragments for navigation:
+ - #user:<user-id> for private messages
+ - #channel:<channel-id> for channels
+ var text_node = document.createTextNode;
+ var create_tag = document.createElement;
+ */
+
+ function websocket_message(event) {
+ console.log("websocket_message", event);
+ }
+
+ function websocket_close(event) {
+ console.log("websocket_close", event);
+ }
+
+ function websocket_error(event) {
+ console.log("websocket_error", event);
+ }
+
+ function websocket_connect() {
+ var websocket_schemas = {"http:": "ws:", "https:": "wss:"};
+ var websocket = new WebSocket(
+ websocket_schemas[window.location.protocol]
+ + "//"
+ + window.location.host
+ + "/"
+ );
+ websocket.addEventListener("message", websocket_message);
+ websocket.addEventListener("close", websocket_close);
+ websocket.addEventListener("error", websocket_error);
+ }
+
+ document.addEventListener(
+ "readystatechange",
+ function () {
+ if (document.readyState !== "complete") {
+ return;
+ }
+ websocket_connect();
+ }
+ );
+}());
{{ block.super }}
<link rel="stylesheet" href="{% static 'channel/styles.css' %}">
<script src="{% static 'channel/chatutils.js' %}"></script>
+ <script src="{% static 'channel/main.js' %}"></script>
{% endblock head %}
{% block header %}
{% if messages %}
from django.urls import path
from .views import ChannelMainView
+from .websocket import handle_websocket
urlpatterns = [
path("", ChannelMainView.as_view(), name="channel-main"),
]
+websocket_urls = {"/": handle_websocket}
--- /dev/null
+import json
+from asyncio import (
+ CancelledError,
+ ensure_future,
+ get_running_loop,
+)
+from contextlib import contextmanager
+from functools import partial
+from io import BytesIO
+from traceback import print_exc
+
+from asgiref.sync import sync_to_async
+from django.core.handlers.asgi import ASGIRequest
+from django.contrib.sessions.middleware import SessionMiddleware
+from django.contrib.auth import aget_user
+from django.db import connection
+
+from chat.triggers import TriggerChannel
+
+
+@contextmanager
+def listen_notify_handler(connection, callback):
+ loop = get_running_loop()
+ loop.add_reader(connection.fileno(), partial(connection.execute, "SELECT 1"))
+ connection.add_notify_handler(callback)
+ for name in TriggerChannel.registry:
+ connection.execute(f"LISTEN {name}")
+ try:
+ yield
+ finally:
+ for name in TriggerChannel.registry:
+ connection.execute(f"UNLISTEN {name}")
+ connection.remove_notify_handler(callback)
+ loop.remove_reader(connection.fileno())
+
+
+def filter_trigger_always(coro, data, user, user_channels):
+ ensure_future(coro)
+
+
+def filter_trigger_privatemessage(coro, data, user, user_channels):
+ if user.pk in (data["obj"]["sender_id"], data["obj"]["recipient_id"]):
+ ensure_future(coro)
+ else:
+ coro.close()
+
+
+filter_triggers = {
+ "user_user": filter_trigger_always,
+ "channel_channel": filter_trigger_always,
+ # "channel_channelmessage": filter_trigger_channelmessage,
+ # "channel_channeluser": filter_trigger_channeluser,
+ "channel_privatemessage": filter_trigger_privatemessage,
+}
+
+
+def process_triggers(send, user, user_channels, notification):
+ data = json.loads(notification.payload)
+ filter_triggers[data["table"]](
+ send({"type": "websocket.send", "text": notification.payload}),
+ data,
+ user,
+ user_channels,
+ )
+
+
+def get_user_channels(user):
+ return set(user.channels.all().values_list("pk", flat=True))
+
+
+async def process_ws(receive, send):
+ while True:
+ event = await receive()
+ if event["type"] == "websocket.connect":
+ await send({"type": "websocket.accept"})
+ elif event["type"] == "websocket.disconnect":
+ return
+ elif event["type"] == "websocket.receive":
+ # ...maybe make it possible to request data through the ws?
+ if event["text"] == "ping":
+ await send(
+ {
+ "type": "websocket.send",
+ "text": "pong",
+ },
+ )
+
+
+async def handle_websocket(scope, receive, send):
+ request = ASGIRequest({**scope, "method": "_ws"}, BytesIO())
+ SessionMiddleware(lambda x: None).process_request(request)
+ request.user = await aget_user(request)
+
+ if not request.user.is_authenticated:
+ await send({"type": "websocket.close"})
+ return
+
+ await sync_to_async(connection.connect)()
+ with listen_notify_handler(
+ connection.connection,
+ partial(
+ process_triggers,
+ send,
+ request.user,
+ await sync_to_async(get_user_channels)(request.user),
+ ),
+ ):
+ try:
+ await process_ws(receive, send)
+ except CancelledError:
+ pass
+ except Exception:
+ print_exc()
+ try:
+ await send({"type": "websocket.close"})
+ except Exception:
+ pass
async def application(scope, receive, send):
+ from channel.urls import websocket_urls
+
if scope["type"] == "lifespan":
await bridge.listen(receive, send)
+ elif scope["type"] == "websocket":
+ await websocket_urls.get(scope["path"])(scope, receive, send)
else:
await django_app(scope, receive, send)
from pgtrigger import After, Delete, Insert, Row, Statement, Trigger, Truncate, Update
-def fields_to_json(t, fields):
- return ",".join(f'"{field}":\' || {t}."{field}" || \'' for field in fields)
+def fields_to_json_build_object(table, fields):
+ args = []
+ for field in fields:
+ args.extend(("'field'", f'"{table}"."{field}"'))
+ return f"json_build_object({', '.join(args)})"
class TriggerChannel:
+ registry = {}
+
def __init__(self, name):
+ assert name not in self.registry
+ self.registry[name] = self
self.name = name
def __call__(self, fields):
func=f"""
PERFORM pg_notify(
'{self.name}',
- '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME ||
- '","obj":{{{fields_to_json("OLD", fields)}}}}}'
+ json_build_object(
+ 'op', TG_OP,
+ 'table', TG_TABLE_NAME,
+ 'obj', {fields_to_json_build_object("OLD", fields)}
+ )
);
RETURN NULL;
""",
func=f"""
PERFORM pg_notify(
'{self.name}',
- '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME ||
- '","obj":{{{fields_to_json("NEW", fields)}}}}}'
+ json_build_object(
+ 'op', TG_OP,
+ 'table', TG_TABLE_NAME,
+ 'obj', {fields_to_json_build_object("NEW", fields)}
+ )
);
RETURN NULL;
""",
func=f"""
PERFORM pg_notify(
'{self.name}',
- '{{"op":"' || TG_OP || '","table":"' || TG_TABLE_NAME || '"}}'
+ json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)
);
RETURN NULL;
""",
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2026-05-01 09:44+0000\n"
+"POT-Creation-Date: 2026-05-09 12:56+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
-#: user/email.py:29
+#: user/email.py:30
+msgid "New user password reset"
+msgstr ""
+
+#: user/email.py:32
#, python-brace-format
msgid ""
"Hello {username}\n"
"{host}"
msgstr ""
-#: user/email.py:35
+#: user/email.py:37
+msgid "Password reset request"
+msgstr ""
+
+#: user/email.py:39
#, python-brace-format
msgid ""
"Hello {username}\n"
"{host}"
msgstr ""
-#: user/email.py:40
-#, python-brace-format
-msgid "{host}: Password reset request"
-msgstr ""
-
-#: user/email.py:58
-#, python-brace-format
-msgid "{host}: Confirm your email address"
+#: user/email.py:61
+msgid "Confirm your email address"
msgstr ""
-#: user/email.py:60
+#: user/email.py:65
#, python-brace-format
msgid ""
"Hello {username}\n"
"{host}"
msgstr ""
-#: user/email.py:82
-msgid "Chat Registration Request"
+#: user/email.py:86
+msgid "Chat registration request"
msgstr ""
-#: user/email.py:83
+#: user/email.py:89
#, python-brace-format
msgid ""
"username: {username}\n"
def field_to_json(self, field_name, instance):
if field_name == "url":
name = self.model._meta.verbose_name.lower().replace(" ", "")
- value = reverse(f"{name}-detail", args=[instance.pk])
+ value = reverse(f"api-{name}-detail", args=[instance.pk])
if self.request:
value = self.request.build_absolute_uri(value)
return value
--- /dev/null
+import json
+
+from django.test.client import Client
+
+
+class RestClient(Client):
+ def put_json(self, path, data, *args, **kwargs):
+ kwargs.setdefault("content_type", "application/json")
+ return super().put(path, json.dumps(data), *args, **kwargs)
+
+ def post_json(self, path, data, *args, **kwargs):
+ kwargs.setdefault("content_type", "application/json")
+ return super().post(path, json.dumps(data), *args, **kwargs)
urlpatterns = []
+list_map = {"get": "list", "post": "create"}
+detail_map = {"get": "detail", "put": "update", "delete": "delete"}
def get_urls(view_class, name, *extra_patterns):
- for method_map in (
- {"get": "list", "post": "create"},
- {"get": "detail", "put": "update", "delete": "delete"},
+ for pattern, url_name, method_map in (
+ (f"{name}/", f"api-{name}-list", list_map),
+ (f"{name}/<int:id>/", f"api-{name}-detail", detail_map),
):
urlpatterns.append(
- path(
- f"{name}/",
- view_class.as_view(method_map=method_map),
- name=f"api-{name}-list",
- )
+ path(pattern, view_class.as_view(method_map=method_map), name=url_name)
)
urlpatterns.extend(extra_patterns)
def get_queryset(self):
return QuerySet(self.serializer.model).all()
- def get_object(self):
+ @cached_property
+ def instance(self):
return self.get_queryset().get(pk=self.kwargs["id"])
def paginate(self, queryset, to_json):
page_size = queryset.count()
elif "list_all" not in query:
page_size = settings.DEFAULT_PAGE_SIZE
+ if not queryset.ordered:
+ queryset = queryset.order_by("id")
queryset = queryset[max(count - page_size, 0) :]
else:
page_size = count
return {"indent": 4}
return {}
- def list(self):
- serializer = self.serializer(self.request)
+ def list(self, request, *args, **kwargs):
return HttpResponse(
json.dumps(
- self.paginate(self.get_queryset(), serializer.to_json),
+ self.paginate(self.get_queryset(), self.serializer(request).to_json),
**self.get_json_dump_kwargs(),
),
content_type="application/json",
)
- def create(self):
- serializer = self.serializer(self.request)
- instance = serializer.model()
- serializer.save(json.load(self.request), instance)
+ def create(self, request, *args, **kwargs):
+ serializer = self.serializer(request)
+ self.instance = serializer.model()
+ serializer.save(json.load(request), self.instance)
return HttpResponse(
- json.dumps(serializer.to_json(instance)),
+ json.dumps(serializer.to_json(self.instance)),
content_type="application/json",
status=201,
)
- def detail(self):
+ def detail(self, request, *args, **kwargs):
return HttpResponse(
json.dumps(
- self.serializer(self.request).to_json(self.get_object()),
+ self.serializer(request).to_json(self.instance),
**self.get_json_dump_kwargs(),
),
content_type="application/json",
)
- def update(self):
- serializer = self.serializer(self.request)
- instance = self.get_object()
- serializer.save(json.load(self.request), instance)
+ def update(self, request, *args, **kwargs):
+ serializer = self.serializer(request)
+ serializer.save(json.load(request), self.instance)
return HttpResponse(
- json.dumps(serializer.to_josn(instance)),
+ json.dumps(serializer.to_json(self.instance)),
content_type="application/json",
)
- def delete(self):
- self.get_object().delete()
+ def delete(self, request, *args, **kwargs):
+ self.instance.delete()
return HttpResponse(status=204)
@cached_property
return self.method_map[self.request.method.lower()]
def dispatch(self, request, *args, **kwargs):
- try:
- return getattr(self, self.action)()
- except Exception as e:
- return HttpResponse(
- json.dumps({type(e).__name__: e.args}),
- content_type="application/json",
- status=500,
- )
+ return getattr(self, self.action)(request, *args, **kwargs)
class PrivilegeRequiredMixin:
user = User.objects.get_by_natural_key(username_or_email)
except User.DoesNotExist:
return
+ if not user.email:
+ return
host = request.get_host()
relative_url = reverse(
"user-reset-password",
},
)
if created:
+ subject = _("New user password reset")
msg = _(
"Hello {username}\n\n"
"Your user has been created. Please reset your password:\n\n"
"{url}\n\nyours,\n{host}"
)
else:
+ subject = _("Password reset request")
msg = _(
"Hello {username}\n\n"
"Someone that is hopefully you, has requested a password reset link:\n\n"
"{url}\n\nyours,\n{host}"
)
send_mail(
- _("{host}: Password reset request").format(host=host),
+ f"{host}: {subject}",
msg.format(
host=host,
url=request.build_absolute_uri(relative_url),
query={"token": signing.dumps(form_data)},
)
host = request.get_host()
+ subject = _("Confirm your email address")
send_mail(
- _("{host}: Confirm your email address").format(host=host),
+ f"{host}: {subject}",
_(
"Hello {username}\n\n"
"Someone that is hopefully you, has filled in the registration form.\n"
query={key: value for key, value in form_data.items() if key != "message"},
)
)
+ subject = _("Chat registration request")
return EmailMultiAlternatives(
- _("Chat Registration Request"),
+ f"{request.get_host()}: {subject}",
_("username: {username}\nemail: {email}\nmessage: {message}\n\n{url}").format(
**form_data, url=url
),
-# Generated by Django 6.0.4 on 2026-04-24 13:34
+# Generated by Django 6.0.4 on 2026-05-10 15:04
import django.contrib.auth.validators
import django.utils.timezone
default=django.utils.timezone.now, verbose_name="date joined"
),
),
+ ("last_password_change", models.DateTimeField(blank=True, null=True)),
(
"groups",
models.ManyToManyField(
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || OLD."id" || \',"username":\' || OLD."username" || \',"email":\' || OLD."email" || \',"first_name":\' || OLD."first_name" || \',"last_name":\' || OLD."last_name" || \',"date_joined":\' || OLD."date_joined" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="d9fa857efc39e0e22a104e69363f4cfaecada94a",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."username", \'field\', "OLD"."email", \'field\', "OLD"."first_name", \'field\', "OLD"."last_name", \'field\', "OLD"."date_joined")\n )\n );\n RETURN NULL;\n ',
+ hash="9cf43edc5ce554d755b1e3978cfc5bf687c8f5ed",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_a66e5",
table="user_user",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n \'{"op":"\' || TG_OP || \'","table":"\' || TG_TABLE_NAME ||\n \'","obj":{"id":\' || NEW."id" || \',"username":\' || NEW."username" || \',"email":\' || NEW."email" || \',"first_name":\' || NEW."first_name" || \',"last_name":\' || NEW."last_name" || \',"date_joined":\' || NEW."date_joined" || \'}}\'\n );\n RETURN NULL;\n ',
- hash="bc33ad5f18bff543d5524d33e48f447ab5b9e997",
+ func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."username", \'field\', "NEW"."email", \'field\', "NEW"."first_name", \'field\', "NEW"."last_name", \'field\', "NEW"."date_joined")\n )\n );\n RETURN NULL;\n ',
+ hash="d48d015aa62321255d7ce9cde38703fe03d87881",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_72ed2",
table="user_user",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n '{\"op\":\"' || TG_OP || '\",\"table\":\"' || TG_TABLE_NAME || '\"}'\n );\n RETURN NULL;\n ",
- hash="967da6f33240942f142b2c1848e263089d958e83",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
+ hash="c7a19f0c2b05edc799fdee39cd011bddd2456b7d",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_8a96a",
+++ /dev/null
-# Generated by Django 6.0.4 on 2026-04-29 07:10
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
- dependencies = [
- ("user", "0001_initial"),
- ]
-
- operations = [
- migrations.AddField(
- model_name="user",
- name="last_password_change",
- field=models.DateTimeField(blank=True, null=True),
- ),
- ]
+import json
from base64 import b64encode
+from datetime import UTC, datetime
from urllib.parse import parse_qsl, urlsplit
from django.conf import settings
from django.test import TestCase, override_settings
from django.urls import reverse
+from rest.tests import RestClient
from .models import User
# test all user-related stuff here
# - user rest view
+def pending_messages(request):
+ return [(message.level, message.message) for message in get_messages(request)]
+
+
@override_settings(DEFAULT_FROM_EMAIL="noreply@testserver.com")
class PasswordResetTest(TestCase):
- @staticmethod
- def messages(request):
- return [(message.level, message.message) for message in get_messages(request)]
-
def check_reset_password_response(self, response, expected_outbox, username=None):
request = response.wsgi_request
self.assertEqual(
- self.messages(request),
+ pending_messages(request),
[(INFO, "If this user exists, an email is currently being sent.")],
)
self.client.get(response.headers["Location"])
else:
self.assertIsInstance(response, HttpResponseRedirect)
self.assertEqual(
- self.messages(response.wsgi_request),
+ pending_messages(response.wsgi_request),
[(INFO, "Password reset complete.")],
)
self.client.get(reverse("user-login"))
self.assertEqual(response.status_code, 302)
self.assertEqual(response.content, b"")
self.assertEqual(
- self.messages(response.wsgi_request),
+ pending_messages(response.wsgi_request),
[(INFO, "Token validation failed")],
)
+
+class RegisterTest(TestCase):
@override_settings(ADMINS=["admin@testserver.com"])
def test_register(self):
self.assertEqual(len(mail.outbox), 0)
}
response = self.client.post(url, payload)
self.assertEqual(
- self.messages(response.wsgi_request),
+ pending_messages(response.wsgi_request),
[
(
INFO,
self.client.get(reverse("user-login"))
response = self.client.get(mail.outbox[0].body[len(prefix) : -len(suffix)])
self.assertEqual(
- self.messages(response.wsgi_request),
+ pending_messages(response.wsgi_request),
[(INFO, "Registration was filed with the admins.")],
)
self.assertEqual(response.status_code, 302)
self.client.force_login(admin)
response = self.client.get(mail.outbox[1].body[len(prefix) :])
self.assertEqual(
- self.messages(response.wsgi_request),
+ pending_messages(response.wsgi_request),
[(INFO, "User has been successfully created.")],
)
user = User.objects.get(email=payload["email"])
(user.username, user.email), (payload["username"], payload["email"])
)
+
+class UserTest(TestCase):
def test_get_full_email(self):
user = User.objects.create(email="test@testserver.com")
self.assertEqual(user.get_full_email(), user.email)
del user.first_name
self.assertEqual(user.get_full_email(), f"{user.last_name} <{user.email}>")
+
+class LoginTest(TestCase):
def is_redirect(self, response, expected_location=None):
self.assertEqual(response.status_code, 302)
if expected_location is not None:
user.save()
url = reverse("user-login")
chat_url = reverse("channel-main")
+ url = f"{url}?next={chat_url}"
self.is_redirect(self.client.get(chat_url), url)
response = self.client.post(
url,
self.assertEqual(self.client.get(chat_url).status_code, 200)
self.assertEqual(self.client.get(reverse("user-logout")).status_code, 302)
self.is_redirect(self.client.get(chat_url), url)
+
+
+class UserRestViewTest(TestCase):
+ client_class = RestClient
+ client: RestClient
+
+ @classmethod
+ def setUpTestData(cls):
+ for i in range(50):
+ User.objects.create(
+ username=f"steve{i}",
+ email=f"steve{i}@testserver.com",
+ last_login=datetime(2026, 3, 31, 5, 23, tzinfo=UTC),
+ )
+
+ user: User
+
+ def setUp(self):
+ self.user = User.objects.first()
+ self.client.force_login(self.user)
+
+ def test_api_user_list(self):
+ data1 = json.loads(self.client.get(reverse("api-user-list")).content)
+ ids1 = {item["id"] for item in data1["result"]}
+ ids2 = {
+ item["id"]
+ for item in json.loads(self.client.get(data1["previous"]).content)["result"]
+ }
+
+ self.assertEqual(len(ids1), settings.DEFAULT_PAGE_SIZE)
+ self.assertEqual(len(ids2), settings.DEFAULT_PAGE_SIZE)
+ self.assertEqual(len(ids1 & ids2), 0)
+
+ def test_api_user_detail(self):
+ data = json.loads(self.client.get(reverse("api-user-list")).content)["result"]
+ self.assertEqual(len(data), settings.DEFAULT_PAGE_SIZE)
+ for item in data:
+ item_data = json.loads(self.client.get(item["url"]).content)
+ self.assertEqual(item, item_data)
+
+ def test_api_user_create(self):
+ self.assertFalse(self.user.is_staff)
+ self.assertEqual(len(mail.outbox), 0)
+ with self.assertLogs("django.request", level="WARNING") as cm:
+ response = self.client.post_json(
+ reverse("api-user-list"),
+ {"username": "querty", "email": "querty@testserver.com"},
+ )
+ self.assertEqual(
+ cm.output, ["WARNING:django.request:Method Not Allowed: /api/user/"]
+ )
+ self.assertEqual(response.status_code, 405)
+ self.user.is_staff = True
+ self.user.save()
+ payload = {"username": "querty", "email": "querty@testserver.com"}
+ response = self.client.post_json(reverse("api-user-list"), payload)
+ self.assertEqual(response.status_code, 201)
+ new_user = json.loads(response.content)
+ instance = User.objects.get(pk=new_user["id"])
+ self.assertEqual(new_user["username"], instance.username)
+ self.assertEqual(new_user["email"], instance.email)
+ self.assertEqual(len(mail.outbox), 1)
+ self.assertEqual(mail.outbox[0].to, [payload["email"]])
+ self.assertEqual(
+ mail.outbox[0].subject,
+ f"{response.wsgi_request.get_host()}: New user password reset",
+ )
+
+ def test_api_user_update(self):
+ self.assertFalse(self.user.is_staff)
+ user_id = self.user.pk
+ self.user.refresh_from_db()
+ first_name = "Steve"
+ self.assertNotEqual(self.user.first_name, first_name)
+ response = self.client.put_json(
+ reverse("api-user-detail", args=[user_id]),
+ {"first_name": first_name},
+ )
+ self.assertEqual(json.loads(response.content)["first_name"], first_name)
+ self.user.refresh_from_db()
+ self.assertEqual(self.user.first_name, first_name)
+ other_user = User.objects.exclude(pk=user_id).first()
+ other_first_name = "Jonathan"
+ self.assertNotEqual(other_user.first_name, other_first_name)
+ with self.assertLogs("django.request", level="WARNING") as cm:
+ response = self.client.put_json(
+ reverse("api-user-detail", args=[other_user.pk]),
+ {"first_name": other_first_name},
+ )
+ self.assertEqual(
+ cm.output,
+ [f"WARNING:django.request:Method Not Allowed: /api/user/{other_user.pk}/"],
+ )
+ self.assertEqual(response.status_code, 405)
+ self.user.is_staff = True
+ self.user.save()
+ response = self.client.put_json(
+ reverse("api-user-detail", args=[other_user.pk]),
+ {"first_name": other_first_name},
+ )
+ self.assertEqual(json.loads(response.content)["first_name"], other_first_name)
+ other_user.refresh_from_db()
+ self.assertEqual(other_user.first_name, other_first_name)
+
+ def test_api_user_delete(self):
+ self.assertFalse(self.user.is_staff)
+ user = User.objects.exclude(pk=self.user.pk).last()
+ with self.assertLogs("django.request", level="WARNING") as cm:
+ response = self.client.delete(reverse("api-user-detail", args=[user.pk]))
+ self.assertEqual(
+ cm.output,
+ [f"WARNING:django.request:Method Not Allowed: /api/user/{user.pk}/"],
+ )
+ self.assertEqual(response.status_code, 405)
+ self.user.is_staff = True
+ self.user.save()
+ response = self.client.delete(reverse("api-user-detail", args=[user.pk]))
+ self.assertEqual(response.status_code, 204)
+ self.assertEqual(response.content, b"")
+ self.assertFalse(User.objects.filter(pk=user.pk).exists())
from django.urls import path
-from rest.urls import get_urls
+from rest.urls import detail_map, get_urls
from .views import (
LoginView,
LogoutView,
"user",
path(
"user/current/",
- UserRestView.as_view(
- method_map={"get": "detail", "put": "update", "delete": "delete"}
- ),
+ UserRestView.as_view(method_map=detail_map),
name="api-user-current",
),
)
if not user.is_privileged() and self.kwargs.get("id") == user.pk:
return ["GET", "PUT"]
return super().get_permitted_methods()
+
+ def create(self, request, *args, **kwargs):
+ response = super().create(request, *args, **kwargs)
+ if response.status_code == 201:
+ send_password_reset_email(request, self.instance.email, True)
+ return response