5 from asyncio
.exceptions
import CancelledError
6 from functools
import partial
7 from traceback
import print_exception
9 from falcon
import WebSocketDisconnected
13 def __init__(self
, hubapp
):
14 self
.name
= hubapp
.name
15 self
.conn
= hubapp
.root
.conn
18 async def process_websocket(conn
, web_socket
, extra_data
=None, recipients
=None):
21 data
= json
.loads(await web_socket
.receive_text())
23 data
.update(extra_data
)
24 if callable(recipients
):
25 current_recipients
= recipients(data
)
27 current_recipients
= recipients
29 raise ValueError("no recipients specified")
30 for recipient
in current_recipients
:
31 await conn
.publish(recipient
, pickle
.dumps(data
))
32 except (CancelledError
, WebSocketDisconnected
):
36 async def process_pubsub(pubsub
, web_socket
):
39 data
= await pubsub
.get_message(True, 0.3)
40 if not web_socket
.ready
or web_socket
.closed
:
43 await web_socket
.send_text(json
.dumps(pickle
.loads(data
["data"])))
44 except (CancelledError
, WebSocketDisconnected
):
47 async def on_websocket(
52 process_websockets_kwargs
=None,
56 await web_socket
.accept()
57 pubsub
= self
.conn
.pubsub()
59 await pubsub
.subscribe(pubsub_name
)
64 self
.process_websocket(
65 self
.conn
, web_socket
, **(process_websockets_kwargs
or {})
67 self
.process_pubsub(pubsub
, web_socket
),
68 return_exceptions
=True,
70 except (CancelledError
, WebSocketDisconnected
):
73 print_exception(*sys
.exc_info())
75 await web_socket
.close()
76 if callable(leave_cb
):
79 async def client_notify(self
, redis
, action
, client_id
):
81 f
"{self.name}-master",
82 pickle
.dumps({"action": action, "client_id": client_id}
),
85 async def on_websocket_client(self
, req
, web_socket
):
86 client_id
= await self
.conn
.incr("client_id")
87 return await self
.on_websocket(
90 f
"{self.name}-client-{client_id}",
92 "extra_data": {"client_id": client_id}
,
93 "recipients": [f
"{self.name}-master"],
95 partial(self
.client_notify
, self
.conn
, "join", client_id
),
96 partial(self
.client_notify
, self
.conn
, "leave", client_id
),
99 async def on_websocket_master(self
, req
, web_socket
):
100 return await self
.on_websocket(
103 f
"{self.name}-master",
104 {"recipients": self.get_master_recipients}
,
107 def get_master_recipients(self
, data
):
109 f
"{self.name}-client-{int(client_id)}"
110 for client_id
in data
.pop("client_ids", ())