]> git.mar77i.info Git - hublib/blobdiff - hub/app.py
add a bunch of typehints
[hublib] / hub / app.py
index 2a0a712f515c6984a53f56ff562094c14433508d..3d701540fde74bc1bb262743d1876e0749382646 100644 (file)
@@ -1,44 +1,70 @@
 import socket
 import sys
 from argparse import ArgumentParser
-from itertools import chain
+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
 from pathlib import Path
 from secrets import token_urlsafe
 from subprocess import run
 from traceback import print_exception
+from typing import Generator
 from urllib.parse import urlunsplit
 
 from falcon.asgi import App as FalconApp
 from falcon.constants import MEDIA_HTML
 from jinja2 import Environment, FileSystemLoader, select_autoescape
+from redis.asyncio import StrictRedis
 from uvicorn import Config, Server
 
-from .hubapp import HubApp, RootApp
+from .static import HubApp, TemplateTreeFileApp
 
 
-class App(FalconApp):
-    def __init__(self, secret, **kwargs):
+class App(TemplateTreeFileApp, FalconApp):
+    @classmethod
+    def scan_files(cls, base_dir: Path) -> Generator[Path]:
+        for path in base_dir.iterdir():
+            if not path.is_dir() and not cls.is_ignored_filename(path):
+                yield path
+
+    def scramble(self, value: str | bytes) -> str:
+        if isinstance(value, str):
+            value = value.encode()
+        secret = self.secret
+        if isinstance(secret, str):
+            secret = secret.encode()
+        return urlsafe_b64encode(
+            pbkdf2_hmac("sha512", value, secret, 221100)
+        ).rstrip(b"=").decode("ascii")
+
+    def uri_tail(self, path: Path) -> str:
+        return self.scramble(super().uri_tail(path))
+
+    def __init__(self, base_dir: Path, secret: str, **kwargs):
+        from .utils import get_redis_pass
+
+        self.secret = secret or token_urlsafe(64)
         kwargs.setdefault("media_type", MEDIA_HTML)
-        super().__init__(**kwargs)
-        self.base_dir = Path(__file__).parents[1] / "webroot"
+        FalconApp.__init__(self, **kwargs)
+        TemplateTreeFileApp.__init__(
+            self, self, base_dir, "/derp", "root"
+        )
         self.env = Environment(
             loader=FileSystemLoader(self.base_dir),
-            autoescape=select_autoescape,
+            autoescape=select_autoescape(),
             extensions=["hub.utils.StaticTag"],
         )
-        self.secret = secret or token_urlsafe(64)
         self.hubapps = {}
-        RootApp(self, self.base_dir)
+        self.conn = StrictRedis(
+            username="default", password=get_redis_pass("/etc/redis/redis.conf")
+        )
         for base_dir in self.base_dir.iterdir():
-            if not base_dir.is_dir() or HubApp.is_ignored_filename(base_dir):
-                continue
-            HubApp(self, base_dir)
+            if base_dir.is_dir() and not self.is_ignored_filename(base_dir):
+                self.hubapps[base_dir.name] = HubApp(
+                    self, base_dir, base_uri=f"/derp/{base_dir.name}"
+                )
         self.add_error_handler(Exception, self.print_exception)
 
-    def get_hubapp_by_name(self, name):
-        return self.hubapps[name]
-
-    async def print_exception(self, req, resp, ex, params):
+    async def print_exception(self, req, resp, ex, params) -> None:
         print_exception(*sys.exc_info())
 
 
@@ -49,46 +75,30 @@ class HubServer(Server):
 
     async def startup(self, sockets: list[socket.socket] | None = None) -> None:
         await super().startup(sockets)
-        config = self.config
-        protocol_name = "https" if config.ssl else "http"
-        host = "0.0.0.0" if config.host is None else config.host
-        if ":" in host:
-            # It's an IPv6 address.
-            host = f"[{host.rstrip(']').lstrip('[')}]"
-
-        port = config.port
-        if port == 0:
-            try:
-                port = next(
-                    chain.from_iterable(
-                        (server.sockets for server in getattr(self, "servers", ()))
-                    )
-                ).getsockname()[1]
-            except StopIteration:
-                pass
-        if {"http": 80, "https": 443}[protocol_name] != port:
+        root_app = self.config.loaded_app.app
+        print("Secret:", root_app.secret)
+        for uri, file in root_app.files_per_uris.items():
+            if file.name == "index.html":
+                break
+        else:
+            raise ValueError("Root page not found!")
+        host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl)
+        if port and port != (80, 443)[ssl]:
             host = f"{host}:{port}"
-        app = config.loaded_app.app
-        print("Secret:", app.secret)
-        for key, value in app.hubapps["root"].files_per_uris.items():
-            if Path(value.path.name).stem == "index.html":
-                url = urlunsplit((protocol_name, host, key, "", ""))
-                print("URL:", url)
-                if self.browser:
-                    run([self.browser, url])
-
-
-app: App
+        url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", ""))
+        print("URL:", url)
+        if self.browser:
+            run([self.browser, url])
 
 
 async def main():
-    global app
     ap = ArgumentParser()
     ap.add_argument("--secret")
-    ap.add_argument("--browser", default="firefox")
+    ap.add_argument("--browser", default="xdg-open")
     args = ap.parse_args()
-    app = App(args.secret)
-    config = Config("hub.app:app", port=5000, log_level="info")
+    app = App(Path(__file__).parents[1] / "webroot", args.secret)
+    await app.conn.set("client_id", 0)
+    config = Config(app, port=5000, log_level="info")
     config.setup_event_loop()
     hs = HubServer(config, browser=args.browser)
     await hs.serve()