from argparse import ArgumentParser
from base64 import urlsafe_b64encode
from hashlib import pbkdf2_hmac
-from itertools import chain
from pathlib import Path
from secrets import token_urlsafe
from subprocess import run
from traceback import print_exception
+from typing import Generator
from urllib.parse import urlunsplit
from falcon.asgi import App as FalconApp
from falcon.constants import MEDIA_HTML
from jinja2 import Environment, FileSystemLoader, select_autoescape
+from redis.asyncio import StrictRedis
from uvicorn import Config, Server
-from .hubapp import HubApp, RootApp
+from .static import HubApp, TemplateTreeFileApp
-class App(FalconApp):
- def __init__(self, secret, **kwargs):
- kwargs.setdefault("media_type", MEDIA_HTML)
- super().__init__(**kwargs)
- self.base_dir = Path(__file__).parents[1] / "webroot"
- self.env = Environment(
- loader=FileSystemLoader(self.base_dir),
- autoescape=select_autoescape(),
- extensions=["hub.utils.StaticTag"],
- )
- self.secret = secret or token_urlsafe(64)
- self.hubapps = {}
- RootApp(self, self.base_dir)
- for base_dir in self.base_dir.iterdir():
- if not base_dir.is_dir() or HubApp.is_ignored_filename(base_dir):
- continue
- HubApp(self, base_dir)
- self.add_error_handler(Exception, self.print_exception)
+class App(TemplateTreeFileApp, FalconApp):
+ @classmethod
+ def scan_files(cls, base_dir: Path) -> Generator[Path]:
+ for path in base_dir.iterdir():
+ if not path.is_dir() and not cls.is_ignored_filename(path):
+ yield path
- def scramble(self, value):
+ def scramble(self, value: str | bytes) -> str:
if isinstance(value, str):
value = value.encode()
secret = self.secret
pbkdf2_hmac("sha512", value, secret, 221100)
).rstrip(b"=").decode("ascii")
- async def print_exception(self, req, resp, ex, params):
+ def uri_tail(self, path: Path) -> str:
+ return self.scramble(super().uri_tail(path))
+
+ def __init__(self, base_dir: Path, secret: str, **kwargs):
+ from .utils import get_redis_pass
+
+ self.secret = secret or token_urlsafe(64)
+ kwargs.setdefault("media_type", MEDIA_HTML)
+ FalconApp.__init__(self, **kwargs)
+ TemplateTreeFileApp.__init__(
+ self, self, base_dir, "/derp", "root"
+ )
+ self.env = Environment(
+ loader=FileSystemLoader(self.base_dir),
+ autoescape=select_autoescape(),
+ extensions=["hub.utils.StaticTag"],
+ )
+ self.hubapps = {}
+ self.conn = StrictRedis(
+ username="default", password=get_redis_pass("/etc/redis/redis.conf")
+ )
+ for base_dir in self.base_dir.iterdir():
+ if base_dir.is_dir() and not self.is_ignored_filename(base_dir):
+ self.hubapps[base_dir.name] = HubApp(
+ self, base_dir, base_uri=f"/derp/{base_dir.name}"
+ )
+ self.add_error_handler(Exception, self.print_exception)
+
+ async def print_exception(self, req, resp, ex, params) -> None:
print_exception(*sys.exc_info())
async def startup(self, sockets: list[socket.socket] | None = None) -> None:
await super().startup(sockets)
- config = self.config
- protocol_name = "https" if config.ssl else "http"
- host = "0.0.0.0" if config.host is None else config.host
- if ":" in host:
- # It's an IPv6 address.
- host = f"[{host.rstrip(']').lstrip('[')}]"
-
- port = config.port
- if port == 0:
- try:
- port = next(
- chain.from_iterable(
- (server.sockets for server in getattr(self, "servers", ()))
- )
- ).getsockname()[1]
- except StopIteration:
- pass
- if {"http": 80, "https": 443}[protocol_name] != port:
+ root_app = self.config.loaded_app.app
+ print("Secret:", root_app.secret)
+ for uri, file in root_app.files_per_uris.items():
+ if file.name == "index.html":
+ break
+ else:
+ raise ValueError("Root page not found!")
+ host, port, ssl = self.config.host, self.config.port, bool(self.config.ssl)
+ if port and port != (80, 443)[ssl]:
host = f"{host}:{port}"
- app = config.loaded_app.app
- print("Secret:", app.secret)
- for key, value in app.hubapps["root"].files_per_uris.items():
- if Path(value.path.name).stem == "index.html":
- url = urlunsplit((protocol_name, host, key, "", ""))
- print("URL:", url)
- if self.browser:
- run([self.browser, url])
-
-
-app: App
+ url = urlunsplit((f"http{'s'[:ssl]}", host, uri, "", ""))
+ print("URL:", url)
+ if self.browser:
+ run([self.browser, url])
async def main():
- global app
ap = ArgumentParser()
ap.add_argument("--secret")
- ap.add_argument("--browser", default="firefox")
+ ap.add_argument("--browser", default="xdg-open")
args = ap.parse_args()
- app = App(args.secret)
- config = Config("hub.app:app", port=5000, log_level="info")
+ app = App(Path(__file__).parents[1] / "webroot", args.secret)
+ await app.conn.set("client_id", 0)
+ config = Config(app, port=5000, log_level="info")
config.setup_event_loop()
hs = HubServer(config, browser=args.browser)
await hs.serve()