from base64 import urlsafe_b64encode from hashlib import pbkdf2_hmac from pathlib import Path from secrets import token_urlsafe from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT from falcon.status_codes import HTTP_OK from jinja2 import Template from redis.asyncio import StrictRedis from .websocket import WebSocketApp MEDIA_CSS = "text/css" class StaticFile: """ Basic static file wrapper. """ media_types_per_suffix = { ".html": MEDIA_HTML, ".js": MEDIA_JS, ".css": MEDIA_CSS, ".txt": MEDIA_TEXT, } content: str | None def __init__(self, path): self.path = path self.name = path.name self.media_type = self.get_media_type(path) self.mtime = None self.content = None @classmethod def get_media_type(cls, path): return cls.media_types_per_suffix[path.suffix] def get(self): mtime = self.path.stat().st_mtime if mtime != self.mtime: with open(self.path) as fh: self.content = fh.read() self.mtime = mtime return self.content def __repr__(self): return f"<{type(self).__name__} {self.path}>" class TreeFileApp: """ Map a directory tree base_dir to a base_uri and serve it statically. Map index.html files to their relative roots: index.html to "/" (root uri) to index.html and "/a" (directory uri) to a/index.html """ @staticmethod def is_ignored_filename(path): return path.name.startswith(".") @classmethod def scan_files(cls, base_dir): stack = [base_dir.iterdir()] while len(stack): try: path = next(stack[-1]) except StopIteration: stack.pop() continue if path.is_dir(): stack.append(path.iterdir()) elif not cls.is_ignored_filename(path): yield path def get_file(self, path: Path) -> StaticFile: return StaticFile(path) def uri_tail(self, path: Path) -> str: """ Return the "local" path, relative to self.base_dir, if applicable """ assert isinstance(path, Path) if path.is_absolute(): path = path.relative_to(self.base_dir) return str(path) def uri(self, path: Path) -> str: uri_tail = self.uri_tail(path) if uri_tail == "index.html": return self.base_uri or "/" index_suffix = "/index.html" if uri_tail.endswith(index_suffix): uri_tail = uri_tail[:-len(index_suffix)] return f"{self.base_uri}/{uri_tail.lstrip('/')}" def __init__(self, app, base_dir, base_uri="/"): self.app = app self.base_dir = base_dir self.base_uri = base_uri.rstrip("/") self.name = self.base_uri.replace("/", ".").strip(".") or "root" self.files_per_uris = {} for path in self.scan_files(base_dir): static_file = self.get_file(path) uri = self.uri(static_file.path) self.files_per_uris[uri] = static_file app.add_route(uri, self) async def on_get(self, req, resp): resource = self.files_per_uris[req.path] resp.content_type = resource.media_type resp.data = resource.get().encode() resp.status = HTTP_OK class StaticTemplateFile(StaticFile): TEMPLATE_SUFFIX = ".j2" content: Template | None def __init__(self, path, hubapp): super().__init__(path) self.name = path.stem self.hubapp = hubapp self.context = {"static_file": self} def get(self) -> str: mtime = self.path.stat().st_mtime if mtime != self.mtime: env = self.hubapp.app.env self.content = env.get_template( str(self.path.relative_to(env.loader.searchpath[0])) ) self.mtime = mtime return self.content.render(self.context) @classmethod def get_media_type(cls, path): assert path.suffix == cls.TEMPLATE_SUFFIX return cls.media_types_per_suffix[path.suffixes[-2]] class TemplateTreeFileApp(TreeFileApp): def get_file(self, path: Path) -> StaticFile: if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX: return StaticTemplateFile(path, self) return super().get_file(path) def uri_tail(self, path: Path) -> str: uri_tail = super().uri_tail(path) if uri_tail.endswith(StaticTemplateFile.TEMPLATE_SUFFIX): uri_tail = uri_tail[:-len(StaticTemplateFile.TEMPLATE_SUFFIX)] return uri_tail class RootApp(TemplateTreeFileApp): @classmethod def scan_files(cls, base_dir): for path in base_dir.iterdir(): if not path.is_dir() and not cls.is_ignored_filename(path): yield path def __init__(self, app, base_dir, base_uri="/", secret=None): from .utils import get_redis_pass self.secret = secret or token_urlsafe(64) super().__init__(app, base_dir, base_uri) self.conn = StrictRedis(username="default", password=get_redis_pass("/etc/redis/redis.conf")) async def setup(self): await self.conn.set("client_id", 0) def scramble(self, value): if isinstance(value, str): value = value.encode() secret = self.secret if isinstance(secret, str): secret = secret.encode() return urlsafe_b64encode( pbkdf2_hmac("sha512", value, secret, 221100) ).rstrip(b"=").decode("ascii") def uri_tail(self, path: Path) -> str: return self.scramble(super().uri_tail(path)) class HubApp(TemplateTreeFileApp): def __init__(self, app, base_dir, base_uri="/"): super().__init__(app, base_dir, base_uri) self.ws_app = WebSocketApp(self) self.ws_uris = { self.uri(Path(f"ws_{value}")): value for value in ("client", "master") } for uri, suffix in self.ws_uris.items(): app.add_route(uri, self.ws_app, suffix=suffix) @staticmethod def is_master_uri(path: Path) -> bool: assert isinstance(path, Path) basename = path.name pos = basename.find(".") if pos != -1: basename = basename[:pos] return basename in "master" def uri(self, path: Path) -> str: uri = super().uri(path) if self.is_master_uri(path): pos = uri.rstrip("/").rfind("/") scrambled_uri = self.app.hubapps["root"].scramble(uri) if pos == -1: uri = scrambled_uri else: uri = f"{uri[:pos]}/{scrambled_uri}" return uri