5 from asyncio
.exceptions
import CancelledError
6 from functools
import partial
7 from traceback
import print_exception
9 from falcon
import WebSocketDisconnected
# Base class for WebSocket endpoints; it caches the shared connection
# (`self.conn`) that the handlers in this class publish to and subscribe on.
# NOTE(review): the embedded numbering skips original lines (e.g. 14), so parts
# of this class are not visible in this listing -- confirm against the full file.
12 class BaseWebSocketApp
:
# Constructor: takes the owning hub application and stores `self.conn`, read
# from the "root" entry of `self.hubapp.app.hubapps`.
# NOTE(review): `self.hubapp` is read below but its assignment is not visible
# here -- presumably `self.hubapp = hubapp` on the missing line 14; confirm.
13 def __init__(self
, hubapp
):
15 self
.conn
= self
.hubapp
.app
.hubapps
["root"].conn
# Forward an incoming WebSocket text frame to pub/sub channels.
#
# Visible steps: receive a text frame from `web_socket`, decode it as JSON,
# merge `extra_data` into the decoded value with `.update()` (so the frame must
# decode to a mapping), then publish the pickled result via `conn.publish` to
# every recipient.
#
# Parameters:
#   conn        -- object exposing an awaitable `publish(channel, payload)`.
#   web_socket  -- object exposing an awaitable `receive_text()`.
#   extra_data  -- mapping merged into every decoded message.
#   recipients  -- either an iterable of channel names, or a callable that is
#                  given the decoded message and returns such an iterable.
#
# NOTE(review): `extra_data={}` and `recipients=[]` are mutable default
# arguments. The visible code only reads them, but `None` sentinels would be
# the safer idiom.
# NOTE(review): there is no `self` parameter, yet this is called as
# `self.process_websocket(self.conn, ...)` -- presumably decorated with
# `@staticmethod` on a line not shown here; confirm.
# NOTE(review): the enclosing `try:` / loop lines (22-23) and the `else:` at
# line 28 are missing from this listing.
21 async def process_websocket(conn
, web_socket
, extra_data
={}, recipients
=[]):
24 data
= json
.loads(await web_socket
.receive_text())
25 data
.update(extra_data
)
# A callable `recipients` is resolved per message, from the message itself.
26 if callable(recipients
):
27 current_recipients
= recipients(data
)
29 current_recipients
= recipients
# The same pickled payload is published once per recipient channel.
30 for recipient
in current_recipients
:
31 await conn
.publish(recipient
, pickle
.dumps(data
))
# Cancellation and client disconnects are caught here; the handler body is not
# visible in this listing.
32 except (CancelledError
, WebSocketDisconnected
):
# Forward messages from a pub/sub subscription to the WebSocket.
#
# Visible steps: fetch a message with `pubsub.get_message(True, 0.3)`, check
# whether the socket is not ready or already closed, then unpickle
# `data["data"]`, re-encode it as JSON and send it with `send_text`.
#
# Parameters:
#   pubsub      -- subscription object exposing an awaitable `get_message`.
#   web_socket  -- object exposing `ready`, `closed` and an awaitable
#                  `send_text()`.
#
# NOTE(review): the positional arguments `(True, 0.3)` presumably map to
# redis-py's `get_message(ignore_subscribe_messages, timeout)` -- confirm
# against the client library actually used.
# NOTE(review): `pickle.loads` runs on whatever arrives on the channel; that is
# only safe if every publisher on the channel is trusted.
# NOTE(review): the loop/`try:` lines (37-38) and the bodies at lines 41-42 and
# 45 are missing from this listing, including whatever guards against
# `get_message` returning no message.
36 async def process_pubsub(pubsub
, web_socket
):
39 data
= await pubsub
.get_message(True, 0.3)
# Socket-state check; the action taken when it is true is not visible here.
40 if not web_socket
.ready
or web_socket
.closed
:
43 await web_socket
.send_text(json
.dumps(pickle
.loads(data
["data"])))
# Cancellation and client disconnects are caught here; the handler body is not
# visible in this listing.
44 except (CancelledError
, WebSocketDisconnected
):
# Shared WebSocket handler: accepts the connection, opens a pub/sub
# subscription on `self.conn`, runs the two relay helpers
# (`process_websocket` and `process_pubsub`), and closes the socket afterwards.
#
# Visible parameter: `process_websockets_kwargs` (default None) -- keyword
# arguments forwarded to `process_websocket`; None is treated as `{}`.
# NOTE(review): the rest of the signature (original lines 48-51, 53-55) is not
# visible. The body uses `web_socket`, `pubsub_name` and `leave_cb`, and the
# callers in this file pass req, web_socket, a channel name, a kwargs dict and
# optionally two callbacks -- confirm names and defaults against the full file.
47 async def on_websocket(
52 process_websockets_kwargs
=None,
56 await web_socket
.accept()
57 pubsub
= self
.conn
.pubsub()
# NOTE(review): line 58 is missing; the subscribe may be conditional.
59 await pubsub
.subscribe(pubsub_name
)
# The two helper calls below are followed by `return_exceptions=True`, so they
# are presumably arguments to an `asyncio.gather(...)` on the missing line 63
# -- confirm.
64 self
.process_websocket(
65 self
.conn
, web_socket
, **(process_websockets_kwargs
or {})
67 self
.process_pubsub(pubsub
, web_socket
),
68 return_exceptions
=True,
# Cancellation and client disconnects are caught here; the handler body (line
# 71) is not visible.
70 except (CancelledError
, WebSocketDisconnected
):
# Prints the traceback of the exception currently being handled; the `except`
# clause that owns this line (72) is not visible.
73 print_exception(*sys
.exc_info())
# NOTE(review): presumably inside a `finally:` (line 74) so the socket is
# closed on every exit path -- confirm.
75 await web_socket
.close()
# `leave_cb` is only used when it is callable; its invocation (line 77) is not
# visible.
76 if callable(leave_cb
):
# WebSocket application with separate client and master endpoints. Channel
# names are derived from `self.hubapp.name`: "<name>-master" and
# "<name>-client-<client_id>".
80 class WebSocketApp(BaseWebSocketApp
):
# Build a join/leave notification for the master channel.
#
# Visible arguments: the channel name "<hubapp.name>-master" and a pickled dict
# `{"action": action, "client_id": client_id}`.
#
# Parameters:
#   redis      -- connection object; callers in this class pass `self.conn`.
#   action     -- callers in this class pass "join" or "leave".
#   client_id  -- identifier of the client the notification is about.
#
# NOTE(review): the call that receives these two arguments (line 82) is missing
# from this listing -- presumably `await redis.publish(...)`; confirm.
81 async def join_leave_client_notify(self
, redis
, action
, client_id
):
83 f
"{self.hubapp.name}-master",
84 pickle
.dumps({"action": action, "client_id": client_id}
),
# WebSocket endpoint for a client connection.
#
# Allocates a client id by incrementing the "client_id" counter on `self.conn`,
# then delegates to `on_websocket` with:
#   * the per-client channel "<hubapp.name>-client-<client_id>";
#   * `extra_data` = {"client_id": client_id} and `recipients` = the single
#     master channel "<hubapp.name>-master";
#   * two `partial`s of `join_leave_client_notify`, one bound to "join" and one
#     to "leave", both with `self.conn` and the new client id.
# Returns whatever `on_websocket` returns.
#
# NOTE(review): the leading positional arguments (lines 90-91) and the braces
# around the kwargs dict (lines 93, 96) are missing from this listing;
# presumably `req, web_socket` and `{ ... }` -- confirm.
87 async def on_websocket_client(self
, req
, web_socket
):
88 client_id
= await self
.conn
.incr("client_id")
89 return await self
.on_websocket(
92 f
"{self.hubapp.name}-client-{client_id}",
94 "extra_data": {"client_id": client_id}
,
95 "recipients": [f
"{self.hubapp.name}-master"],
97 partial(self
.join_leave_client_notify
, self
.conn
, "join", client_id
),
98 partial(self
.join_leave_client_notify
, self
.conn
, "leave", client_id
),
# WebSocket endpoint for the master connection.
#
# Delegates to `on_websocket` with the master channel "<hubapp.name>-master"
# and a kwargs dict whose `recipients` is the bound method
# `self.get_master_recipients`, a callable rather than a fixed list.
# No join/leave callbacks are visible in this call.
# Returns whatever `on_websocket` returns.
#
# NOTE(review): the leading positional arguments (lines 103-104) are missing
# from this listing; presumably `req, web_socket` -- confirm.
101 async def on_websocket_master(self
, req
, web_socket
):
102 return await self
.on_websocket(
105 f
"{self.hubapp.name}-master",
106 {"recipients": self.get_master_recipients}
,
# Derive recipient channel names from a decoded message.
#
# For every entry in `data["client_ids"]`, produces the channel name
# "<hubapp.name>-client-<int(client_id)>".
#
# Side effect: the key is removed from `data` with `pop`, so the caller's dict
# no longer contains "client_ids" afterwards. A missing key yields no
# recipients (the default is an empty tuple).
#
# `int(client_id)` raises ValueError/TypeError for entries that cannot be
# converted to an integer; nothing visible here catches that.
#
# NOTE(review): the opening of the expression (line 110) and its closing
# bracket are missing from this listing, so whether a list, tuple or generator
# is returned cannot be determined here -- confirm.
109 def get_master_recipients(self
, data
):
111 f
"{self.hubapp.name}-client-{int(client_id)}"
112 for client_id
in data
.pop("client_ids", ())