]> git.mar77i.info Git - hublib/blobdiff - hub/hub.py
serve other hubapps too, consolidate and a lot more...
[hublib] / hub / hub.py
diff --git a/hub/hub.py b/hub/hub.py
deleted file mode 100644 (file)
index da78e1f..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-import asyncio
-from asyncio.exceptions import CancelledError
-import json
-import pickle
-import sys
-from traceback import print_exception
-
-from falcon import WebSocketDisconnected
-from redis.asyncio import StrictRedis
-
-from .utils import get_redis_pass, scramble
-
-
-class Hub:
-    def __init__(self, secret):
-        self.master_ws_uri = f"/{scramble(secret, 'ws')}"
-        self.redis = StrictRedis(password=get_redis_pass("/etc/redis/redis.conf"))
-        asyncio.ensure_future(self.redis.set("client_id", 0))
-
-    async def process_websocket(self, client_id, web_socket):
-        try:
-            while True:
-                data = await web_socket.receive_text()
-                try:
-                    parsed_data = json.loads(data)
-                except json.JSONDecodeError:
-                    parsed_data = None
-                if not isinstance(parsed_data, dict):
-                    parsed_data = {"data": data}
-                parsed_data["client_id"] = client_id
-                await self.redis.publish("master", pickle.dumps(parsed_data))
-        except (CancelledError, WebSocketDisconnected):
-            pass
-
-    async def process_pubsub(self, pubsub, web_socket):
-        try:
-            while True:
-                data = await pubsub.get_message(True, .3)
-                if not web_socket.ready or web_socket.closed:
-                    break
-                if data is not None:
-                    await web_socket.send_text(json.dumps(pickle.loads(data["data"])))
-        except (CancelledError, WebSocketDisconnected):
-            pass
-
-    async def on_websocket(self, req, web_socket):
-        client_id = await self.redis.incr("client_id")
-        await web_socket.accept()
-        pubsub = self.redis.pubsub()
-        await pubsub.subscribe(f"client-{client_id}")
-        await self.redis.publish(
-            "master", pickle.dumps({"action": "join", "client_id": client_id}),
-        )
-        try:
-            await asyncio.gather(
-                self.process_websocket(client_id, web_socket),
-                self.process_pubsub(pubsub, web_socket),
-                return_exceptions=True,
-            )
-        except (CancelledError, WebSocketDisconnected):
-            pass
-        except Exception:
-            print_exception(*sys.exc_info())
-        finally:
-            await web_socket.close()
-            await self.redis.publish(
-                "master",
-                pickle.dumps({"action": "leave", "client_id": client_id}),
-            )
-
-    async def process_websocket_master(self, web_socket):
-        try:
-            while True:
-                data = json.loads(await web_socket.receive_text())
-                for client_id in data.pop("client_ids", ()):
-                    await self.redis.publish(
-                        f"client-{client_id}",
-                        pickle.dumps(data),
-                    )
-        except (CancelledError, WebSocketDisconnected) as e:
-            pass
-
-    async def on_websocket_master(self, req, web_socket):
-        await web_socket.accept()
-        pubsub = self.redis.pubsub()
-        await pubsub.subscribe("master")
-        try:
-            await asyncio.gather(
-                self.process_websocket_master(web_socket),
-                self.process_pubsub(pubsub, web_socket),
-                return_exceptions=True,
-            )
-        except (CancelledError, WebSocketDisconnected):
-            pass
-        except Exception:
-            print_exception(*sys.exc_info())
-        finally:
-            await web_socket.close()
-
-    def add_routes(self, app):
-        app.add_route("/ws", self)
-        app.add_route(self.master_ws_uri, self, suffix="master")
-
-    def update_context_vars(self, context_vars):
-        context_vars["master_ws_uri"] = self.master_ws_uri