]> git.mar77i.info Git - hublib/blob - hub/websocket.py
89a7e2f3cfa4fd79baed82bc75c6581620eccab5
[hublib] / hub / websocket.py
1 import asyncio
2 import json
3 import pickle
4 import sys
5 from asyncio.exceptions import CancelledError
6 from functools import partial
7 from traceback import print_exception
8
9 from falcon import WebSocketDisconnected
10
11
12 class WebSocketApp:
13 def __init__(self, hubapp):
14 self.name = hubapp.name
15 self.conn = hubapp.root.conn
16
17 @staticmethod
18 async def process_websocket(conn, web_socket, extra_data=None, recipients=None):
19 try:
20 while True:
21 data = json.loads(await web_socket.receive_text())
22 if extra_data:
23 data.update(extra_data)
24 if callable(recipients):
25 current_recipients = recipients(data)
26 elif recipients:
27 current_recipients = recipients
28 else:
29 raise ValueError("no recipients specified")
30 for recipient in current_recipients:
31 await conn.publish(recipient, pickle.dumps(data))
32 except (CancelledError, WebSocketDisconnected):
33 pass
34
35 @staticmethod
36 async def process_pubsub(pubsub, web_socket):
37 try:
38 while True:
39 data = await pubsub.get_message(True, 0.3)
40 if not web_socket.ready or web_socket.closed:
41 break
42 if data is not None:
43 await web_socket.send_text(json.dumps(pickle.loads(data["data"])))
44 except (CancelledError, WebSocketDisconnected):
45 pass
46
47 async def on_websocket(
48 self,
49 req,
50 web_socket,
51 pubsub_name=None,
52 process_websockets_kwargs=None,
53 join_cb=None,
54 leave_cb=None,
55 ):
56 await web_socket.accept()
57 pubsub = self.conn.pubsub()
58 if pubsub_name:
59 await pubsub.subscribe(pubsub_name)
60 if callable(join_cb):
61 await join_cb()
62 try:
63 await asyncio.gather(
64 self.process_websocket(
65 self.conn, web_socket, **(process_websockets_kwargs or {})
66 ),
67 self.process_pubsub(pubsub, web_socket),
68 return_exceptions=True,
69 )
70 except (CancelledError, WebSocketDisconnected):
71 pass
72 except Exception:
73 print_exception(*sys.exc_info())
74 finally:
75 await web_socket.close()
76 if callable(leave_cb):
77 await leave_cb()
78
79 async def client_notify(self, redis, action, client_id):
80 await redis.publish(
81 f"{self.name}-master",
82 pickle.dumps({"action": action, "client_id": client_id}),
83 )
84
85 async def on_websocket_client(self, req, web_socket):
86 client_id = await self.conn.incr("client_id")
87 return await self.on_websocket(
88 req,
89 web_socket,
90 f"{self.name}-client-{client_id}",
91 {
92 "extra_data": {"client_id": client_id},
93 "recipients": [f"{self.name}-master"],
94 },
95 partial(self.client_notify, self.conn, "join", client_id),
96 partial(self.client_notify, self.conn, "leave", client_id),
97 )
98
99 async def on_websocket_master(self, req, web_socket):
100 return await self.on_websocket(
101 req,
102 web_socket,
103 f"{self.name}-master",
104 {"recipients": self.get_master_recipients},
105 )
106
107 def get_master_recipients(self, data):
108 return [
109 f"{self.name}-client-{int(client_id)}"
110 for client_id in data.pop("client_ids", ())
111 ]