]> git.mar77i.info Git - hublib/blob - hub/hub.py
da78e1f27b57bc31a50b7cce0f4dfcb3b296b753
[hublib] / hub / hub.py
1 import asyncio
2 from asyncio.exceptions import CancelledError
3 import json
4 import pickle
5 import sys
6 from traceback import print_exception
7
8 from falcon import WebSocketDisconnected
9 from redis.asyncio import StrictRedis
10
11 from .utils import get_redis_pass, scramble
12
13
class Hub:
    """Relay messages between client websockets and a master websocket.

    Redis pub/sub is the transport: every client websocket publishes what it
    receives to the ``master`` channel and listens on its own
    ``client-<id>`` channel; the master websocket listens on ``master`` and
    publishes to whichever ``client-<id>`` channels a message names.

    Payloads travel over Redis pickled and over websockets as JSON text.
    """

    def __init__(self, secret):
        """Set up the Redis client and derive the master websocket URI.

        ``secret`` is passed to ``scramble`` to build ``master_ws_uri``.
        The ``client_id`` counter in Redis is reset to 0 by a background
        task, so an event loop must be available when this runs.
        """
        self.master_ws_uri = f"/{scramble(secret, 'ws')}"
        self.redis = StrictRedis(password=get_redis_pass("/etc/redis/redis.conf"))
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced one may be garbage-collected before
        # it has finished.
        self._reset_client_id_task = asyncio.ensure_future(
            self.redis.set("client_id", 0)
        )

    async def _run_until_first_exit(self, *coros):
        """Run ``coros`` concurrently and stop all of them once one exits.

        The reader and the forwarder of a connection are useless without
        each other, so as soon as either returns or fails the remaining
        ones are cancelled.  Exceptions raised by the workers are printed
        rather than silently discarded.
        """
        tasks = [asyncio.ensure_future(coro) for coro in coros]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            for task in tasks:
                task.cancel()  # no-op for tasks that already finished
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            # CancelledError derives from BaseException, so cancelled
            # workers are not reported here.
            if isinstance(outcome, Exception):
                print_exception(type(outcome), outcome, outcome.__traceback__)

    async def process_websocket(self, client_id, web_socket):
        """Forward every text frame from a client to the ``master`` channel.

        A frame that is a JSON object is forwarded as that object; anything
        else is wrapped as ``{"data": <raw text>}``.  ``client_id`` is always
        set by the hub, overriding any value the client supplied.
        """
        try:
            while True:
                data = await web_socket.receive_text()
                try:
                    parsed_data = json.loads(data)
                except json.JSONDecodeError:
                    parsed_data = None
                if not isinstance(parsed_data, dict):
                    parsed_data = {"data": data}
                parsed_data["client_id"] = client_id
                await self.redis.publish("master", pickle.dumps(parsed_data))
        except (CancelledError, WebSocketDisconnected):
            pass

    async def process_pubsub(self, pubsub, web_socket):
        """Forward messages from ``pubsub`` to ``web_socket`` as JSON text.

        Polls with a short timeout so the loop notices when the websocket
        is no longer usable and can exit.
        """
        try:
            while True:
                data = await pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=0.3
                )
                if not web_socket.ready or web_socket.closed:
                    break
                if data is not None:
                    # SECURITY: pickle.loads executes arbitrary code for a
                    # crafted payload, so anyone able to publish to these
                    # Redis channels can run code in this process.  Only
                    # safe while Redis is reachable by trusted peers only.
                    message = pickle.loads(data["data"])
                    await web_socket.send_text(json.dumps(message))
        except (CancelledError, WebSocketDisconnected):
            pass

    async def on_websocket(self, req, web_socket):
        """Serve one client websocket connection.

        Allocates a client id, announces ``join`` on the ``master`` channel,
        relays traffic in both directions until either side stops, then
        closes the socket and announces ``leave``.
        """
        client_id = await self.redis.incr("client_id")
        await web_socket.accept()
        # The context manager releases the pub/sub connection on exit.
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe(f"client-{client_id}")
            await self.redis.publish(
                "master", pickle.dumps({"action": "join", "client_id": client_id}),
            )
            try:
                await self._run_until_first_exit(
                    self.process_websocket(client_id, web_socket),
                    self.process_pubsub(pubsub, web_socket),
                )
            except (CancelledError, WebSocketDisconnected):
                pass
            except Exception:
                print_exception(*sys.exc_info())
            finally:
                try:
                    await web_socket.close()
                finally:
                    # Announce the departure even if closing the socket
                    # failed, so the master does not keep a stale client.
                    await self.redis.publish(
                        "master",
                        pickle.dumps({"action": "leave", "client_id": client_id}),
                    )

    async def process_websocket_master(self, web_socket):
        """Dispatch messages from the master to the clients they name.

        Each frame must be a JSON object.  Its ``client_ids`` entry (a JSON
        array) is removed and the rest of the object is published to the
        ``client-<id>`` channel of every listed id.  Frames that are not
        valid JSON objects, or whose ``client_ids`` is not an array, are
        skipped so that one bad frame cannot end the dispatch loop.
        """
        try:
            while True:
                raw = await web_socket.receive_text()
                try:
                    message = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                if not isinstance(message, dict):
                    continue
                client_ids = message.pop("client_ids", ())
                if not isinstance(client_ids, (list, tuple)):
                    continue
                # The payload is the same for every recipient: pickle once.
                payload = pickle.dumps(message)
                for client_id in client_ids:
                    await self.redis.publish(f"client-{client_id}", payload)
        except (CancelledError, WebSocketDisconnected):
            pass

    async def on_websocket_master(self, req, web_socket):
        """Serve the master websocket connection.

        Relays the ``master`` channel to the socket and dispatches what the
        socket sends to the clients, until either direction stops.
        """
        await web_socket.accept()
        # The context manager releases the pub/sub connection on exit.
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe("master")
            try:
                await self._run_until_first_exit(
                    self.process_websocket_master(web_socket),
                    self.process_pubsub(pubsub, web_socket),
                )
            except (CancelledError, WebSocketDisconnected):
                pass
            except Exception:
                print_exception(*sys.exc_info())
            finally:
                await web_socket.close()

    def add_routes(self, app):
        """Register the client route ``/ws`` and the master route on ``app``."""
        app.add_route("/ws", self)
        app.add_route(self.master_ws_uri, self, suffix="master")

    def update_context_vars(self, context_vars):
        """Store the master websocket URI in ``context_vars``."""
        context_vars["master_ws_uri"] = self.master_ws_uri