-# Generated by Django 6.0.4 on 2026-05-10 15:04
+# Generated by Django 6.0.4 on 2026-05-10 16:08
import django.db.models.deletion
import pgtrigger.compiler
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."user_id", \'field\', "OLD"."channel_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ',
- hash="07fe162a2e795edf3c9f3a558a07a129c0d932a7",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'posted_ts', OLD.\"posted_ts\", 'edited_ts', OLD.\"edited_ts\", 'user_id', OLD.\"user_id\", 'channel_id', OLD.\"channel_id\", 'text', OLD.\"text\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="7ba4cb9385597146a3744642fee6fd90273ab194",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_e3727",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."user_id", \'field\', "NEW"."channel_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ',
- hash="0d1f1559c08c23f871043630f4808a9e20716850",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'posted_ts', NEW.\"posted_ts\", 'edited_ts', NEW.\"edited_ts\", 'user_id', NEW.\"user_id\", 'channel_id', NEW.\"channel_id\", 'text', NEW.\"text\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="f87d55936e3130f4e625ccbdbe1f8c7a3fe2565f",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_a5e85",
table="channel_channelmessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
- hash="c79a373af4522bc4774ae554e5e93789aea06d05",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ",
+ hash="57512b4dada90146242c467082af3ce1b0c6f05f",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_ab388",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"added_ts\", 'field', \"OLD\".\"user_id\", 'field', \"OLD\".\"channel_id\")\n )\n );\n RETURN NULL;\n ",
- hash="60bd75faca7769c12f0a42219e5abafcf46de6ca",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'added_ts', OLD.\"added_ts\", 'user_id', OLD.\"user_id\", 'channel_id', OLD.\"channel_id\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="6cc20d50ebbcea8caeb3bd13275bf535d040f530",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_d1dad",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"added_ts\", 'field', \"NEW\".\"user_id\", 'field', \"NEW\".\"channel_id\")\n )\n );\n RETURN NULL;\n ",
- hash="94006ba6a11d60a0c08e9e9812a1dd4dc2be4671",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'added_ts', NEW.\"added_ts\", 'user_id', NEW.\"user_id\", 'channel_id', NEW.\"channel_id\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="4147fdb70b2195e8662cef56fb9019c79ff648ae",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4abda",
table="channel_channeluser",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
- hash="24b3dfe313c8d9fb49df15607acae940d611dd8a",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ",
+ hash="8fe4e4c6978aaefec007e16e93ac0002ccab5ff8",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_be185",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"created_ts\", 'field', \"OLD\".\"name\")\n )\n );\n RETURN NULL;\n ",
- hash="e4beddaa6aeea6939a68be72310dbf204993946d",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'created_ts', OLD.\"created_ts\", 'name', OLD.\"name\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="c0a6078826bfdf3cb70c5b92af27e740fbefce0b",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_71f70",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"created_ts\", 'field', \"NEW\".\"name\")\n )\n );\n RETURN NULL;\n ",
- hash="4215198b8261bd097742e9227732675a3d0bdc1c",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'created_ts', NEW.\"created_ts\", 'name', NEW.\"name\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="003239b69bd859926cf38ab07601ac6d1280667d",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_0f4bc",
table="channel_channel",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
- hash="80b7a9088bce0dcb1e3e7adf6b54e5473272f203",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ",
+ hash="ba2eca8121197d1ff8d9db8908d9a1819dbb7e17",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_2f496",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."sender_id", \'field\', "OLD"."recipient_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ',
- hash="04d3d16eba42ed3fcd7bae3e0c45b9fdcd0ec05f",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'posted_ts', OLD.\"posted_ts\", 'edited_ts', OLD.\"edited_ts\", 'sender_id', OLD.\"sender_id\", 'recipient_id', OLD.\"recipient_id\", 'text', OLD.\"text\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="72ff576a3277d314ec9e6446c0f67e9d95e86bf6",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_5cdfa",
table="channel_privatemessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."sender_id", \'field\', "NEW"."recipient_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ',
- hash="078224b936ff9395c5ea4cae1729671d25b0b1f5",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'posted_ts', NEW.\"posted_ts\", 'edited_ts', NEW.\"edited_ts\", 'sender_id', NEW.\"sender_id\", 'recipient_id', NEW.\"recipient_id\", 'text', NEW.\"text\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="7b9480ce21e70b78c5890c896936ec25ec4498da",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_4c85f",
table="channel_privatemessage",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
- hash="f329902c9edc996be6f7b56c8a3e318aed50f1c3",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ",
+ hash="6f2966fe582724501863bf905470cdd82634d33d",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_d9c27",
"recipient_id",
"text",
]
+ create_fields = ["sender_id", "recipient_id", "text"]
update_fields = ["text"]
def from_json(self, data, instance=None):
+/*global chatutils*/
(function () {
/*
- - make it as simple as possible to connect and reconnect the websocket
- let's use fragments for navigation:
- #user:<user-id> for private messages
- #channel:<channel-id> for channels
- var text_node = document.createTextNode;
- var create_tag = document.createElement;
*/
+ var text_node = document.createTextNode.bind(document);
+ var create_tag = document.createElement.bind(document);
+ var messages;
+ var unloading = false;
+ var websocket = null;
+ var serial = 1;
+ var callbacks = {};
+ var websocket_schemas = {"http:": "ws:", "https:": "wss:"};
+ var current_channel = null;
+ var current_user = null;
+
+ function reconnect_button_add() {
+ var button;
+ var p = chatutils.query("p", "reconnect_button");
+ if (p) {
+ reconnect_button_display(true);
+ reconnect_button_disabled(false);
+ return;
+ }
+ p = create_tag("P");
+ button = create_tag("BUTTON");
+ button.appendChild(text_node("Reconnect"));
+ button.addEventListener("click", websocket_connect);
+ p.appendChild(button);
+ p.classList.add("reconnect_button");
+ messages.appendChild(p);
+ }
+
+ function reconnect_button_display(state) {
+ var p = chatutils.query("p", "reconnect_button");
+ if (p) {
+ p.style.display = (
+ state
+ ? "block"
+ : "none"
+ );
+ }
+ }
+
+ function reconnect_button_disabled(state) {
+ var p = chatutils.query("p", "reconnect_btn");
+ var matches;
+ var i;
+ if (p) {
+ matches = p.getElementsByTagName("button");
+ for (i = 0; i < matches.length; i += 1) {
+ matches[i].disabled = state;
+ }
+ }
+ }
+
+ function get_ul(div) {
+ var matches = div.getElementsByTagName("UL");
+ if (matches.length === 0) {
+ return div.appendChild(create_tag("UL"));
+ }
+ return matches[0];
+ }
+
+ function setup_user_list(content) {
+ var ul = get_ul(chatutils.query("div", "user-list"));
+ var i;
+ var li;
+ while (ul.children.length > 0) {
+ ul.removeChild(ul.firstElementChild);
+ }
+ for (i = 0; i < content.result.length; i += 1) {
+ if (content.result[i].is_authenticated) {
+ current_user = content.result[i];
+ }
+ li = create_tag("LI");
+ li.appendChild(create_tag("A")).appendChild(
+ text_node(content.result[i].username)
+ );
+ li.children[0].href = "#user:" + content.result[i].id;
+ ul.appendChild(li);
+ }
+ }
+
+ function add_message(msg) {
+ var p = create_tag("P");
+ p.appendChild(text_node(msg));
+ messages.insertBefore(p, chatutils.query("p", "reconnect_button"));
+ }
function websocket_message(event) {
- console.log("websocket_message", event);
+ var data = JSON.parse(event.data);
+ if (data.hasOwnProperty("serial")) {
+ if (callbacks.hasOwnProperty(data.serial)) {
+ callbacks[data.serial](JSON.parse(data.content));
+ delete callbacks[event.data.serial];
+ } else {
+ add_message("missing callback" + data);
+ }
+ return;
+ }
+ add_message(event.data);
+ }
+
+ function websocket_close() {
+ websocket = null;
+ if (unloading === false) {
+ reconnect_button_add();
+ }
}
- function websocket_close(event) {
- console.log("websocket_close", event);
+ function ws_api_request(method, path, data, callback) {
+ var obj = {"method": method, "path": path};
+ obj.serial = serial;
+ callbacks[serial] = callback;
+ serial += 1;
+ if (data) {
+ switch (method) {
+ case "GET":
+ obj.query_string = data;
+ break;
+ case "POST":
+ obj.body = data;
+ break;
+ }
+ }
+ websocket.send(JSON.stringify(obj));
}
- function websocket_error(event) {
- console.log("websocket_error", event);
+ function websocket_open() {
+ reconnect_button_display(false);
+ ws_api_request("GET", "/api/user/", null, setup_user_list);
+ hash_change();
}
function websocket_connect() {
- var websocket_schemas = {"http:": "ws:", "https:": "wss:"};
- var websocket = new WebSocket(
+ websocket = new WebSocket(
websocket_schemas[window.location.protocol]
+ "//"
+ window.location.host
+ "/"
);
+ websocket.addEventListener("open", websocket_open);
websocket.addEventListener("message", websocket_message);
websocket.addEventListener("close", websocket_close);
- websocket.addEventListener("error", websocket_error);
+ reconnect_button_disabled(true);
+ }
+
+ function clear_messages() {
+ var child;
+ var i;
+ for (i = messages.children.length - 1; i >= 0; i -= 1) {
+ child = messages.children[i];
+ if (!child.classList.contains("reconnect_button")) {
+ messages.removeChild(child);
+ }
+ }
+ }
+
+ function populate_messages(data) {
+ var i;
+ var p;
+ for (i = data.result.length - 1; i >= 0; i -= 1) {
+ p = create_tag("P");
+ p.appendChild(text_node(data.result[i].sender_id));
+ p.appendChild(text_node(": "));
+ p.appendChild(text_node(data.result[i].text));
+ messages.insertBefore(p, messages.firstChild);
+ }
+ }
+
+ function hash_change() {
+ if (location.hash === current_channel) {
+ return;
+ }
+ current_channel = location.hash;
+ clear_messages();
+ if (location.hash.startsWith("#user:")) {
+ ws_api_request(
+ "GET",
+ "/api/privatemessage/",
+ "user=" + location.hash.substring(6),
+ populate_messages
+ );
+ }
+ }
+
+ function add_message_reply(data) {
+ var p = create_tag("P");
+ p.appendChild(text_node(data.sender_id));
+ p.appendChild(text_node(": "));
+ p.appendChild(text_node(data.text));
+ messages.insertBefore(p, chatutils.query("p", "reconnect_button"));
+ }
+
+ function add_message_reply_sent(data) {
+ chatutils.query("div", "messages-footer").getElementsByTagName(
+ "textarea"
+ )[0].value = "";
+ add_message_reply(data);
+ }
+
+ function send_message() {
+ var textarea = chatutils.query(
+ "div",
+ "messages-footer"
+ ).getElementsByTagName("textarea")[0];
+ if (websocket === null || !textarea.value) {
+ return;
+ }
+ if (current_channel.startsWith("#user:")) {
+ ws_api_request(
+ "POST",
+ "/api/privatemessage/",
+ JSON.stringify(
+ {
+ "recipient_id": parseInt(
+ current_channel.substring(6),
+ 10
+ ),
+ "sender_id": current_user.id,
+ "text": textarea.value
+ }
+ ),
+ add_message_reply_sent
+ );
+ }
}
document.addEventListener(
if (document.readyState !== "complete") {
return;
}
+ messages = chatutils.query("div", "messages");
websocket_connect();
+ chatutils.query("div", "messages-footer").getElementsByTagName(
+ "BUTTON"
+ )[0].addEventListener("click", send_message);
+ }
+ );
+
+ window.addEventListener("hashchange", hash_change);
+ window.addEventListener(
+ "beforeunload",
+ function () {
+ unloading = true;
}
);
}());
{% block head %}
{{ block.super }}
<link rel="stylesheet" href="{% static 'channel/styles.css' %}">
- <script src="{% static 'channel/chatutils.js' %}"></script>
<script src="{% static 'channel/main.js' %}"></script>
{% endblock head %}
{% block header %}
from django.urls import path
-from .views import ChannelMainView
-from .websocket import handle_websocket
+from .views import ChannelMainView, PrivateMessageRestView
+from .websocket import WebSocketHandler
+from rest.urls import get_urls
urlpatterns = [
path("", ChannelMainView.as_view(), name="channel-main"),
]
-websocket_urls = {"/": handle_websocket}
+websocket_urls = {"/": WebSocketHandler().handle}
+
+get_urls(PrivateMessageRestView, "privatemessage")
serializer = PrivateMessageSerializer
def get_queryset(self):
- return (
- super()
- .get_queryset()
- .filter(Q(sender=self.request.user.pk) | Q(recipient=self.request.user.pk))
- )
+ queryset = super().get_queryset()
+ user_id = self.request.user.pk
+ if "user" in self.request.GET:
+ other_id = int(self.request.GET["user"])
+ queryset = queryset.filter(
+ Q(sender=user_id, recipient=other_id)
+ | Q(sender=other_id, recipient=user_id)
+ )
+ else:
+ queryset = queryset.filter(Q(sender=user_id) | Q(recipient=user_id))
+ return queryset
class ChannelRestView(PrivilegeRequiredMixin, ModelRestView):
import json
-from asyncio import (
- CancelledError,
- ensure_future,
- get_running_loop,
-)
-from contextlib import contextmanager
-from functools import partial
+from asyncio import CancelledError, Event, Queue, gather, get_running_loop
+from contextlib import asynccontextmanager
+from importlib import import_module
from io import BytesIO
+from logging import getLogger
from traceback import print_exc
+from types import SimpleNamespace
from asgiref.sync import sync_to_async
+from django.conf import settings
from django.core.handlers.asgi import ASGIRequest
-from django.contrib.sessions.middleware import SessionMiddleware
from django.contrib.auth import aget_user
from django.db import connection
+from django.urls import get_resolver, Resolver404
from chat.triggers import TriggerChannel
+session_engine = import_module(settings.SESSION_ENGINE)
+logger = getLogger(__name__)
+ShutDownSentinel = type("ShutDownSentinelType", (), {})()
+
+
+class WebsocketConnection:
+ @staticmethod
+ async def aget_user(scope):
+ headers = dict(scope.get("headers", []))
+ session_key = None
+ cookie_str = headers.get(b"cookie", b"").decode()
+ cookie_name = "sessionid="
+ pos = cookie_str.find(cookie_name)
+ if pos >= 0:
+ pos += len(cookie_name)
+ end_pos = cookie_str.find(";", pos)
+ if end_pos == -1:
+ end_pos = None
+ session_key = cookie_str[pos:end_pos]
+
+ scope["user"] = await aget_user(
+ SimpleNamespace( # noqa
+ session=session_engine.SessionStore(session_key=session_key)
+ )
+ )
+
+ def __init__(self, scope, receive, send):
+ self.scope = scope
+ self.receive = receive
+ self.send = send
+ self.loop = get_running_loop()
+ self.active_event = Event()
+ self.notification_queue = Queue()
+ self.resolver = get_resolver()
+
+ def process_triggers(self, notification):
+ data = json.loads(notification.payload)
+ if data["table"] == "user_user":
+ pass
+ elif data["table"] == "channel_privatemessage":
+ user_id = self.scope["user"].pk
+ obj = data["obj"]
+ if user_id != obj["sender_id"] and user_id != obj["recipient_id"]:
+ return
+ else:
+ return
+ self.loop.call_soon_threadsafe(
+ self.notification_queue.put_nowait,
+ {"type": "websocket.send", "text": notification.payload},
+ )
-@contextmanager
-def listen_notify_handler(connection, callback):
- loop = get_running_loop()
- loop.add_reader(connection.fileno(), partial(connection.execute, "SELECT 1"))
- connection.add_notify_handler(callback)
- for name in TriggerChannel.registry:
- connection.execute(f"LISTEN {name}")
- try:
- yield
- finally:
- for name in TriggerChannel.registry:
- connection.execute(f"UNLISTEN {name}")
- connection.remove_notify_handler(callback)
- loop.remove_reader(connection.fileno())
-
-
-def filter_trigger_always(coro, data, user, user_channels):
- ensure_future(coro)
-
-
-def filter_trigger_privatemessage(coro, data, user, user_channels):
- if user.pk in (data["obj"]["sender_id"], data["obj"]["recipient_id"]):
- ensure_future(coro)
- else:
- coro.close()
-
-
-filter_triggers = {
- "user_user": filter_trigger_always,
- "channel_channel": filter_trigger_always,
- # "channel_channelmessage": filter_trigger_channelmessage,
- # "channel_channeluser": filter_trigger_channeluser,
- "channel_privatemessage": filter_trigger_privatemessage,
-}
-
-
-def process_triggers(send, user, user_channels, notification):
- data = json.loads(notification.payload)
- filter_triggers[data["table"]](
- send({"type": "websocket.send", "text": notification.payload}),
- data,
- user,
- user_channels,
- )
-
+ @staticmethod
+ def db_idle(db_conn):
+ try:
+ db_conn.poll()
+ except Exception:
+ pass
-def get_user_channels(user):
- return set(user.channels.all().values_list("pk", flat=True))
+ @asynccontextmanager
+ async def listen_notify_handler(self):
+ await sync_to_async(connection.connect)()
+ db_conn = connection.connection
+ db_conn.add_notify_handler(self.process_triggers)
+ self.loop.add_reader(db_conn.fileno(), self.db_idle, db_conn)
+ for name in TriggerChannel.registry:
+ await sync_to_async(db_conn.execute)(f"LISTEN {name}")
+ try:
+ yield
+ finally:
+ for name in TriggerChannel.registry:
+ try:
+ await sync_to_async(db_conn.execute)(f"UNLISTEN {name}")
+ except Exception:
+ pass
+ self.loop.remove_reader(db_conn.fileno())
+ db_conn.remove_notify_handler(self.process_triggers)
+ await sync_to_async(connection.close)()
+
+ async def process_ws(self):
+ try:
+ while True:
+ event = await self.receive()
+ if event["type"] == "websocket.connect":
+ await self.send({"type": "websocket.accept"})
+ self.active_event.set()
+ elif event["type"] == "websocket.disconnect":
+ break
+ elif event["type"] == "websocket.receive":
+ await self.send(
+ {
+ "type": "websocket.send",
+ "text": json.dumps(
+ await self.get_api_response(json.loads(event["text"]))
+ ),
+ }
+ )
+ finally:
+ self.active_event.clear()
+ # wake up send_loop() to end it
+ await self.notification_queue.put(ShutDownSentinel)
+
+ async def get_api_request(self, request_data):
+ body = request_data.get("body", "")
+ if not isinstance(body, str):
+ body = json.dumps(body)
+ ws_scope = {
+ **self.scope,
+ "type": "http",
+ "method": request_data["method"],
+ "path": request_data["path"],
+ "serial": request_data["serial"],
+ }
+ if "query_string" in request_data:
+ ws_scope["query_string"] = request_data["query_string"]
+ else:
+ ws_scope.pop("query_string", None)
+ request = ASGIRequest(ws_scope, BytesIO(body.encode()))
+ request.user = self.scope["user"]
+ request.resolver_match = self.resolver.resolve(request.path_info)
+ if not request.resolver_match.url_name.startswith("api-"):
+ raise Resolver404
+ return request
+
+ async def get_api_response(self, request_data):
+ try:
+ request = await self.get_api_request(request_data)
+ except Resolver404:
+ return {
+ "status_code": 404,
+ "content": "resource not found",
+ "serial": request_data["serial"],
+ }
+ else:
+ logger.info(f"ws api call: {request.method} {request.path}")
+ resolver_match = request.resolver_match
+ response = await sync_to_async(resolver_match.func)(
+ request,
+ *resolver_match.args,
+ **resolver_match.kwargs,
+ )
+ return {
+ "status_code": response.status_code,
+ "content": response.content.decode(),
+ "serial": request.scope["serial"],
+ }
+
+ async def send_loop(self):
+ await self.active_event.wait()
+ while self.active_event.is_set():
+ item = await self.notification_queue.get()
+ if item is ShutDownSentinel:
+ break
+ try:
+ await self.send(item)
+ finally:
+ self.notification_queue.task_done()
-async def process_ws(receive, send):
- while True:
- event = await receive()
- if event["type"] == "websocket.connect":
- await send({"type": "websocket.accept"})
- elif event["type"] == "websocket.disconnect":
- return
- elif event["type"] == "websocket.receive":
- # ...maybe make it possible to request data through the ws?
- if event["text"] == "ping":
- await send(
- {
- "type": "websocket.send",
- "text": "pong",
- },
- )
-
-
-async def handle_websocket(scope, receive, send):
- request = ASGIRequest({**scope, "method": "_ws"}, BytesIO())
- SessionMiddleware(lambda x: None).process_request(request)
- request.user = await aget_user(request)
-
- if not request.user.is_authenticated:
- await send({"type": "websocket.close"})
- return
-
- await sync_to_async(connection.connect)()
- with listen_notify_handler(
- connection.connection,
- partial(
- process_triggers,
- send,
- request.user,
- await sync_to_async(get_user_channels)(request.user),
- ),
- ):
- try:
- await process_ws(receive, send)
- except CancelledError:
- pass
- except Exception:
- print_exc()
+class WebSocketHandler:
+ async def handle(self, scope, receive, send):
+ ws_conn = WebsocketConnection(scope, receive, send)
+ await ws_conn.aget_user(scope)
+ async with ws_conn.listen_notify_handler():
try:
- await send({"type": "websocket.close"})
- except Exception:
+ await gather(ws_conn.process_ws(), ws_conn.send_loop())
+ except CancelledError:
pass
+ except Exception:
+ print_exc()
+ try:
+ await send({"type": "websocket.close"})
+ except Exception:
+ pass
+/*global chatutils*/
(function () {
+ var messages = chatutils.query("ul", "messages");
+
function fade_messages(event) {
var li = event.target;
if (li.tagName !== "LI" || li.classList.contains("fade-out")) {
{once: true}
);
}
+
document.addEventListener(
"readystatechange",
function () {
- var sel;
- if (document.readyState !== "complete") {
- return;
- }
- sel = document.querySelector("ul.messages");
- if (sel !== null) {
- sel.addEventListener("click", fade_messages);
+ if (document.readyState === "complete" && messages !== null) {
+ messages.addEventListener("click", fade_messages);
}
}
);
window.chatutils = {
"foreach": foreach,
+ "query": function (tag_name, class_name) {
+ var matches = document.getElementsByClassName(class_name);
+ var i;
+ tag_name = tag_name.toUpperCase();
+ for (i = 0; i < matches.length; i += 1) {
+ if (matches[i].tagName.toUpperCase() === tag_name) {
+ return matches[i];
+ }
+ }
+ return null;
+ },
"select": function (selector) {
if (selector.substring(0, 1) === "#") {
return document.getElementById(selector.substring(1));
out.push(s);
return out.join("");
},
- "xhr": function (method, url, data, load_cb, abort_cb, error_cb) {
+ "xhr": function (method, url, data, ready_cb, load_cb, abort_cb) {
var request = new XMLHttpRequest();
+ if (ready_cb) {
+ request.addEventListener(
+ "readystatechange",
+ function (event) {
+ if (event.target.readyState === 2) {
+ ready_cb();
+ }
+ }
+ );
+ }
request.addEventListener(
"load",
load_cb || function (event) {
console.log("xhr_abort", event.target.response);
}
);
- request.addEventListener(
- "error",
- error_cb || function (event) {
- console.log("xhr_error", event.target.response);
- }
- );
request.open(method, url);
if (data_methods.includes(method.toLowerCase())) {
request.setRequestHeader("Content-Type", "application/json");
<meta name="keywords" content="{{ meta_keywords }}">
{% endblock meta %}
<link rel="stylesheet" href="{% static 'chat/styles.css' %}">
+ <script src="{% static 'chat/chatutils.js' %}"></script>
<script src="{% static 'chat/base.js' %}"></script>
<title>
{% block title %}
def fields_to_json_build_object(table, fields):
- args = []
- for field in fields:
- args.extend(("'field'", f'"{table}"."{field}"'))
+ args = (f"'{field}', {table}.\"{field}\"" for field in fields)
return f"json_build_object({', '.join(args)})"
'op', TG_OP,
'table', TG_TABLE_NAME,
'obj', {fields_to_json_build_object("OLD", fields)}
- )
+ )::text
);
RETURN NULL;
""",
'op', TG_OP,
'table', TG_TABLE_NAME,
'obj', {fields_to_json_build_object("NEW", fields)}
- )
+ )::text
);
RETURN NULL;
""",
func=f"""
PERFORM pg_notify(
'{self.name}',
- json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)
+ json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text
);
RETURN NULL;
""",
-from datetime import datetime
+from datetime import UTC, datetime
+from logging import getLogger
from typing import Type
from django.db.models import (
)
from django.urls import reverse
+logger = getLogger(__name__)
+
class ModelSerializer:
model: Type[Model]
return value
field = self.model._meta.get_field(field_name)
value = getattr(instance, field_name)
- if isinstance(field, DateTimeField):
- value = value.isoformat(timespec="milliseconds")
- elif isinstance(field, (ManyToManyField, ManyToManyRel)):
- value = [v.pk for v in value.all()]
+ if value is not None:
+ if isinstance(field, DateTimeField):
+ value = value.isoformat(timespec="milliseconds")
+ elif isinstance(field, (ManyToManyField, ManyToManyRel)):
+ value = [v.pk for v in value.all()]
return value
def to_json(self, instance):
return instance, m2m
def save(self, data, instance):
+ ts = datetime.now().replace(tzinfo=UTC)
+ logger.info(f"[{ts.isoformat(' ')}] API pk: {instance.pk} data: {data}")
instance, m2m = self.from_json(data, instance)
instance.save()
for field_name, value in m2m.items():
-# Generated by Django 6.0.4 on 2026-05-10 15:04
+# Generated by Django 6.0.4 on 2026-05-10 16:08
import django.contrib.auth.validators
import django.utils.timezone
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_delete",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."username", \'field\', "OLD"."email", \'field\', "OLD"."first_name", \'field\', "OLD"."last_name", \'field\', "OLD"."date_joined")\n )\n );\n RETURN NULL;\n ',
- hash="9cf43edc5ce554d755b1e3978cfc5bf687c8f5ed",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'username', OLD.\"username\", 'email', OLD.\"email\", 'first_name', OLD.\"first_name\", 'last_name', OLD.\"last_name\", 'date_joined', OLD.\"date_joined\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="bcee39bce4ea6cf3d69f7c702a80fcb0113219d2",
operation="DELETE",
pgid="pgtrigger_chat_channel_delete_a66e5",
table="user_user",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_insert_update",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."username", \'field\', "NEW"."email", \'field\', "NEW"."first_name", \'field\', "NEW"."last_name", \'field\', "NEW"."date_joined")\n )\n );\n RETURN NULL;\n ',
- hash="d48d015aa62321255d7ce9cde38703fe03d87881",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'username', NEW.\"username\", 'email', NEW.\"email\", 'first_name', NEW.\"first_name\", 'last_name', NEW.\"last_name\", 'date_joined', NEW.\"date_joined\")\n )::text\n );\n RETURN NULL;\n ",
+ hash="c64b0aad3a81f9de89768c6437a129acf7d653ed",
operation="INSERT OR UPDATE",
pgid="pgtrigger_chat_channel_insert_update_72ed2",
table="user_user",
trigger=pgtrigger.compiler.Trigger(
name="chat_channel_truncate",
sql=pgtrigger.compiler.UpsertTriggerSql(
- func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ",
- hash="c7a19f0c2b05edc799fdee39cd011bddd2456b7d",
+ func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ",
+ hash="f18566b929de311157d6db53ceb4dcf6cb8a924d",
level="STATEMENT",
operation="TRUNCATE",
pgid="pgtrigger_chat_channel_truncate_8a96a",