+import socket
+import sys
from argparse import ArgumentParser
+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
+from pathlib import Path
+from secrets import token_urlsafe
+from subprocess import run
+from traceback import print_exception
+from typing import Generator
+from urllib.parse import urlunsplit
-from falcon.asgi import App
+from falcon.asgi import App as FalconApp
from falcon.constants import MEDIA_HTML
+from jinja2 import Environment, FileSystemLoader, select_autoescape
+from redis.asyncio import StrictRedis
+from uvicorn import Config, Server
-from .cryptresource import CryptResource
+from .static import HubApp, TemplateTreeFileApp
-def get_app():
- app = App(media_type=MEDIA_HTML)
+class App(TemplateTreeFileApp, FalconApp):
+ @classmethod
+ def scan_files(cls, base_dir: Path) -> Generator[Path]:
+ for path in base_dir.iterdir():
+ if not path.is_dir() and not cls.is_ignored_filename(path):
+ yield path
+
+ def scramble(self, value: str | bytes) -> str:
+ if isinstance(value, str):
+ value = value.encode()
+ secret = self.secret
+ if isinstance(secret, str):
+ secret = secret.encode()
+ return urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, secret, 221100)
+ ).rstrip(b"=").decode("ascii")
+
+ def uri_tail(self, path: Path) -> str:
+ return self.scramble(super().uri_tail(path))
+
+ def __init__(self, base_dir: Path, secret: str, **kwargs):
+ from .utils import get_redis_pass
+
+ self.secret = secret or token_urlsafe(64)
+ kwargs.setdefault("media_type", MEDIA_HTML)
+ FalconApp.__init__(self, **kwargs)
+ TemplateTreeFileApp.__init__(
+ self, self, base_dir, "/derp", "root"
+ )
+ self.env = Environment(
+ loader=FileSystemLoader(self.base_dir),
+ autoescape=select_autoescape(),
+ extensions=["hub.utils.StaticTag"],
+ )
+ self.hubapps = {}
+ self.conn = StrictRedis(
+ username="default", password=get_redis_pass("/etc/redis/redis.conf")
+ )
+ for base_dir in self.base_dir.iterdir():
+ if base_dir.is_dir() and not self.is_ignored_filename(base_dir):
+ self.hubapps[base_dir.name] = HubApp(
+ self, base_dir, base_uri=f"/derp/{base_dir.name}"
+ )
+ self.add_error_handler(Exception, self.print_exception)
+
+ async def print_exception(self, req, resp, ex, params) -> None:
+ print_exception(*sys.exc_info())
+
+
+class HubServer(Server):
+ def __init__(self, *args, browser, **kwargs):
+ self.browser = browser
+ super().__init__(*args, **kwargs)
+
+ async def startup(self, sockets: list[socket.socket] | None = None) -> None:
+ await super().startup(sockets)
+ root_app = self.config.loaded_app.app
+ print("Secret:", root_app.secret)
+ for uri, file in root_app.files_per_uris.items():
+ if file.name == "index.html":
+ break
+ else:
+ raise ValueError("Root page not found!")
+ host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl)
+ if port and port != (80, 443)[ssl]:
+ host = f"{host}:{port}"
+ url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", ""))
+ print("URL:", url)
+ if self.browser:
+ run([self.browser, url])
+
+
+async def main():
ap = ArgumentParser()
ap.add_argument("--secret")
+ ap.add_argument("--browser", default="xdg-open")
args = ap.parse_args()
- cr = CryptResource(args.secret)
- cr.register_on(app)
- return app
+ app = App(Path(__file__).parents[1] / "webroot", args.secret)
+ await app.conn.set("client_id", 0)
+ config = Config(app, port=5000, log_level="info")
+ config.setup_event_loop()
+ hs = HubServer(config, browser=args.browser)
+ await hs.serve()