5 from asyncio
.exceptions
import CancelledError
6 from functools
import partial
7 from traceback
import print_exception
9 from falcon
import WebSocketDisconnected
11 from .static
import TreeFileApp
def __init__(self, hubapp: TreeFileApp):
    """Bind this object to a hub application.

    Copies two values off ``hubapp``: its ``name`` and the ``conn``
    attribute found on ``hubapp.root``. Both are stored on the instance
    under the same attribute names.
    """
    self.name = hubapp.name
    # The connection lives on the hub application's root object.
    root = hubapp.root
    self.conn = root.conn
20 async def process_websocket(conn
, web_socket
, extra_data
=None, recipients
=None):
23 data
= json
.loads(await web_socket
.receive_text())
25 data
.update(extra_data
)
26 if callable(recipients
):
27 current_recipients
= recipients(data
)
29 current_recipients
= recipients
31 raise ValueError("no recipients specified")
32 for recipient
in current_recipients
:
33 await conn
.publish(recipient
, pickle
.dumps(data
))
34 except (CancelledError
, WebSocketDisconnected
):
38 async def process_pubsub(pubsub
, web_socket
):
41 data
= await pubsub
.get_message(True, 0.3)
42 if not web_socket
.ready
or web_socket
.closed
:
45 await web_socket
.send_text(json
.dumps(pickle
.loads(data
["data"])))
46 except (CancelledError
, WebSocketDisconnected
):
49 async def on_websocket(
54 process_websockets_kwargs
=None,
58 await web_socket
.accept()
59 pubsub
= self
.conn
.pubsub()
61 await pubsub
.subscribe(pubsub_name
)
66 self
.process_websocket(
67 self
.conn
, web_socket
, **(process_websockets_kwargs
or {})
69 self
.process_pubsub(pubsub
, web_socket
),
70 return_exceptions
=True,
72 except (CancelledError
, WebSocketDisconnected
):
75 print_exception(*sys
.exc_info())
77 await web_socket
.close()
78 if callable(leave_cb
):
81 async def client_notify(self
, redis
, action
, client_id
):
83 f
"{self.name}-master",
84 pickle
.dumps({"action": action, "client_id": client_id}
),
87 async def on_websocket_client(self
, req
, web_socket
):
88 client_id
= await self
.conn
.incr("client_id")
89 return await self
.on_websocket(
92 f
"{self.name}-client-{client_id}",
94 "extra_data": {"client_id": client_id}
,
95 "recipients": [f
"{self.name}-master"],
97 partial(self
.client_notify
, self
.conn
, "join", client_id
),
98 partial(self
.client_notify
, self
.conn
, "leave", client_id
),
101 async def on_websocket_master(self
, req
, web_socket
):
102 return await self
.on_websocket(
105 f
"{self.name}-master",
106 {"recipients": self.get_master_recipients}
,
109 def get_master_recipients(self
, data
):
111 f
"{self.name}-client-{int(client_id)}"
112 for client_id
in data
.pop("client_ids", ())