]> git.mar77i.info Git - hublib/blob - hub/websocket.py
0ce426ad30269d223eb10cb17a732f6423e1b06d
[hublib] / hub / websocket.py
1 import asyncio
2 import json
3 import pickle
4 import sys
5 from asyncio.exceptions import CancelledError
6 from functools import partial
7 from traceback import print_exception
8
9 from falcon import WebSocketDisconnected
10
11 from .static import TreeFileApp
12
13
14 class WebSocketApp:
15 def __init__(self, hubapp: TreeFileApp):
16 self.name = hubapp.name
17 self.conn = hubapp.root.conn
18
19 @staticmethod
20 async def process_websocket(conn, web_socket, extra_data=None, recipients=None):
21 try:
22 while True:
23 data = json.loads(await web_socket.receive_text())
24 if extra_data:
25 data.update(extra_data)
26 if callable(recipients):
27 current_recipients = recipients(data)
28 elif recipients:
29 current_recipients = recipients
30 else:
31 raise ValueError("no recipients specified")
32 for recipient in current_recipients:
33 await conn.publish(recipient, pickle.dumps(data))
34 except (CancelledError, WebSocketDisconnected):
35 pass
36
37 @staticmethod
38 async def process_pubsub(pubsub, web_socket):
39 try:
40 while True:
41 data = await pubsub.get_message(True, 0.3)
42 if not web_socket.ready or web_socket.closed:
43 break
44 if data is not None:
45 await web_socket.send_text(json.dumps(pickle.loads(data["data"])))
46 except (CancelledError, WebSocketDisconnected):
47 pass
48
49 async def on_websocket(
50 self,
51 req,
52 web_socket,
53 pubsub_name=None,
54 process_websockets_kwargs=None,
55 join_cb=None,
56 leave_cb=None,
57 ):
58 await web_socket.accept()
59 pubsub = self.conn.pubsub()
60 if pubsub_name:
61 await pubsub.subscribe(pubsub_name)
62 if callable(join_cb):
63 await join_cb()
64 try:
65 await asyncio.gather(
66 self.process_websocket(
67 self.conn, web_socket, **(process_websockets_kwargs or {})
68 ),
69 self.process_pubsub(pubsub, web_socket),
70 return_exceptions=True,
71 )
72 except (CancelledError, WebSocketDisconnected):
73 pass
74 except Exception:
75 print_exception(*sys.exc_info())
76 finally:
77 await web_socket.close()
78 if callable(leave_cb):
79 await leave_cb()
80
81 async def client_notify(self, redis, action, client_id):
82 await redis.publish(
83 f"{self.name}-master",
84 pickle.dumps({"action": action, "client_id": client_id}),
85 )
86
87 async def on_websocket_client(self, req, web_socket):
88 client_id = await self.conn.incr("client_id")
89 return await self.on_websocket(
90 req,
91 web_socket,
92 f"{self.name}-client-{client_id}",
93 {
94 "extra_data": {"client_id": client_id},
95 "recipients": [f"{self.name}-master"],
96 },
97 partial(self.client_notify, self.conn, "join", client_id),
98 partial(self.client_notify, self.conn, "leave", client_id),
99 )
100
101 async def on_websocket_master(self, req, web_socket):
102 return await self.on_websocket(
103 req,
104 web_socket,
105 f"{self.name}-master",
106 {"recipients": self.get_master_recipients},
107 )
108
109 def get_master_recipients(self, data):
110 return [
111 f"{self.name}-client-{int(client_id)}"
112 for client_id in data.pop("client_ids", ())
113 ]