from collections.abc import Iterator
from pathlib import Path

from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
from falcon.request import Request
from falcon.response import Response
from falcon.status_codes import HTTP_OK
from jinja2 import Template

from .websocket import WebSocketApp

MEDIA_CSS = "text/css"


class StaticFile:
    """
    Basic static file wrapper.

    Holds the file content in memory and re-reads it from disk whenever the
    file's modification time differs from the one seen at the last read.
    """

    # Supported file suffixes and the media type served for each of them.
    media_types_per_suffix: dict[str, str] = {
        ".html": MEDIA_HTML,
        ".js": MEDIA_JS,
        ".css": MEDIA_CSS,
        ".txt": MEDIA_TEXT,
    }

    def __init__(self, path: Path) -> None:
        """
        Wrap the file at ``path``; nothing is read until ``get()`` is called.

        Raises KeyError if the suffix of ``path`` is not listed in
        ``media_types_per_suffix``.
        """
        self.path = path
        self.name = path.name
        self.media_type = self.get_media_type(path)
        self.mtime: float | None = None
        self.content: bytes | None = None

    @classmethod
    def get_media_type(cls, path: Path) -> str:
        """
        Return the media type registered for the suffix of ``path``.

        Raises KeyError for an unknown suffix.
        """
        return cls.media_types_per_suffix[path.suffix]

    def get(self) -> bytes:
        """
        Return the file content, reloading it if the mtime has changed.
        """
        mtime = self.path.stat().st_mtime
        if mtime != self.mtime:
            self.content = self.path.read_bytes()
            self.mtime = mtime
        return self.content

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.path}>"


class TreeFileApp:
    """
    Map a directory tree base_dir to a base_uri and serve it statically.

    index.html files are mapped to the URI of their directory:
    index.html is served at "/" (root uri) and
    a/index.html is served at "/a" (directory uri).
    """

    @staticmethod
    def is_ignored_filename(path: Path) -> bool:
        """
        Return True for files that must not be served (dot-files).
        """
        return path.name.startswith(".")

    @classmethod
    def scan_files(cls, base_dir: Path) -> Iterator[Path]:
        """
        Yield every non-ignored file found below ``base_dir``.

        The tree is walked depth-first with an explicit stack of directory
        iterators: a sub-directory is descended into as soon as it is met.
        Only file names are checked against ``is_ignored_filename``;
        directories are always traversed.
        """
        stack = [base_dir.iterdir()]
        while stack:
            try:
                path = next(stack[-1])
            except StopIteration:
                # Current directory exhausted: resume its parent.
                stack.pop()
                continue
            if path.is_dir():
                stack.append(path.iterdir())
            elif not cls.is_ignored_filename(path):
                yield path

    def get_file(self, path: Path) -> StaticFile:
        """
        Build the file wrapper used to serve ``path``.
        """
        return StaticFile(path)

    def uri_tail(self, path: Path) -> str:
        """
        Return the "local" path, relative to self.base_dir, if applicable

        Relative paths are returned unchanged. The result always uses
        forward slashes so that it can be used as a URI fragment on every
        platform.
        """
        assert isinstance(path, Path)
        if path.is_absolute():
            path = path.relative_to(self.base_dir)
        # as_posix() rather than str(): str() yields backslashes on Windows,
        # which would produce invalid URIs and defeat the "/index.html" check
        # in uri().
        return path.as_posix()

    def uri(self, path: Path) -> str:
        """
        Return the URI under which ``path`` is served.

        A top-level index.html maps to the base URI (or "/" when the base
        URI is empty); a nested ``<dir>/index.html`` maps to ``<dir>``.
        """
        uri_tail = self.uri_tail(path)
        if uri_tail == "index.html":
            return self.base_uri or "/"
        uri_tail = uri_tail.removesuffix("/index.html")
        return f"{self.base_uri}/{uri_tail.lstrip('/')}"

    @staticmethod
    def create_name(root_base_uri: str, base_uri: str) -> str:
        """
        Derive a dotted name from ``base_uri``.

        A leading ``root_base_uri`` is dropped, slashes become dots, and
        leading/trailing dots are stripped ("/a/b" -> "a.b").
        """
        relative_uri = base_uri.removeprefix(root_base_uri)
        return relative_uri.replace("/", ".").strip(".")

    def __init__(self, root, base_dir: Path, base_uri: str = "/",
                 name: str | None = None) -> None:
        """
        Scan ``base_dir`` and register one route on ``root`` per served file.

        ``root`` must provide ``base_uri`` and ``add_route(uri, resource)``.
        ``name`` defaults to a name derived from ``base_uri``; the resulting
        name must not be empty.
        """
        self.root = root
        self.base_dir = base_dir
        # Stored without trailing slash, so the root URI "/" becomes "".
        self.base_uri = base_uri.rstrip("/")
        self.name = name or self.create_name(self.root.base_uri, self.base_uri)
        assert self.name, self.name
        self.files_per_uris: dict[str, StaticFile] = {}
        for path in self.scan_files(base_dir):
            static_file = self.get_file(path)
            uri = self.uri(static_file.path)
            self.files_per_uris[uri] = static_file
            root.add_route(uri, self)

    async def on_get(self, req: Request, resp: Response) -> None:
        """
        Serve the file registered for ``req.path``.

        Raises KeyError if ``req.path`` is not one of the registered URIs.
        """
        resource = self.files_per_uris[req.path]
        resp.content_type = resource.media_type
        resp.data = resource.get()
        resp.status = HTTP_OK


class StaticTemplateFile(StaticFile):
    """
    Static file rendered from a Jinja2 template (``<name>.<ext>.j2``).

    The template object is cached and re-fetched from the environment when
    the file's mtime changes; it is rendered again on every ``get()``.
    """

    TEMPLATE_SUFFIX = ".j2"
    content: Template | None

    def __init__(self, path: Path, hubapp: "TemplateTreeFileApp") -> None:
        """
        Wrap the template at ``path``.

        ``hubapp.root.env`` is used by ``get()`` to load the template.
        """
        super().__init__(path)
        # Name without the template suffix ("page.html.j2" -> "page.html").
        self.name = path.stem
        self.hubapp = hubapp
        self.context = {"static_file": self}

    def get(self) -> bytes:
        """
        Return the rendered template, encoded to bytes.
        """
        mtime = self.path.stat().st_mtime
        if mtime != self.mtime:
            env = self.hubapp.root.env
            # NOTE(review): assumes env.loader exposes `searchpath` (as
            # jinja2.FileSystemLoader does) and that the template lives
            # under its first entry - confirm against the root app.
            # Jinja2 template names use forward slashes on every platform,
            # hence as_posix() rather than str().
            self.content = env.get_template(
                self.path.relative_to(env.loader.searchpath[0]).as_posix()
            )
            self.mtime = mtime
        return self.content.render(self.context).encode()

    @classmethod
    def get_media_type(cls, path: Path) -> str:
        """
        Return the media type for the suffix preceding the template suffix.

        For "page.html.j2" this is the media type of ".html". Raises
        IndexError if ``path`` has no suffix besides the template suffix and
        KeyError if that suffix is unknown.
        """
        assert path.suffix == cls.TEMPLATE_SUFFIX
        return cls.media_types_per_suffix[path.suffixes[-2]]


class TemplateTreeFileApp(TreeFileApp):
    """
    TreeFileApp that serves ``*.j2`` files as rendered templates.

    Template files are published without their template suffix.
    """

    def get_file(self, path: Path) -> StaticFile:
        """
        Return a template wrapper for ``*.j2`` files, a plain one otherwise.
        """
        if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
            return StaticTemplateFile(path, self)
        return super().get_file(path)

    def uri_tail(self, path: Path) -> str:
        """
        Return the local path of ``path`` with the template suffix removed.
        """
        return super().uri_tail(path).removesuffix(
            StaticTemplateFile.TEMPLATE_SUFFIX
        )


class HubApp(TemplateTreeFileApp):
    """
    Template tree app that also registers the websocket routes.

    URIs whose file name starts with "master" or "ws_master" get their last
    path segment replaced by ``root.scramble(uri)``.
    """

    def __init__(self, app, base_dir: Path, base_uri: str = "/") -> None:
        """
        Register the file routes, then the "ws_client" and "ws_master"
        websocket routes on ``app``.
        """
        super().__init__(app, base_dir, base_uri)
        self.ws_app = WebSocketApp(self)
        for suffix in ("client", "master"):
            app.add_route(
                self.uri(Path(f"ws_{suffix}")), self.ws_app, suffix=suffix
            )

    @staticmethod
    def is_master_uri(path: Path) -> bool:
        """
        Return True if the file name of ``path``, cut at its first dot, is
        "master" or "ws_master".
        """
        assert isinstance(path, Path)
        basename = path.name.partition(".")[0]
        return basename in ("master", "ws_master")

    def uri(self, path: Path) -> str:
        """
        Return the URI for ``path``, scrambling the last segment of master
        URIs via ``self.root.scramble``.
        """
        uri = super().uri(path)
        if not self.is_master_uri(path):
            return uri
        # Keep everything before the last path segment and replace that
        # segment by the scrambled form of the whole URI.
        pos = uri.rstrip("/").rfind("/")
        assert pos >= 0
        scrambled_uri = self.root.scramble(uri)
        return f"{uri[:pos]}/{scrambled_uri}"