import socket import sys from argparse import ArgumentParser from base64 import urlsafe_b64encode from hashlib import pbkdf2_hmac from pathlib import Path from secrets import token_urlsafe from subprocess import run from traceback import print_exception from urllib.parse import urlunsplit from falcon.asgi import App as FalconApp from falcon.constants import MEDIA_HTML from jinja2 import Environment, FileSystemLoader, select_autoescape from redis.asyncio import StrictRedis from uvicorn import Config, Server from .static import HubApp, TemplateTreeFileApp class App(TemplateTreeFileApp, FalconApp): @classmethod def scan_files(cls, base_dir): for path in base_dir.iterdir(): if not path.is_dir() and not cls.is_ignored_filename(path): yield path def scramble(self, value): if isinstance(value, str): value = value.encode() secret = self.secret if isinstance(secret, str): secret = secret.encode() return urlsafe_b64encode( pbkdf2_hmac("sha512", value, secret, 221100) ).rstrip(b"=").decode("ascii") def uri_tail(self, path: Path) -> str: return self.scramble(super().uri_tail(path)) def __init__(self, base_dir, secret, **kwargs): from .utils import get_redis_pass self.secret = secret or token_urlsafe(64) kwargs.setdefault("media_type", MEDIA_HTML) FalconApp.__init__(self, **kwargs) TemplateTreeFileApp.__init__( self, self, base_dir, "/derp", "root" ) self.env = Environment( loader=FileSystemLoader(self.base_dir), autoescape=select_autoescape(), extensions=["hub.utils.StaticTag"], ) self.hubapps = {} self.conn = StrictRedis( username="default", password=get_redis_pass("/etc/redis/redis.conf") ) for base_dir in self.base_dir.iterdir(): if base_dir.is_dir() and not self.is_ignored_filename(base_dir): self.hubapps[base_dir.name] = HubApp( self, base_dir, base_uri=f"/derp/{base_dir.name}" ) self.add_error_handler(Exception, self.print_exception) async def print_exception(self, req, resp, ex, params): print_exception(*sys.exc_info()) class HubServer(Server): def __init__(self, *args, browser, **kwargs): self.browser = browser super().__init__(*args, **kwargs) async def startup(self, sockets: list[socket.socket] | None = None) -> None: await super().startup(sockets) root_app = self.config.loaded_app.app print("Secret:", root_app.secret) for uri, file in root_app.files_per_uris.items(): if file.name == "index.html": break else: raise ValueError("Root page not found!") host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl) if port and port != (80, 443)[ssl]: host = f"{host}:{port}" url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", "")) print("URL:", url) if self.browser: run([self.browser, url]) async def main(): ap = ArgumentParser() ap.add_argument("--secret") ap.add_argument("--browser", default="xdg-open") args = ap.parse_args() app = App(Path(__file__).parents[1] / "webroot", args.secret) await app.conn.set("client_id", 0) config = Config(app, port=5000, log_level="info") config.setup_event_loop() hs = HubServer(config, browser=args.browser) await hs.serve()