X-Git-Url: https://git.mar77i.info/?a=blobdiff_plain;f=hub%2Fapp.py;h=3d701540fde74bc1bb262743d1876e0749382646;hb=f591748910835b1a11c3765723d9d30193a5bd26;hp=959024f2c2a41bd2bd3e223e6b58bc12a0c444db;hpb=6128e895bc2a5da5fe645cc9a7ad74ac75af4f6b;p=hublib diff --git a/hub/app.py b/hub/app.py index 959024f..3d70154 100644 --- a/hub/app.py +++ b/hub/app.py @@ -1,46 +1,70 @@ import socket import sys from argparse import ArgumentParser -from itertools import chain +from base64 import urlsafe_b64encode +from hashlib import pbkdf2_hmac from pathlib import Path from secrets import token_urlsafe from subprocess import run from traceback import print_exception +from typing import Generator from urllib.parse import urlunsplit from falcon.asgi import App as FalconApp from falcon.constants import MEDIA_HTML from jinja2 import Environment, FileSystemLoader, select_autoescape +from redis.asyncio import StrictRedis from uvicorn import Config, Server -from .hubapp import HubApp, RootApp +from .static import HubApp, TemplateTreeFileApp -class App(FalconApp): - def __init__(self, secret, **kwargs): +class App(TemplateTreeFileApp, FalconApp): + @classmethod + def scan_files(cls, base_dir: Path) -> Generator[Path]: + for path in base_dir.iterdir(): + if not path.is_dir() and not cls.is_ignored_filename(path): + yield path + + def scramble(self, value: str | bytes) -> str: + if isinstance(value, str): + value = value.encode() + secret = self.secret + if isinstance(secret, str): + secret = secret.encode() + return urlsafe_b64encode( + pbkdf2_hmac("sha512", value, secret, 221100) + ).rstrip(b"=").decode("ascii") + + def uri_tail(self, path: Path) -> str: + return self.scramble(super().uri_tail(path)) + + def __init__(self, base_dir: Path, secret: str, **kwargs): + from .utils import get_redis_pass + + self.secret = secret or token_urlsafe(64) kwargs.setdefault("media_type", MEDIA_HTML) - super().__init__(**kwargs) - self.base_dir = Path(__file__).parents[1] / "webroot" + FalconApp.__init__(self, **kwargs) + TemplateTreeFileApp.__init__( + self, self, base_dir, "/derp", "root" + ) self.env = Environment( loader=FileSystemLoader(self.base_dir), - autoescape=select_autoescape, + autoescape=select_autoescape(), extensions=["hub.utils.StaticTag"], ) - self.secret = secret or token_urlsafe(64) self.hubapps = {} - RootApp(self, self.base_dir) + self.conn = StrictRedis( + username="default", password=get_redis_pass("/etc/redis/redis.conf") + ) for base_dir in self.base_dir.iterdir(): - if not base_dir.is_dir() or HubApp.is_ignored_filename(base_dir): - continue - HubApp(self, base_dir) + if base_dir.is_dir() and not self.is_ignored_filename(base_dir): + self.hubapps[base_dir.name] = HubApp( + self, base_dir, base_uri=f"/derp/{base_dir.name}" + ) self.add_error_handler(Exception, self.print_exception) - def get_hubapp_by_name(self, name): - if name == "root": - name = "" - return self.hubapps[name] - - async def print_exception(self, req, resp, ex, params): + async def print_exception(self, req, resp, ex, params) -> None: print_exception(*sys.exc_info()) @@ -51,46 +75,30 @@ class HubServer(Server): async def startup(self, sockets: list[socket.socket] | None = None) -> None: await super().startup(sockets) - config = self.config - protocol_name = "https" if config.ssl else "http" - host = "0.0.0.0" if config.host is None else config.host - if ":" in host: - # It's an IPv6 address. - host = f"[{host.rstrip(']').lstrip('[')}]" - - port = config.port - if port == 0: - try: - port = next( - chain.from_iterable( - (server.sockets for server in getattr(self, "servers", ())) - ) - ).getsockname()[1] - except StopIteration: - pass - if {"http": 80, "https": 443}[protocol_name] != port: + root_app = self.config.loaded_app.app + print("Secret:", root_app.secret) + for uri, file in root_app.files_per_uris.items(): + if file.name == "index.html": + break + else: + raise ValueError("Root page not found!") + host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl) + if port and port != (80, 443)[ssl]: host = f"{host}:{port}" - app = config.loaded_app.app - print("Secret:", app.secret) - for key, value in app.root_app.files_per_uris.items(): - if Path(value.path.name).stem == "index.html": - url = urlunsplit((protocol_name, host, key, "", "")) - print("URL:", url) - if self.browser: - run([self.browser, url]) - - -app: App + url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", "")) + print("URL:", url) + if self.browser: + run([self.browser, url]) async def main(): - global app ap = ArgumentParser() ap.add_argument("--secret") - ap.add_argument("--browser", default="firefox") + ap.add_argument("--browser", default="xdg-open") args = ap.parse_args() - app = App(args.secret) - config = Config("hub.app:app", port=5000, log_level="info") + app = App(Path(__file__).parents[1] / "webroot", args.secret) + await app.conn.set("client_id", 0) + config = Config(app, port=5000, log_level="info") config.setup_event_loop() hs = HubServer(config, browser=args.browser) await hs.serve()