import asyncio
import json
import pickle
import sys
from asyncio.exceptions import CancelledError
from traceback import print_exception

from falcon import WebSocketDisconnected
from redis.asyncio import StrictRedis

from .utils import get_redis_pass, scramble
def __init__(self, secret):
    """Derive the master endpoint path and create the Redis client.

    Args:
        secret: Passed to ``scramble`` (together with the literal ``'ws'``)
            to build the path of the master WebSocket route.
    """
    self.master_ws_uri = f"/{scramble(secret, 'ws')}"
    self.redis = StrictRedis(password=get_redis_pass("/etc/redis/redis.conf"))
    # Reset the "client_id" counter in the background.  The future is stored
    # on the instance because the event loop keeps only weak references to
    # tasks; an unreferenced task may be garbage-collected before it finishes.
    # NOTE(review): ensure_future() on a coroutine needs an event loop at
    # construction time - confirm against how this object is instantiated.
    self._client_id_reset = asyncio.ensure_future(self.redis.set("client_id", 0))
async def process_websocket(self, client_id, web_socket):
    """Forward each text frame from a client socket to the ``master`` channel.

    A frame that decodes to a JSON object is forwarded as that object; any
    other frame (invalid JSON, or JSON that is not an object) is wrapped as
    ``{"data": <raw text>}``.  ``client_id`` is stored under the
    ``"client_id"`` key before the payload is pickled and published.

    Args:
        client_id: Identifier added to every forwarded payload.
        web_socket: Object providing an awaitable ``receive_text()``.

    Returns:
        None, once the socket disconnects or the task is cancelled.
    """
    # NOTE(review): the enclosing try / while scaffolding was reconstructed
    # from the surviving statements - confirm against the original file.
    try:
        while True:
            data = await web_socket.receive_text()
            try:
                parsed_data = json.loads(data)
            except json.JSONDecodeError:
                # Reset explicitly so an undecodable frame can neither hit an
                # unbound name nor reuse the payload of the previous frame.
                parsed_data = None
            if not isinstance(parsed_data, dict):
                parsed_data = {"data": data}
            parsed_data["client_id"] = client_id
            await self.redis.publish("master", pickle.dumps(parsed_data))
    except (CancelledError, WebSocketDisconnected):
        # Disconnect and cancellation are the normal ways for this loop to end.
        pass
async def process_pubsub(self, pubsub, web_socket):
    """Relay messages from a Redis pub/sub subscription to a WebSocket.

    Polls ``pubsub`` with a 0.3 second timeout, unpickles the ``"data"``
    field of each message and sends it to ``web_socket`` as JSON text.

    Args:
        pubsub: Object providing an awaitable ``get_message()``.
        web_socket: Object exposing ``ready``, ``closed`` and an awaitable
            ``send_text()``.

    Returns:
        None, once the socket is no longer ready or is closed, on
        disconnect, or when the task is cancelled.
    """
    # NOTE(review): the try / while scaffolding, the ``break`` and the
    # "message present" guard were reconstructed - confirm against the
    # original file.
    try:
        while True:
            message = await pubsub.get_message(
                ignore_subscribe_messages=True, timeout=0.3
            )
            # Checked after every poll so the loop ends within one timeout
            # period once the socket has gone away.
            if not web_socket.ready or web_socket.closed:
                break
            if message:
                # SECURITY: pickle.loads runs on data read from Redis; this
                # is only safe while every publisher on the channel is
                # trusted.
                payload = pickle.loads(message["data"])
                await web_socket.send_text(json.dumps(payload))
    except (CancelledError, WebSocketDisconnected):
        # Disconnect and cancellation are the normal ways for this loop to end.
        pass
async def on_websocket(self, req, web_socket):
    """Serve one client WebSocket connection.

    Allocates a client id by incrementing the Redis ``"client_id"`` key,
    accepts the socket, subscribes to ``client-<id>`` and publishes a pickled
    ``join`` notice on ``master``.  It then runs ``process_websocket`` and
    ``process_pubsub`` concurrently until both finish.  On the way out the
    socket is closed, a ``leave`` notice is published and the subscription is
    released.

    Args:
        req: Request object; not used here.
        web_socket: The WebSocket connection to serve.
    """
    client_id = await self.redis.incr("client_id")
    await web_socket.accept()
    pubsub = self.redis.pubsub()
    await pubsub.subscribe(f"client-{client_id}")
    await self.redis.publish(
        "master", pickle.dumps({"action": "join", "client_id": client_id})
    )
    # NOTE(review): the try / except / finally keywords and the gather() call
    # line were reconstructed - confirm against the original file.
    try:
        results = await asyncio.gather(
            self.process_websocket(client_id, web_socket),
            self.process_pubsub(pubsub, web_socket),
            return_exceptions=True,
        )
        # With return_exceptions=True failures are returned, not raised, so
        # they never reach the handlers below; report them here instead of
        # dropping them silently.
        for result in results:
            if isinstance(result, Exception) and not isinstance(
                result, WebSocketDisconnected
            ):
                print_exception(type(result), result, result.__traceback__)
    except (CancelledError, WebSocketDisconnected):
        pass
    except Exception:
        print_exception(*sys.exc_info())
    finally:
        try:
            await web_socket.close()
            await self.redis.publish(
                "master",
                pickle.dumps({"action": "leave", "client_id": client_id}),
            )
        finally:
            # Release the subscription even if the steps above fail.
            # ``aclose`` is the current redis-py name; ``close`` is the older
            # awaitable equivalent.
            release = getattr(pubsub, "aclose", None) or pubsub.close
            await release()
async def process_websocket_master(self, web_socket):
    """Route frames from the master socket to individual client channels.

    Each frame is expected to be a JSON object.  Its ``"client_ids"`` entry
    is removed and the rest of the object is published, pickled, on
    ``client-<id>`` for every listed id.  Frames that are not valid JSON, or
    that are not JSON objects, are skipped so that one bad frame does not
    stop routing for the rest of the connection.

    Args:
        web_socket: Object providing an awaitable ``receive_text()``.

    Returns:
        None, once the socket disconnects or the task is cancelled.
    """
    # NOTE(review): the try / while scaffolding and the publish payload
    # (assumed to be ``pickle.dumps(data)``) were reconstructed - confirm
    # against the original file.
    try:
        while True:
            try:
                data = json.loads(await web_socket.receive_text())
            except json.JSONDecodeError:
                continue
            if not isinstance(data, dict):
                continue
            client_ids = data.pop("client_ids", ())
            # The payload is the same for every recipient; pickle it once.
            payload = pickle.dumps(data)
            for client_id in client_ids:
                await self.redis.publish(f"client-{client_id}", payload)
    except (CancelledError, WebSocketDisconnected):
        # Disconnect and cancellation are the normal ways for this loop to end.
        pass
async def on_websocket_master(self, req, web_socket):
    """Serve the master WebSocket connection.

    Accepts the socket, subscribes to the ``master`` channel and runs
    ``process_websocket_master`` and ``process_pubsub`` concurrently until
    both finish.  On the way out the socket is closed and the subscription
    is released.

    Args:
        req: Request object; not used here.
        web_socket: The WebSocket connection to serve.
    """
    await web_socket.accept()
    pubsub = self.redis.pubsub()
    await pubsub.subscribe("master")
    # NOTE(review): the try / except / finally keywords and the gather() call
    # line were reconstructed - confirm against the original file.
    try:
        results = await asyncio.gather(
            self.process_websocket_master(web_socket),
            self.process_pubsub(pubsub, web_socket),
            return_exceptions=True,
        )
        # With return_exceptions=True failures are returned, not raised, so
        # they never reach the handlers below; report them here instead of
        # dropping them silently.
        for result in results:
            if isinstance(result, Exception) and not isinstance(
                result, WebSocketDisconnected
            ):
                print_exception(type(result), result, result.__traceback__)
    except (CancelledError, WebSocketDisconnected):
        pass
    except Exception:
        print_exception(*sys.exc_info())
    finally:
        try:
            await web_socket.close()
        finally:
            # Release the subscription even if closing the socket fails.
            # ``aclose`` is the current redis-py name; ``close`` is the older
            # awaitable equivalent.
            release = getattr(pubsub, "aclose", None) or pubsub.close
            await release()
def add_routes(self, app):
    """Register this object's WebSocket routes on ``app``.

    ``/ws`` is routed to this object with no suffix, and the path stored in
    ``self.master_ws_uri`` is routed to it with ``suffix="master"``.

    Args:
        app: Object providing ``add_route(uri, resource, **kwargs)``.
    """
    app.add_route("/ws", self)
    app.add_route(self.master_ws_uri, self, suffix="master")
def update_context_vars(self, context_vars):
    """Store the master WebSocket path in ``context_vars``.

    Args:
        context_vars: Mutable mapping; its ``"master_ws_uri"`` entry is set
            to ``self.master_ws_uri`` in place.
    """
    context_vars["master_ws_uri"] = self.master_ws_uri