+import socket
+import sys
from argparse import ArgumentParser
-from itertools import chain
+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
+from pathlib import Path
from secrets import token_urlsafe
-import socket
from subprocess import run
-import sys
from traceback import print_exception
-from typing import List, Optional
+from typing import Generator
from urllib.parse import urlunsplit
-from falcon.asgi import App
+from falcon.asgi import App as FalconApp
from falcon.constants import MEDIA_HTML
+from jinja2 import Environment, FileSystemLoader, select_autoescape
+from redis.asyncio import StrictRedis
from uvicorn import Config, Server
-from .staticresource import StaticResource
-from .hub import Hub
+from .static import HubApp, TemplateTreeFileApp
-class HubApp(App):
- def __init__(self, secret, browser, hubapp, *args, **kwargs):
- kwargs.setdefault("media_type", MEDIA_HTML)
- super().__init__(*args, **kwargs)
+class App(TemplateTreeFileApp, FalconApp):
+ @classmethod
+ def scan_files(cls, base_dir: Path) -> Generator[Path]:
+ for path in base_dir.iterdir():
+ if not path.is_dir() and not cls.is_ignored_filename(path):
+ yield path
+
+ def scramble(self, value: str | bytes) -> str:
+ if isinstance(value, str):
+ value = value.encode()
+ secret = self.secret
+ if isinstance(secret, str):
+ secret = secret.encode()
+ return urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, secret, 221100)
+ ).rstrip(b"=").decode("ascii")
+
+ def uri_tail(self, path: Path) -> str:
+ return self.scramble(super().uri_tail(path))
+
+ def __init__(self, base_dir: Path, secret: str, **kwargs):
+ from .utils import get_redis_pass
+
self.secret = secret or token_urlsafe(64)
- self.hub = Hub(self.secret)
- self.sr = StaticResource(self.secret, hubapp)
- self.browser = browser
- self.hub.add_routes(self)
- self.sr.add_routes(self)
- self.hub.update_context_vars(self.sr.context_vars)
+ kwargs.setdefault("media_type", MEDIA_HTML)
+ FalconApp.__init__(self, **kwargs)
+ TemplateTreeFileApp.__init__(
+ self, self, base_dir, "/derp", "root"
+ )
+ self.env = Environment(
+ loader=FileSystemLoader(self.base_dir),
+ autoescape=select_autoescape(),
+ extensions=["hub.utils.StaticTag"],
+ )
+ self.hubapps = {}
+ self.conn = StrictRedis(
+ username="default", password=get_redis_pass("/etc/redis/redis.conf")
+ )
+ for base_dir in self.base_dir.iterdir():
+ if base_dir.is_dir() and not self.is_ignored_filename(base_dir):
+ self.hubapps[base_dir.name] = HubApp(
+ self, base_dir, base_uri=f"/derp/{base_dir.name}"
+ )
self.add_error_handler(Exception, self.print_exception)
- async def print_exception(self, req, resp, ex, params):
+ async def print_exception(self, req, resp, ex, params) -> None:
print_exception(*sys.exc_info())
-def get_app():
- ap = ArgumentParser()
- ap.add_argument("--secret")
- ap.add_argument("--browser", default="firefox")
- ap.add_argument("--hubapp", default="first")
- args = ap.parse_args()
- return HubApp(args.secret, args.browser, args.hubapp)
-
-
class HubServer(Server):
- async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
- await super().startup(sockets)
- config = self.config
- protocol_name = "https" if config.ssl else "http"
- host = "0.0.0.0" if config.host is None else config.host
- if ":" in host:
- # It's an IPv6 address.
- host = f"[{host.rstrip(']').lstrip('[')}]"
+ def __init__(self, *args, browser, **kwargs):
+ self.browser = browser
+ super().__init__(*args, **kwargs)
- port = config.port
- if port == 0:
- try:
- port = next(
- chain.from_iterable((server.sockets for server in getattr(self, "servers", ())))
- ).getsockname()[1]
- except StopIteration:
- pass
- if {"http": 80, "https": 443}[protocol_name] != port:
- host = f"{host}:{port}"
- app = config.loaded_app.app
- print("master_uri", app.sr.master_uri)
- master_url = urlunsplit((protocol_name, host, app.sr.master_uri, "", ""))
- print("secret:", app.secret)
- if not app.browser:
- print("master url", master_url)
+ async def startup(self, sockets: list[socket.socket] | None = None) -> None:
+ await super().startup(sockets)
+ root_app = self.config.loaded_app.app
+ print("Secret:", root_app.secret)
+ for uri, file in root_app.files_per_uris.items():
+ if file.name == "index.html":
+ break
else:
- run([app.browser, master_url])
+ raise ValueError("Root page not found!")
+ host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl)
+ if port and port != (80, 443)[ssl]:
+ host = f"{host}:{port}"
+ url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", ""))
+ print("URL:", url)
+ if self.browser:
+ run([self.browser, url])
-def main():
- HubServer(
- Config("hub.app:get_app", factory=True, port=5000, log_level="info")
- ).run()
+async def main():
+ ap = ArgumentParser()
+ ap.add_argument("--secret")
+ ap.add_argument("--browser", default="xdg-open")
+ args = ap.parse_args()
+ app = App(Path(__file__).parents[1] / "webroot", args.secret)
+ await app.conn.set("client_id", 0)
+ config = Config(app, port=5000, log_level="info")
+ config.setup_event_loop()
+ hs = HubServer(config, browser=args.browser)
+ await hs.serve()