+import socket
+import sys
from argparse import ArgumentParser
+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
+from itertools import chain
+from pathlib import Path
+from secrets import token_urlsafe
+from subprocess import run
+from traceback import print_exception
+from urllib.parse import urlunsplit
-from falcon.asgi import App
+from falcon.asgi import App as FalconApp
from falcon.constants import MEDIA_HTML
+from jinja2 import Environment, FileSystemLoader, select_autoescape
+from uvicorn import Config, Server
-from .cryptresource import CryptResource
+from .hubapp import HubApp, RootApp
-def get_app():
- app = App(media_type=MEDIA_HTML)
+class App(FalconApp):
+ def __init__(self, secret, **kwargs):
+ kwargs.setdefault("media_type", MEDIA_HTML)
+ super().__init__(**kwargs)
+ self.base_dir = Path(__file__).parents[1] / "webroot"
+ self.env = Environment(
+ loader=FileSystemLoader(self.base_dir),
+ autoescape=select_autoescape(),
+ extensions=["hub.utils.StaticTag"],
+ )
+ self.secret = secret or token_urlsafe(64)
+ self.hubapps = {}
+ RootApp(self, self.base_dir)
+ for base_dir in self.base_dir.iterdir():
+ if not base_dir.is_dir() or HubApp.is_ignored_filename(base_dir):
+ continue
+ HubApp(self, base_dir)
+ self.add_error_handler(Exception, self.print_exception)
+
+ def scramble(self, value):
+ if isinstance(value, str):
+ value = value.encode()
+ secret = self.secret
+ if isinstance(secret, str):
+ secret = secret.encode()
+ return urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, secret, 221100)
+ ).rstrip(b"=").decode("ascii")
+
+ async def print_exception(self, req, resp, ex, params):
+ print_exception(*sys.exc_info())
+
+
+class HubServer(Server):
+ def __init__(self, *args, browser, **kwargs):
+ self.browser = browser
+ super().__init__(*args, **kwargs)
+
+ async def startup(self, sockets: list[socket.socket] | None = None) -> None:
+ await super().startup(sockets)
+ config = self.config
+ protocol_name = "https" if config.ssl else "http"
+ host = "0.0.0.0" if config.host is None else config.host
+ if ":" in host:
+ # It's an IPv6 address.
+ host = f"[{host.rstrip(']').lstrip('[')}]"
+
+ port = config.port
+ if port == 0:
+ try:
+ port = next(
+ chain.from_iterable(
+ (server.sockets for server in getattr(self, "servers", ()))
+ )
+ ).getsockname()[1]
+ except StopIteration:
+ pass
+ if {"http": 80, "https": 443}[protocol_name] != port:
+ host = f"{host}:{port}"
+ app = config.loaded_app.app
+ print("Secret:", app.secret)
+ for key, value in app.hubapps["root"].files_per_uris.items():
+ if Path(value.path.name).stem == "index.html":
+ url = urlunsplit((protocol_name, host, key, "", ""))
+ print("URL:", url)
+ if self.browser:
+ run([self.browser, url])
+
+
+app: App
+
+
+async def main():
+ global app
ap = ArgumentParser()
ap.add_argument("--secret")
+ ap.add_argument("--browser", default="firefox")
args = ap.parse_args()
- cr = CryptResource(args.secret)
- cr.register_on(app)
- return app
+ app = App(args.secret)
+ config = Config("hub.app:app", port=5000, log_level="info")
+ config.setup_event_loop()
+ hs = HubServer(config, browser=args.browser)
+ await hs.serve()