]> git.mar77i.info Git - hublib/blob - hub/websocket.py
big cleanup and refactoring #1
[hublib] / hub / websocket.py
1 import asyncio
2 import json
3 import pickle
4 import sys
5 from asyncio.exceptions import CancelledError
6 from functools import partial
7 from traceback import print_exception
8
9 from falcon import WebSocketDisconnected
10
11
12 class BaseWebSocketApp:
13 def __init__(self, hubapp):
14 self.hubapp = hubapp
15 self.conn = self.hubapp.app.hubapps["root"].conn
16
17 def task_done(self):
18 self.task = None
19
20 @staticmethod
21 async def process_websocket(conn, web_socket, extra_data={}, recipients=[]):
22 try:
23 while True:
24 data = json.loads(await web_socket.receive_text())
25 data.update(extra_data)
26 if callable(recipients):
27 current_recipients = recipients(data)
28 else:
29 current_recipients = recipients
30 for recipient in current_recipients:
31 await conn.publish(recipient, pickle.dumps(data))
32 except (CancelledError, WebSocketDisconnected):
33 pass
34
35 @staticmethod
36 async def process_pubsub(pubsub, web_socket):
37 try:
38 while True:
39 data = await pubsub.get_message(True, 0.3)
40 if not web_socket.ready or web_socket.closed:
41 break
42 if data is not None:
43 await web_socket.send_text(json.dumps(pickle.loads(data["data"])))
44 except (CancelledError, WebSocketDisconnected):
45 pass
46
47 async def on_websocket(
48 self,
49 req,
50 web_socket,
51 pubsub_name=None,
52 process_websockets_kwargs=None,
53 join_cb=None,
54 leave_cb=None,
55 ):
56 await web_socket.accept()
57 pubsub = self.conn.pubsub()
58 if pubsub_name:
59 await pubsub.subscribe(pubsub_name)
60 if callable(join_cb):
61 await join_cb()
62 try:
63 await asyncio.gather(
64 self.process_websocket(
65 self.conn, web_socket, **(process_websockets_kwargs or {})
66 ),
67 self.process_pubsub(pubsub, web_socket),
68 return_exceptions=True,
69 )
70 except (CancelledError, WebSocketDisconnected):
71 pass
72 except Exception:
73 print_exception(*sys.exc_info())
74 finally:
75 await web_socket.close()
76 if callable(leave_cb):
77 await leave_cb()
78
79
80 class WebSocketApp(BaseWebSocketApp):
81 async def join_leave_client_notify(self, redis, action, client_id):
82 await redis.publish(
83 f"{self.hubapp.name}-master",
84 pickle.dumps({"action": action, "client_id": client_id}),
85 )
86
87 async def on_websocket_client(self, req, web_socket):
88 client_id = await self.conn.incr("client_id")
89 return await self.on_websocket(
90 req,
91 web_socket,
92 f"{self.hubapp.name}-client-{client_id}",
93 {
94 "extra_data": {"client_id": client_id},
95 "recipients": [f"{self.hubapp.name}-master"],
96 },
97 partial(self.join_leave_client_notify, self.conn, "join", client_id),
98 partial(self.join_leave_client_notify, self.conn, "leave", client_id),
99 )
100
101 async def on_websocket_master(self, req, web_socket):
102 return await self.on_websocket(
103 req,
104 web_socket,
105 f"{self.hubapp.name}-master",
106 {"recipients": self.get_master_recipients},
107 )
108
109 def get_master_recipients(self, data):
110 return [
111 f"{self.hubapp.name}-client-{int(client_id)}"
112 for client_id in data.pop("client_ids", ())
113 ]