From: mar77i Date: Wed, 13 May 2026 14:24:12 +0000 (+0200) Subject: redo the websocket basics. evolve the frontend some X-Git-Url: https://git.mar77i.info/?a=commitdiff_plain;ds=sidebyside;p=chat redo the websocket basics. evolve the frontend some --- diff --git a/channel/migrations/0002_initial.py b/channel/migrations/0002_initial.py index 00f69ab..7bfb871 100644 --- a/channel/migrations/0002_initial.py +++ b/channel/migrations/0002_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 6.0.4 on 2026-05-10 15:04 +# Generated by Django 6.0.4 on 2026-05-10 16:08 import django.db.models.deletion import pgtrigger.compiler @@ -77,8 +77,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_delete", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."user_id", \'field\', "OLD"."channel_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ', - hash="07fe162a2e795edf3c9f3a558a07a129c0d932a7", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'posted_ts', OLD.\"posted_ts\", 'edited_ts', OLD.\"edited_ts\", 'user_id', OLD.\"user_id\", 'channel_id', OLD.\"channel_id\", 'text', OLD.\"text\")\n )::text\n );\n RETURN NULL;\n ", + hash="7ba4cb9385597146a3744642fee6fd90273ab194", operation="DELETE", pgid="pgtrigger_chat_channel_delete_e3727", table="channel_channelmessage", @@ -91,8 +91,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_insert_update", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."user_id", \'field\', "NEW"."channel_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ', - hash="0d1f1559c08c23f871043630f4808a9e20716850", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'posted_ts', NEW.\"posted_ts\", 'edited_ts', NEW.\"edited_ts\", 'user_id', NEW.\"user_id\", 'channel_id', NEW.\"channel_id\", 'text', NEW.\"text\")\n )::text\n );\n RETURN NULL;\n ", + hash="f87d55936e3130f4e625ccbdbe1f8c7a3fe2565f", operation="INSERT OR UPDATE", pgid="pgtrigger_chat_channel_insert_update_a5e85", table="channel_channelmessage", @@ -105,8 +105,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_truncate", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ", - hash="c79a373af4522bc4774ae554e5e93789aea06d05", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ", + hash="57512b4dada90146242c467082af3ce1b0c6f05f", level="STATEMENT", operation="TRUNCATE", pgid="pgtrigger_chat_channel_truncate_ab388", @@ -120,8 +120,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_delete", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"added_ts\", 'field', \"OLD\".\"user_id\", 'field', \"OLD\".\"channel_id\")\n )\n );\n RETURN NULL;\n ", - hash="60bd75faca7769c12f0a42219e5abafcf46de6ca", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'added_ts', OLD.\"added_ts\", 'user_id', OLD.\"user_id\", 'channel_id', OLD.\"channel_id\")\n )::text\n );\n RETURN NULL;\n ", + hash="6cc20d50ebbcea8caeb3bd13275bf535d040f530", operation="DELETE", pgid="pgtrigger_chat_channel_delete_d1dad", table="channel_channeluser", @@ -134,8 +134,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_insert_update", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"added_ts\", 'field', \"NEW\".\"user_id\", 'field', \"NEW\".\"channel_id\")\n )\n );\n RETURN NULL;\n ", - hash="94006ba6a11d60a0c08e9e9812a1dd4dc2be4671", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'added_ts', NEW.\"added_ts\", 'user_id', NEW.\"user_id\", 'channel_id', NEW.\"channel_id\")\n )::text\n );\n RETURN NULL;\n ", + hash="4147fdb70b2195e8662cef56fb9019c79ff648ae", operation="INSERT OR UPDATE", pgid="pgtrigger_chat_channel_insert_update_4abda", table="channel_channeluser", @@ -148,8 +148,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_truncate", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ", - hash="24b3dfe313c8d9fb49df15607acae940d611dd8a", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ", + hash="8fe4e4c6978aaefec007e16e93ac0002ccab5ff8", level="STATEMENT", operation="TRUNCATE", pgid="pgtrigger_chat_channel_truncate_be185", @@ -163,8 +163,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_delete", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"OLD\".\"id\", 'field', \"OLD\".\"created_ts\", 'field', \"OLD\".\"name\")\n )\n );\n RETURN NULL;\n ", - hash="e4beddaa6aeea6939a68be72310dbf204993946d", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'created_ts', OLD.\"created_ts\", 'name', OLD.\"name\")\n )::text\n );\n RETURN NULL;\n ", + hash="c0a6078826bfdf3cb70c5b92af27e740fbefce0b", operation="DELETE", pgid="pgtrigger_chat_channel_delete_71f70", table="channel_channel", @@ -177,8 +177,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_insert_update", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('field', \"NEW\".\"id\", 'field', \"NEW\".\"created_ts\", 'field', \"NEW\".\"name\")\n )\n );\n RETURN NULL;\n ", - hash="4215198b8261bd097742e9227732675a3d0bdc1c", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'created_ts', NEW.\"created_ts\", 'name', NEW.\"name\")\n )::text\n );\n RETURN NULL;\n ", + hash="003239b69bd859926cf38ab07601ac6d1280667d", operation="INSERT OR UPDATE", pgid="pgtrigger_chat_channel_insert_update_0f4bc", table="channel_channel", @@ -191,8 +191,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_truncate", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ", - hash="80b7a9088bce0dcb1e3e7adf6b54e5473272f203", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ", + hash="ba2eca8121197d1ff8d9db8908d9a1819dbb7e17", level="STATEMENT", operation="TRUNCATE", pgid="pgtrigger_chat_channel_truncate_2f496", @@ -206,8 +206,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_delete", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."posted_ts", \'field\', "OLD"."edited_ts", \'field\', "OLD"."sender_id", \'field\', "OLD"."recipient_id", \'field\', "OLD"."text")\n )\n );\n RETURN NULL;\n ', - hash="04d3d16eba42ed3fcd7bae3e0c45b9fdcd0ec05f", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'posted_ts', OLD.\"posted_ts\", 'edited_ts', OLD.\"edited_ts\", 'sender_id', OLD.\"sender_id\", 'recipient_id', OLD.\"recipient_id\", 'text', OLD.\"text\")\n )::text\n );\n RETURN NULL;\n ", + hash="72ff576a3277d314ec9e6446c0f67e9d95e86bf6", operation="DELETE", pgid="pgtrigger_chat_channel_delete_5cdfa", table="channel_privatemessage", @@ -220,8 +220,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_insert_update", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."posted_ts", \'field\', "NEW"."edited_ts", \'field\', "NEW"."sender_id", \'field\', "NEW"."recipient_id", \'field\', "NEW"."text")\n )\n );\n RETURN NULL;\n ', - hash="078224b936ff9395c5ea4cae1729671d25b0b1f5", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'posted_ts', NEW.\"posted_ts\", 'edited_ts', NEW.\"edited_ts\", 'sender_id', NEW.\"sender_id\", 'recipient_id', NEW.\"recipient_id\", 'text', NEW.\"text\")\n )::text\n );\n RETURN NULL;\n ", + hash="7b9480ce21e70b78c5890c896936ec25ec4498da", operation="INSERT OR UPDATE", pgid="pgtrigger_chat_channel_insert_update_4c85f", table="channel_privatemessage", @@ -234,8 +234,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_truncate", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ", - hash="f329902c9edc996be6f7b56c8a3e318aed50f1c3", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ", + hash="6f2966fe582724501863bf905470cdd82634d33d", level="STATEMENT", operation="TRUNCATE", pgid="pgtrigger_chat_channel_truncate_d9c27", diff --git a/channel/serializers.py b/channel/serializers.py index 903f702..7e0ad06 100644 --- a/channel/serializers.py +++ b/channel/serializers.py @@ -13,6 +13,7 @@ class PrivateMessageSerializer(ModelSerializer): "recipient_id", "text", ] + create_fields = ["sender_id", "recipient_id", "text"] update_fields = ["text"] def from_json(self, data, instance=None): diff --git a/channel/static/channel/main.js b/channel/static/channel/main.js index 7be0c27..7b0bbd0 100644 --- a/channel/static/channel/main.js +++ b/channel/static/channel/main.js @@ -1,36 +1,232 @@ +/*global chatutils*/ (function () { /* - - make it as simple as possible to connect and reconnect the websocket - let's use fragments for navigation: - #user: for private messages - #channel: for channels - var text_node = document.createTextNode; - var create_tag = document.createElement; */ + var text_node = document.createTextNode.bind(document); + var create_tag = document.createElement.bind(document); + var messages; + var unloading = false; + var websocket = null; + var serial = 1; + var callbacks = {}; + var websocket_schemas = {"http:": "ws:", "https:": "wss:"}; + var current_channel = null; + var current_user = null; + + function reconnect_button_add() { + var button; + var p = chatutils.query("p", "reconnect_button"); + if (p) { + reconnect_button_display(true); + reconnect_button_disabled(false); + return; + } + p = create_tag("P"); + button = create_tag("BUTTON"); + button.appendChild(text_node("Reconnect")); + button.addEventListener("click", websocket_connect); + p.appendChild(button); + p.classList.add("reconnect_button"); + messages.appendChild(p); + } + + function reconnect_button_display(state) { + var p = chatutils.query("p", "reconnect_button"); + if (p) { + p.style.display = ( + state + ? "block" + : "none" + ); + } + } + + function reconnect_button_disabled(state) { + var p = chatutils.query("p", "reconnect_btn"); + var matches; + var i; + if (p) { + matches = p.getElementsByTagName("button"); + for (i = 0; i < matches.length; i += 1) { + matches[i].disabled = state; + } + } + } + + function get_ul(div) { + var matches = div.getElementsByTagName("UL"); + if (matches.length === 0) { + return div.appendChild(create_tag("UL")); + } + return matches[0]; + } + + function setup_user_list(content) { + var ul = get_ul(chatutils.query("div", "user-list")); + var i; + var li; + while (ul.children.length > 0) { + ul.removeChild(ul.firstElementChild); + } + for (i = 0; i < content.result.length; i += 1) { + if (content.result[i].is_authenticated) { + current_user = content.result[i]; + } + li = create_tag("LI"); + li.appendChild(create_tag("A")).appendChild( + text_node(content.result[i].username) + ); + li.children[0].href = "#user:" + content.result[i].id; + ul.appendChild(li); + } + } + + function add_message(msg) { + var p = create_tag("P"); + p.appendChild(text_node(msg)); + messages.insertBefore(p, chatutils.query("p", "reconnect_button")); + } function websocket_message(event) { - console.log("websocket_message", event); + var data = JSON.parse(event.data); + if (data.hasOwnProperty("serial")) { + if (callbacks.hasOwnProperty(data.serial)) { + callbacks[data.serial](JSON.parse(data.content)); + delete callbacks[event.data.serial]; + } else { + add_message("missing callback" + data); + } + return; + } + add_message(event.data); + } + + function websocket_close() { + websocket = null; + if (unloading === false) { + reconnect_button_add(); + } } - function websocket_close(event) { - console.log("websocket_close", event); + function ws_api_request(method, path, data, callback) { + var obj = {"method": method, "path": path}; + obj.serial = serial; + callbacks[serial] = callback; + serial += 1; + if (data) { + switch (method) { + case "GET": + obj.query_string = data; + break; + case "POST": + obj.body = data; + break; + } + } + websocket.send(JSON.stringify(obj)); } - function websocket_error(event) { - console.log("websocket_error", event); + function websocket_open() { + reconnect_button_display(false); + ws_api_request("GET", "/api/user/", null, setup_user_list); + hash_change(); } function websocket_connect() { - var websocket_schemas = {"http:": "ws:", "https:": "wss:"}; - var websocket = new WebSocket( + websocket = new WebSocket( websocket_schemas[window.location.protocol] + "//" + window.location.host + "/" ); + websocket.addEventListener("open", websocket_open); websocket.addEventListener("message", websocket_message); websocket.addEventListener("close", websocket_close); - websocket.addEventListener("error", websocket_error); + reconnect_button_disabled(true); + } + + function clear_messages() { + var child; + var i; + for (i = messages.children.length - 1; i >= 0; i -= 1) { + child = messages.children[i]; + if (!child.classList.contains("reconnect_button")) { + messages.removeChild(child); + } + } + } + + function populate_messages(data) { + var i; + var p; + for (i = data.result.length - 1; i >= 0; i -= 1) { + p = create_tag("P"); + p.appendChild(text_node(data.result[i].sender_id)); + p.appendChild(text_node(": ")); + p.appendChild(text_node(data.result[i].text)); + messages.insertBefore(p, messages.firstChild); + } + } + + function hash_change() { + if (location.hash === current_channel) { + return; + } + current_channel = location.hash; + clear_messages(); + if (location.hash.startsWith("#user:")) { + ws_api_request( + "GET", + "/api/privatemessage/", + "user=" + location.hash.substring(6), + populate_messages + ); + } + } + + function add_message_reply(data) { + var p = create_tag("P"); + p.appendChild(text_node(data.sender_id)); + p.appendChild(text_node(": ")); + p.appendChild(text_node(data.text)); + messages.insertBefore(p, chatutils.query("p", "reconnect_button")); + } + + function add_message_reply_sent(data) { + chatutils.query("div", "messages-footer").getElementsByTagName( + "textarea" + )[0].value = ""; + add_message_reply(data); + } + + function send_message() { + var textarea = chatutils.query( + "div", + "messages-footer" + ).getElementsByTagName("textarea")[0]; + if (websocket === null || !textarea.value) { + return; + } + if (current_channel.startsWith("#user:")) { + ws_api_request( + "POST", + "/api/privatemessage/", + JSON.stringify( + { + "recipient_id": parseInt( + current_channel.substring(6), + 10 + ), + "sender_id": current_user.id, + "text": textarea.value + } + ), + add_message_reply_sent + ); + } } document.addEventListener( @@ -39,7 +235,19 @@ if (document.readyState !== "complete") { return; } + messages = chatutils.query("div", "messages"); websocket_connect(); + chatutils.query("div", "messages-footer").getElementsByTagName( + "BUTTON" + )[0].addEventListener("click", send_message); + } + ); + + window.addEventListener("hashchange", hash_change); + window.addEventListener( + "beforeunload", + function () { + unloading = true; } ); }()); diff --git a/channel/templates/channel/main.html b/channel/templates/channel/main.html index 9ab1ff7..7e6376c 100644 --- a/channel/templates/channel/main.html +++ b/channel/templates/channel/main.html @@ -3,7 +3,6 @@ {% block head %} {{ block.super }} - {% endblock head %} {% block header %} diff --git a/channel/urls.py b/channel/urls.py index bba6c8c..3757d9b 100644 --- a/channel/urls.py +++ b/channel/urls.py @@ -1,9 +1,12 @@ from django.urls import path -from .views import ChannelMainView -from .websocket import handle_websocket +from .views import ChannelMainView, PrivateMessageRestView +from .websocket import WebSocketHandler +from rest.urls import get_urls urlpatterns = [ path("", ChannelMainView.as_view(), name="channel-main"), ] -websocket_urls = {"/": handle_websocket} +websocket_urls = {"/": WebSocketHandler().handle} + +get_urls(PrivateMessageRestView, "privatemessage") diff --git a/channel/views.py b/channel/views.py index 857d492..fcba646 100644 --- a/channel/views.py +++ b/channel/views.py @@ -13,11 +13,17 @@ class PrivateMessageRestView(ModelRestView): serializer = PrivateMessageSerializer def get_queryset(self): - return ( - super() - .get_queryset() - .filter(Q(sender=self.request.user.pk) | Q(recipient=self.request.user.pk)) - ) + queryset = super().get_queryset() + user_id = self.request.user.pk + if "user" in self.request.GET: + other_id = int(self.request.GET["user"]) + queryset = queryset.filter( + Q(sender=user_id, recipient=other_id) + | Q(sender=other_id, recipient=user_id) + ) + else: + queryset = queryset.filter(Q(sender=user_id) | Q(recipient=user_id)) + return queryset class ChannelRestView(PrivilegeRequiredMixin, ModelRestView): diff --git a/channel/websocket.py b/channel/websocket.py index 0b0b105..3309fa7 100644 --- a/channel/websocket.py +++ b/channel/websocket.py @@ -1,117 +1,191 @@ import json -from asyncio import ( - CancelledError, - ensure_future, - get_running_loop, -) -from contextlib import contextmanager -from functools import partial +from asyncio import CancelledError, Event, Queue, gather, get_running_loop +from contextlib import asynccontextmanager +from importlib import import_module from io import BytesIO +from logging import getLogger from traceback import print_exc +from types import SimpleNamespace from asgiref.sync import sync_to_async +from django.conf import settings from django.core.handlers.asgi import ASGIRequest -from django.contrib.sessions.middleware import SessionMiddleware from django.contrib.auth import aget_user from django.db import connection +from django.urls import get_resolver, Resolver404 from chat.triggers import TriggerChannel +session_engine = import_module(settings.SESSION_ENGINE) +logger = getLogger(__name__) +ShutDownSentinel = type("ShutDownSentinelType", (), {})() + + +class WebsocketConnection: + @staticmethod + async def aget_user(scope): + headers = dict(scope.get("headers", [])) + session_key = None + cookie_str = headers.get(b"cookie", b"").decode() + cookie_name = "sessionid=" + pos = cookie_str.find(cookie_name) + if pos >= 0: + pos += len(cookie_name) + end_pos = cookie_str.find(";", pos) + if end_pos == -1: + end_pos = None + session_key = cookie_str[pos:end_pos] + + scope["user"] = await aget_user( + SimpleNamespace( # noqa + session=session_engine.SessionStore(session_key=session_key) + ) + ) + + def __init__(self, scope, receive, send): + self.scope = scope + self.receive = receive + self.send = send + self.loop = get_running_loop() + self.active_event = Event() + self.notification_queue = Queue() + self.resolver = get_resolver() + + def process_triggers(self, notification): + data = json.loads(notification.payload) + if data["table"] == "user_user": + pass + elif data["table"] == "channel_privatemessage": + user_id = self.scope["user"].pk + obj = data["obj"] + if user_id != obj["sender_id"] and user_id != obj["recipient_id"]: + return + else: + return + self.loop.call_soon_threadsafe( + self.notification_queue.put_nowait, + {"type": "websocket.send", "text": notification.payload}, + ) -@contextmanager -def listen_notify_handler(connection, callback): - loop = get_running_loop() - loop.add_reader(connection.fileno(), partial(connection.execute, "SELECT 1")) - connection.add_notify_handler(callback) - for name in TriggerChannel.registry: - connection.execute(f"LISTEN {name}") - try: - yield - finally: - for name in TriggerChannel.registry: - connection.execute(f"UNLISTEN {name}") - connection.remove_notify_handler(callback) - loop.remove_reader(connection.fileno()) - - -def filter_trigger_always(coro, data, user, user_channels): - ensure_future(coro) - - -def filter_trigger_privatemessage(coro, data, user, user_channels): - if user.pk in (data["obj"]["sender_id"], data["obj"]["recipient_id"]): - ensure_future(coro) - else: - coro.close() - - -filter_triggers = { - "user_user": filter_trigger_always, - "channel_channel": filter_trigger_always, - # "channel_channelmessage": filter_trigger_channelmessage, - # "channel_channeluser": filter_trigger_channeluser, - "channel_privatemessage": filter_trigger_privatemessage, -} - - -def process_triggers(send, user, user_channels, notification): - data = json.loads(notification.payload) - filter_triggers[data["table"]]( - send({"type": "websocket.send", "text": notification.payload}), - data, - user, - user_channels, - ) - + @staticmethod + def db_idle(db_conn): + try: + db_conn.poll() + except Exception: + pass -def get_user_channels(user): - return set(user.channels.all().values_list("pk", flat=True)) + @asynccontextmanager + async def listen_notify_handler(self): + await sync_to_async(connection.connect)() + db_conn = connection.connection + db_conn.add_notify_handler(self.process_triggers) + self.loop.add_reader(db_conn.fileno(), self.db_idle, db_conn) + for name in TriggerChannel.registry: + await sync_to_async(db_conn.execute)(f"LISTEN {name}") + try: + yield + finally: + for name in TriggerChannel.registry: + try: + await sync_to_async(db_conn.execute)(f"UNLISTEN {name}") + except Exception: + pass + self.loop.remove_reader(db_conn.fileno()) + db_conn.remove_notify_handler(self.process_triggers) + await sync_to_async(connection.close)() + + async def process_ws(self): + try: + while True: + event = await self.receive() + if event["type"] == "websocket.connect": + await self.send({"type": "websocket.accept"}) + self.active_event.set() + elif event["type"] == "websocket.disconnect": + break + elif event["type"] == "websocket.receive": + await self.send( + { + "type": "websocket.send", + "text": json.dumps( + await self.get_api_response(json.loads(event["text"])) + ), + } + ) + finally: + self.active_event.clear() + # wake up send_loop() to end it + await self.notification_queue.put(ShutDownSentinel) + + async def get_api_request(self, request_data): + body = request_data.get("body", "") + if not isinstance(body, str): + body = json.dumps(body) + ws_scope = { + **self.scope, + "type": "http", + "method": request_data["method"], + "path": request_data["path"], + "serial": request_data["serial"], + } + if "query_string" in request_data: + ws_scope["query_string"] = request_data["query_string"] + else: + ws_scope.pop("query_string", None) + request = ASGIRequest(ws_scope, BytesIO(body.encode())) + request.user = self.scope["user"] + request.resolver_match = self.resolver.resolve(request.path_info) + if not request.resolver_match.url_name.startswith("api-"): + raise Resolver404 + return request + + async def get_api_response(self, request_data): + try: + request = await self.get_api_request(request_data) + except Resolver404: + return { + "status_code": 404, + "content": "resource not found", + "serial": request_data["serial"], + } + else: + logger.info(f"ws api call: {request.method} {request.path}") + resolver_match = request.resolver_match + response = await sync_to_async(resolver_match.func)( + request, + *resolver_match.args, + **resolver_match.kwargs, + ) + return { + "status_code": response.status_code, + "content": response.content.decode(), + "serial": request.scope["serial"], + } + + async def send_loop(self): + await self.active_event.wait() + while self.active_event.is_set(): + item = await self.notification_queue.get() + if item is ShutDownSentinel: + break + try: + await self.send(item) + finally: + self.notification_queue.task_done() -async def process_ws(receive, send): - while True: - event = await receive() - if event["type"] == "websocket.connect": - await send({"type": "websocket.accept"}) - elif event["type"] == "websocket.disconnect": - return - elif event["type"] == "websocket.receive": - # ...maybe make it possible to request data through the ws? - if event["text"] == "ping": - await send( - { - "type": "websocket.send", - "text": "pong", - }, - ) - - -async def handle_websocket(scope, receive, send): - request = ASGIRequest({**scope, "method": "_ws"}, BytesIO()) - SessionMiddleware(lambda x: None).process_request(request) - request.user = await aget_user(request) - - if not request.user.is_authenticated: - await send({"type": "websocket.close"}) - return - - await sync_to_async(connection.connect)() - with listen_notify_handler( - connection.connection, - partial( - process_triggers, - send, - request.user, - await sync_to_async(get_user_channels)(request.user), - ), - ): - try: - await process_ws(receive, send) - except CancelledError: - pass - except Exception: - print_exc() +class WebSocketHandler: + async def handle(self, scope, receive, send): + ws_conn = WebsocketConnection(scope, receive, send) + await ws_conn.aget_user(scope) + async with ws_conn.listen_notify_handler(): try: - await send({"type": "websocket.close"}) - except Exception: + await gather(ws_conn.process_ws(), ws_conn.send_loop()) + except CancelledError: pass + except Exception: + print_exc() + try: + await send({"type": "websocket.close"}) + except Exception: + pass diff --git a/chat/static/chat/base.js b/chat/static/chat/base.js index eb2c384..558ab83 100644 --- a/chat/static/chat/base.js +++ b/chat/static/chat/base.js @@ -1,4 +1,7 @@ +/*global chatutils*/ (function () { + var messages = chatutils.query("ul", "messages"); + function fade_messages(event) { var li = event.target; if (li.tagName !== "LI" || li.classList.contains("fade-out")) { @@ -13,16 +16,12 @@ {once: true} ); } + document.addEventListener( "readystatechange", function () { - var sel; - if (document.readyState !== "complete") { - return; - } - sel = document.querySelector("ul.messages"); - if (sel !== null) { - sel.addEventListener("click", fade_messages); + if (document.readyState === "complete" && messages !== null) { + messages.addEventListener("click", fade_messages); } } ); diff --git a/channel/static/channel/chatutils.js b/chat/static/chat/chatutils.js similarity index 89% rename from channel/static/channel/chatutils.js rename to chat/static/chat/chatutils.js index 5f57503..b6b6dee 100644 --- a/channel/static/channel/chatutils.js +++ b/chat/static/chat/chatutils.js @@ -63,6 +63,17 @@ window.chatutils = { "foreach": foreach, + "query": function (tag_name, class_name) { + var matches = document.getElementsByClassName(class_name); + var i; + tag_name = tag_name.toUpperCase(); + for (i = 0; i < matches.length; i += 1) { + if (matches[i].tagName.toUpperCase() === tag_name) { + return matches[i]; + } + } + return null; + }, "select": function (selector) { if (selector.substring(0, 1) === "#") { return document.getElementById(selector.substring(1)); @@ -167,8 +178,18 @@ out.push(s); return out.join(""); }, - "xhr": function (method, url, data, load_cb, abort_cb, error_cb) { + "xhr": function (method, url, data, ready_cb, load_cb, abort_cb) { var request = new XMLHttpRequest(); + if (ready_cb) { + request.addEventListener( + "readystatechange", + function (event) { + if (event.target.readyState === 2) { + ready_cb(); + } + } + ); + } request.addEventListener( "load", load_cb || function (event) { @@ -185,12 +206,6 @@ console.log("xhr_abort", event.target.response); } ); - request.addEventListener( - "error", - error_cb || function (event) { - console.log("xhr_error", event.target.response); - } - ); request.open(method, url); if (data_methods.includes(method.toLowerCase())) { request.setRequestHeader("Content-Type", "application/json"); diff --git a/chat/templates/chat/base.html b/chat/templates/chat/base.html index 8378e6d..bb5d96d 100644 --- a/chat/templates/chat/base.html +++ b/chat/templates/chat/base.html @@ -11,6 +11,7 @@ {% endblock meta %} + {% block title %} diff --git a/chat/triggers.py b/chat/triggers.py index 469e849..85fbef1 100644 --- a/chat/triggers.py +++ b/chat/triggers.py @@ -2,9 +2,7 @@ from pgtrigger import After, Delete, Insert, Row, Statement, Trigger, Truncate, def fields_to_json_build_object(table, fields): - args = [] - for field in fields: - args.extend(("'field'", f'"{table}"."{field}"')) + args = (f"'{field}', {table}.\"{field}\"" for field in fields) return f"json_build_object({', '.join(args)})" @@ -29,7 +27,7 @@ class TriggerChannel: 'op', TG_OP, 'table', TG_TABLE_NAME, 'obj', {fields_to_json_build_object("OLD", fields)} - ) + )::text ); RETURN NULL; """, @@ -46,7 +44,7 @@ class TriggerChannel: 'op', TG_OP, 'table', TG_TABLE_NAME, 'obj', {fields_to_json_build_object("NEW", fields)} - ) + )::text ); RETURN NULL; """, @@ -59,7 +57,7 @@ class TriggerChannel: func=f""" PERFORM pg_notify( '{self.name}', - json_build_object('op', TG_OP, 'table', TG_TABLE_NAME) + json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text ); RETURN NULL; """, diff --git a/chat/websockets.py b/chat/websockets.py deleted file mode 100644 index e69de29..0000000 diff --git a/rest/serializers.py b/rest/serializers.py index e852b9d..f46687c 100644 --- a/rest/serializers.py +++ b/rest/serializers.py @@ -1,4 +1,5 @@ -from datetime import datetime +from datetime import UTC, datetime +from logging import getLogger from typing import Type from django.db.models import ( @@ -10,6 +11,8 @@ from django.db.models import ( ) from django.urls import reverse +logger = getLogger(__name__) + class ModelSerializer: model: Type[Model] @@ -29,10 +32,11 @@ class ModelSerializer: return value field = self.model._meta.get_field(field_name) value = getattr(instance, field_name) - if isinstance(field, DateTimeField): - value = value.isoformat(timespec="milliseconds") - elif isinstance(field, (ManyToManyField, ManyToManyRel)): - value = [v.pk for v in value.all()] + if value is not None: + if isinstance(field, DateTimeField): + value = value.isoformat(timespec="milliseconds") + elif isinstance(field, (ManyToManyField, ManyToManyRel)): + value = [v.pk for v in value.all()] return value def to_json(self, instance): @@ -69,6 +73,8 @@ class ModelSerializer: return instance, m2m def save(self, data, instance): + ts = datetime.now().replace(tzinfo=UTC) + logger.info(f"[{ts.isoformat(' ')}] API pk: {instance.pk} data: {data}") instance, m2m = self.from_json(data, instance) instance.save() for field_name, value in m2m.items(): diff --git a/user/migrations/0001_initial.py b/user/migrations/0001_initial.py index 1f38c83..89c3dca 100644 --- a/user/migrations/0001_initial.py +++ b/user/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 6.0.4 on 2026-05-10 15:04 +# Generated by Django 6.0.4 on 2026-05-10 16:08 import django.contrib.auth.validators import django.utils.timezone @@ -131,8 +131,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_delete", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "OLD"."id", \'field\', "OLD"."username", \'field\', "OLD"."email", \'field\', "OLD"."first_name", \'field\', "OLD"."last_name", \'field\', "OLD"."date_joined")\n )\n );\n RETURN NULL;\n ', - hash="9cf43edc5ce554d755b1e3978cfc5bf687c8f5ed", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', OLD.\"id\", 'username', OLD.\"username\", 'email', OLD.\"email\", 'first_name', OLD.\"first_name\", 'last_name', OLD.\"last_name\", 'date_joined', OLD.\"date_joined\")\n )::text\n );\n RETURN NULL;\n ", + hash="bcee39bce4ea6cf3d69f7c702a80fcb0113219d2", operation="DELETE", pgid="pgtrigger_chat_channel_delete_a66e5", table="user_user", @@ -145,8 +145,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_insert_update", sql=pgtrigger.compiler.UpsertTriggerSql( - func='\n PERFORM pg_notify(\n \'chat_channel\',\n json_build_object(\n \'op\', TG_OP,\n \'table\', TG_TABLE_NAME,\n \'obj\', json_build_object(\'field\', "NEW"."id", \'field\', "NEW"."username", \'field\', "NEW"."email", \'field\', "NEW"."first_name", \'field\', "NEW"."last_name", \'field\', "NEW"."date_joined")\n )\n );\n RETURN NULL;\n ', - hash="d48d015aa62321255d7ce9cde38703fe03d87881", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object(\n 'op', TG_OP,\n 'table', TG_TABLE_NAME,\n 'obj', json_build_object('id', NEW.\"id\", 'username', NEW.\"username\", 'email', NEW.\"email\", 'first_name', NEW.\"first_name\", 'last_name', NEW.\"last_name\", 'date_joined', NEW.\"date_joined\")\n )::text\n );\n RETURN NULL;\n ", + hash="c64b0aad3a81f9de89768c6437a129acf7d653ed", operation="INSERT OR UPDATE", pgid="pgtrigger_chat_channel_insert_update_72ed2", table="user_user", @@ -159,8 +159,8 @@ class Migration(migrations.Migration): trigger=pgtrigger.compiler.Trigger( name="chat_channel_truncate", sql=pgtrigger.compiler.UpsertTriggerSql( - func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)\n );\n RETURN NULL;\n ", - hash="c7a19f0c2b05edc799fdee39cd011bddd2456b7d", + func="\n PERFORM pg_notify(\n 'chat_channel',\n json_build_object('op', TG_OP, 'table', TG_TABLE_NAME)::text\n );\n RETURN NULL;\n ", + hash="f18566b929de311157d6db53ceb4dcf6cb8a924d", level="STATEMENT", operation="TRUNCATE", pgid="pgtrigger_chat_channel_truncate_8a96a",