5 from asyncio
.exceptions
import CancelledError
6 from functools
import partial
7 from traceback
import print_exception
9 from falcon
import WebSocketDisconnected
10 from redis
.asyncio
import StrictRedis
12 from .utils
import get_redis_pass
class BaseWebSocketHub:
    """Relay JSON messages between Falcon WebSockets and Redis pub/sub channels.

    Every connection served by :meth:`on_websocket` runs two loops side by
    side: :meth:`process_websocket` publishes what the socket sends to Redis
    channels, and :meth:`process_pubsub` forwards what arrives on the
    connection's own channel back to the socket.
    """

    # Created with zero permits, so ``acquire()`` blocks until
    # ``initialize_client_ids`` has stored the initial counter in Redis and
    # released the semaphore.
    client_ids_sem = asyncio.Semaphore(0)

    # Strong reference to the one-off counter initialisation task. The event
    # loop only keeps weak references to tasks, so a task nobody references
    # may be garbage-collected before it finishes.
    _client_ids_init_task = None

    @classmethod
    def __class_init(cls, redis):
        """Schedule the shared client-id counter initialisation, once.

        Later calls are no-ops: re-running the initialisation would reset the
        ``client_id`` counter to 0 and release ``client_ids_sem`` one extra
        time for every hub instance created.

        Raises:
            RuntimeError: from ``asyncio.create_task`` when no event loop is
                running in the current thread.
        """
        if BaseWebSocketHub._client_ids_init_task is not None:
            return
        BaseWebSocketHub._client_ids_init_task = asyncio.create_task(
            cls.initialize_client_ids(redis)
        )

    @classmethod
    async def initialize_client_ids(cls, redis):
        """Store ``client_id = 0`` in Redis, then unblock ``client_ids_sem``."""
        # NOTE(review): the counter is overwritten unconditionally; confirm that
        # no other process shares this Redis key while clients are connected.
        await redis.set("client_id", 0)
        cls.client_ids_sem.release()

    def __init__(self):
        """Create the Redis client and trigger the class-level initialisation.

        Must be called while an event loop is running, because the
        initialisation is scheduled with ``asyncio.create_task``.
        """
        self.redis = StrictRedis(password=get_redis_pass("/etc/redis/redis.conf"))
        self.__class_init(self.redis)

    @staticmethod
    async def process_websocket(redis, web_socket, extra_data=None, recipients=()):
        """Publish every JSON message received on ``web_socket`` to Redis.

        Runs until the socket disconnects or the task is cancelled; both are
        treated as a normal shutdown.

        Args:
            redis: client whose ``publish(channel, payload)`` is awaited.
            web_socket: socket whose ``receive_text()`` yields JSON text.
                Each message is assumed to decode to a JSON object, since it
                is updated like a dict.
            extra_data: mapping merged into every decoded message before it is
                published; ``None`` means nothing is merged.
            recipients: iterable of channel names, or a callable that takes
                the decoded message and returns such an iterable.
        """
        merged_fields = extra_data or {}
        try:
            while True:
                data = json.loads(await web_socket.receive_text())
                data.update(merged_fields)
                if callable(recipients):
                    current_recipients = recipients(data)
                else:
                    current_recipients = recipients
                # Serialise once, after ``recipients`` had its chance to
                # modify ``data``; the payload is the same for every channel.
                payload = pickle.dumps(data)
                for recipient in current_recipients:
                    await redis.publish(recipient, payload)
        except (CancelledError, WebSocketDisconnected):
            pass  # connection is gone or the handler is shutting down

    @staticmethod
    async def process_pubsub(pubsub, web_socket):
        """Forward messages from ``pubsub`` to ``web_socket`` as JSON text.

        Polls ``pubsub`` with a 0.3 second timeout and stops as soon as the
        socket is no longer ready or has been closed. Disconnects and
        cancellation are treated as a normal shutdown.
        """
        try:
            while True:
                data = await pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=0.3
                )
                if not web_socket.ready or web_socket.closed:
                    break
                if data:  # the poll yields nothing when it times out
                    # SECURITY: unpickling a crafted payload can execute
                    # arbitrary code, so this is only safe while every
                    # publisher on the Redis channel is trusted.
                    message = pickle.loads(data["data"])
                    await web_socket.send_text(json.dumps(message))
        except (CancelledError, WebSocketDisconnected):
            pass  # connection is gone or the handler is shutting down

    async def on_websocket(
        self,
        req,
        web_socket,
        pubsub_name,
        process_websockets_kwargs=None,
        join_cb=None,
        leave_cb=None,
    ):
        """Serve one WebSocket connection bound to the ``pubsub_name`` channel.

        Args:
            req: the incoming request (not used here).
            web_socket: the socket to accept, serve and finally close.
            pubsub_name: Redis channel whose messages go to this socket.
            process_websockets_kwargs: optional keyword arguments passed on to
                :meth:`process_websocket` (``extra_data``, ``recipients``).
            join_cb: optional zero-argument async callable awaited once the
                channel subscription is in place.
            leave_cb: optional zero-argument async callable awaited after the
                socket has been closed.
        """
        await web_socket.accept()
        try:
            # The context manager gives the subscription's connection back on
            # exit instead of leaking one per WebSocket.
            async with self.redis.pubsub() as pubsub:
                await pubsub.subscribe(pubsub_name)
                if callable(join_cb):
                    await join_cb()
                outcomes = await asyncio.gather(
                    self.process_websocket(
                        self.redis, web_socket, **(process_websockets_kwargs or {})
                    ),
                    self.process_pubsub(pubsub, web_socket),
                    return_exceptions=True,
                )
                # ``return_exceptions=True`` turns failures into results;
                # report them instead of dropping them silently.
                for outcome in outcomes:
                    if isinstance(outcome, Exception):
                        print_exception(
                            type(outcome), outcome, outcome.__traceback__
                        )
        except (CancelledError, WebSocketDisconnected):
            pass
        except Exception:
            print_exception(*sys.exc_info())
        finally:
            await web_socket.close()
            if callable(leave_cb):
                await leave_cb()
class WebSocketHub(BaseWebSocketHub):
    """WebSocket endpoints of one hub application: many clients, one master.

    Channel names are derived from ``hubapp.name``: the master listens on
    ``<name>-master`` and each client on ``<name>-client-<client_id>``.
    """

    def __init__(self, hubapp):
        """Run the base initialisation and remember the hub application."""
        super().__init__()
        self.hubapp = hubapp

    async def join_leave_client_notify(self, redis, action, client_id):
        """Publish a pickled ``action``/``client_id`` event to the master channel."""
        event = {"action": action, "client_id": client_id}
        master_channel = f"{self.hubapp.name}-master"
        await redis.publish(master_channel, pickle.dumps(event))

    async def on_websocket_client(self, req, web_socket):
        """Serve a client socket under a freshly allocated client id.

        The client's messages are tagged with its id and sent to the master
        channel; the master is notified when the client joins and leaves.
        """
        # Waits for a permit of the shared semaphore and hands it back as soon
        # as the id has been drawn from the Redis counter.
        async with self.client_ids_sem:
            client_id = await self.redis.incr("client_id")

        own_channel = f"{self.hubapp.name}-client-{client_id}"
        publish_options = {
            "extra_data": {"client_id": client_id},
            "recipients": [f"{self.hubapp.name}-master"],
        }
        announce_join = partial(
            self.join_leave_client_notify, self.redis, "join", client_id
        )
        announce_leave = partial(
            self.join_leave_client_notify, self.redis, "leave", client_id
        )
        return await self.on_websocket(
            req,
            web_socket,
            own_channel,
            publish_options,
            announce_join,
            announce_leave,
        )

    async def on_websocket_master(self, req, web_socket):
        """Serve the master socket; each message names its own recipients."""
        master_channel = f"{self.hubapp.name}-master"
        publish_options = {"recipients": self.get_master_recipients}
        return await self.on_websocket(
            req, web_socket, master_channel, publish_options
        )

    def get_master_recipients(self, data):
        """Return the client channels addressed by a master message.

        Removes the ``"client_ids"`` entry from ``data`` (so it is not part of
        what gets published) and maps every id to its client channel name.
        A message without ``"client_ids"`` addresses nobody.

        Raises:
            ValueError, TypeError: if an id cannot be converted with ``int``.
        """
        recipient_channels = []
        for client_id in data.pop("client_ids", ()):
            recipient_channels.append(
                f"{self.hubapp.name}-client-{int(client_id)}"
            )
        return recipient_channels