]> git.mar77i.info Git - hublib/blobdiff - hub/hubapp.py
serve other hubapps too, consolidate and a lot more...
[hublib] / hub / hubapp.py
diff --git a/hub/hubapp.py b/hub/hubapp.py
new file mode 100644 (file)
index 0000000..bd2540b
--- /dev/null
@@ -0,0 +1,150 @@
+from pathlib import Path
+
+from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
+from falcon.status_codes import HTTP_OK
+
+from .websocket import WebSocketHub
+from .utils import scramble
+
+MEDIA_CSS = "text/css"
+
+
+class StaticFile:
+    media_types_per_suffix = {
+        ".html": MEDIA_HTML,
+        ".js": MEDIA_JS,
+        ".css": MEDIA_CSS,
+        ".txt": MEDIA_TEXT,
+    }
+
+    def __init__(self, path):
+        self.path = path
+        self.media_type = self.get_media_type(path)
+        self.mtime = None
+        self.content = None
+
+    @classmethod
+    def get_media_type(cls, path):
+        return cls.media_types_per_suffix[path.suffix]
+
+    def get(self):
+        mtime = self.path.stat().st_mtime
+        if mtime != self.mtime:
+            with open(self.path) as fh:
+                self.content = fh.read()
+            self.mtime = mtime
+        return self.content
+
+    def __repr__(self):
+        return f"<{type(self).__name__} {self.path}>"
+
+
+class StaticTemplateFile(StaticFile):
+    TEMPLATE_SUFFIX = ".j2"
+
+    def __init__(self, path, hubapp):
+        super().__init__(path)
+        self.hubapp = hubapp
+        self.context = {"hubapp": self.hubapp}
+        self.template = hubapp.app.env.get_template(
+            str(path.relative_to(hubapp.app.env.loader.searchpath[0]))
+        )
+
+    def get(self):
+        return self.template.render(self.context)
+
+    @classmethod
+    def get_media_type(cls, path):
+        assert path.suffix == cls.TEMPLATE_SUFFIX
+        return cls.media_types_per_suffix[path.suffixes[-2]]
+
+
+class BaseHubApp:
+    SCAN_FILES_RECURSIVELY = True
+
+    def __init__(self, app, base_dir, name=None):
+        self.app = app
+        self.base_dir = base_dir
+        self.name = name if name is not None else base_dir.name
+        self.app.hubapps[self.name] = self
+        self.files_per_uris = {
+            self.uri_from(file.path): file for file in self.scan_files(base_dir)
+        }
+        for uri in self.files_per_uris:
+            app.add_route(uri, self)
+
+    @staticmethod
+    def is_master_uri(uri_tail):
+        slash = "/"
+        start = uri_tail.rfind(slash)
+        pos = uri_tail.find(".", start if start >= 0 else 0)
+        return (
+            uri_tail[start + len(slash):pos] == "master"
+            or "master" in uri_tail[:start].split(slash)
+        )
+
+    def uri_from(self, path):
+        if isinstance(path, Path) and path.is_absolute():
+            uri_tail = str(path.relative_to(self.base_dir))
+        else:
+            uri_tail = str(path)
+        suffix = StaticTemplateFile.TEMPLATE_SUFFIX
+        if uri_tail.endswith(suffix):
+            uri_tail = uri_tail[:-len(suffix)]
+
+        if self.is_master_uri(uri_tail):
+            uri_tail = scramble(self.app.secret, uri_tail)
+        elif uri_tail == "index.html":
+            uri_tail = ""
+        name = self.name
+        if name and uri_tail:
+            name = f"{name}/"
+        return f"/{name}{uri_tail}"
+
+    def scan_files(self, base_dir):
+        stack = [base_dir.iterdir()]
+        while len(stack):
+            try:
+                path = next(stack[-1])
+            except StopIteration:
+                stack.pop()
+                continue
+            if path.is_dir():
+                if self.SCAN_FILES_RECURSIVELY:
+                    stack.append(path.iterdir())
+            elif not self.is_ignored_filename(path):
+                if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
+                    static_file = StaticTemplateFile(path, self)
+                else:
+                    static_file = StaticFile(path)
+                yield static_file
+
+    @staticmethod
+    def is_ignored_filename(path):
+        return path.name.startswith(".")
+
+    async def on_get(self, req, resp):
+        resource = self.files_per_uris[req.path]
+        resp.content_type = resource.media_type
+        resp.data = resource.get().encode()
+        resp.status = HTTP_OK
+
+
+class HubApp(BaseHubApp):
+    def __init__(self, app, base_dir):
+        super().__init__(app, base_dir)
+        self.wsh = WebSocketHub(self)
+        self.ws_uris = {self.uri_from(f"ws_{value}"): value for value in ("client", "master")}
+        for uri, suffix in self.ws_uris.items():
+            app.add_route(uri, self.wsh, suffix=suffix)
+
+
+class RootApp(BaseHubApp):
+    SCAN_FILES_RECURSIVELY = False
+
+    def __init__(self, app, base_dir):
+        super().__init__(app, base_dir, "")
+
+    @staticmethod
+    def is_master_uri(uri_tail):
+        return True