"""Static file and Jinja2 template resources routed through a Falcon app."""

from pathlib import Path

from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
from falcon.status_codes import HTTP_OK
from jinja2 import Template

from .websocket import WebSocketHub

MEDIA_CSS = "text/css"


class StaticFile:
    """A file on disk whose text content is cached until its mtime changes."""

    # Maps a file suffix to the media type reported for it.
    media_types_per_suffix = {
        ".html": MEDIA_HTML,
        ".js": MEDIA_JS,
        ".css": MEDIA_CSS,
        ".txt": MEDIA_TEXT,
    }
    content: str | None

    def __init__(self, path):
        """Remember *path* and resolve its media type; nothing is read yet.

        Raises:
            KeyError: if the suffix of *path* has no registered media type.
        """
        self.path = path
        self.media_type = self.get_media_type(path)
        self.mtime = None
        self.content = None

    @classmethod
    def get_media_type(cls, path):
        """Return the media type registered for the suffix of *path*.

        Raises:
            KeyError: if the suffix is not in ``media_types_per_suffix``.
        """
        return cls.media_types_per_suffix[path.suffix]

    def get(self):
        """Return the file content, re-reading it when the mtime has changed."""
        mtime = self.path.stat().st_mtime
        if mtime != self.mtime:
            # Decode explicitly as UTF-8: the content is later re-encoded with
            # str.encode() (UTF-8), so it must not depend on the locale default.
            with open(self.path, encoding="utf-8") as fh:
                self.content = fh.read()
            self.mtime = mtime
        return self.content

    def __repr__(self):
        return f"<{type(self).__name__} {self.path}>"


class StaticTemplateFile(StaticFile):
    """A ``.j2`` template file rendered with its hub app as context."""

    TEMPLATE_SUFFIX = ".j2"
    content: Template | None

    def __init__(self, path, hubapp):
        """Bind the template at *path* to *hubapp*.

        The render context is ``{"hubapp": hubapp}``.
        """
        super().__init__(path)
        self.hubapp = hubapp
        self.context = {"hubapp": self.hubapp}

    def get(self) -> str:
        """Render the template, reloading it when the file's mtime has changed."""
        mtime = self.path.stat().st_mtime
        if mtime != self.mtime:
            env = self.hubapp.app.env
            # The template name handed to the environment is the file path
            # relative to the loader's first search path.
            self.content = env.get_template(
                str(self.path.relative_to(env.loader.searchpath[0]))
            )
            self.mtime = mtime
        return self.content.render(self.context)

    @classmethod
    def get_media_type(cls, path):
        """Return the media type for the suffix preceding ``.j2``.

        For ``page.html.j2`` the lookup key is ``.html``.

        Raises:
            IndexError: if *path* has fewer than two suffixes.
            KeyError: if the inner suffix has no registered media type.
        """
        assert path.suffix == cls.TEMPLATE_SUFFIX
        return cls.media_types_per_suffix[path.suffixes[-2]]


class BaseHubApp:
    """Registers every static file found under a directory as a route.

    The ``app`` object is used through ``hubapps`` (a mapping), ``add_route``,
    ``scramble`` and, for templates, ``env``.
    """

    SCAN_FILES_RECURSIVELY = True

    def __init__(self, app, base_dir, name=None):
        """Scan *base_dir*, register this hub app and add one route per file.

        Args:
            app: application object (see class docstring for what is used).
            base_dir: directory to scan for static files.
            name: hub app name; defaults to ``base_dir.name``.
        """
        self.app = app
        self.base_dir = base_dir
        self.name = name if name is not None else base_dir.name
        self.app.hubapps[self.name] = self
        self.files_per_uris = {
            self.uri_from(file.path): file for file in self.scan_files(base_dir)
        }
        for uri in self.files_per_uris:
            app.add_route(uri, self)

    @staticmethod
    def is_master_uri(uri_tail):
        """Tell whether *uri_tail* designates a "master" resource.

        True when the final component, cut at its first ".", equals "master",
        or when any directory component equals "master".
        """
        # rpartition/partition cope with a missing "/" or "." on their own,
        # instead of feeding the -1 "not found" index into a slice (which
        # silently dropped the last character of extension-less names).
        directory, _, filename = uri_tail.rpartition("/")
        stem = filename.partition(".")[0]
        return stem == "master" or "master" in directory.split("/")

    def _uri_tail(self, path, suffix):
        """Return the part of the URI that follows the hub app name.

        Args:
            path: a ``Path`` (absolute paths are made relative to
                ``base_dir``) or any object whose ``str()`` is the tail.
            suffix: trailing text to strip from the tail when present.

        Returns:
            The scrambled tail for master URIs, ``""`` for ``index.html``,
            otherwise the tail itself.
        """
        if isinstance(path, Path):
            if path.is_absolute():
                path = path.relative_to(self.base_dir)
            # as_posix() keeps "/" as the separator on every platform.
            uri_tail = path.as_posix()
        else:
            uri_tail = str(path)
        # removesuffix() is a no-op for an empty suffix, whereas slicing with
        # [:-len(suffix)] would have produced an empty string.
        uri_tail = uri_tail.removesuffix(suffix)
        if self.is_master_uri(uri_tail):
            return self.app.scramble(uri_tail)
        if uri_tail == "index.html":
            return ""
        return uri_tail

    def uri_from(self, path) -> str:
        """Build the absolute URI for *path* within this hub app.

        The hub app named "root" is mounted directly at "/".
        """
        uri_tail = self._uri_tail(path, StaticTemplateFile.TEMPLATE_SUFFIX)
        name = self.name
        if name == "root":
            name = ""
        return f"/{name}{'/' if name and uri_tail else ''}{uri_tail}"

    def scan_files(self, base_dir):
        """Yield a static file object for each non-ignored file in *base_dir*.

        Subdirectories are descended into only when ``SCAN_FILES_RECURSIVELY``
        is true. Files ending in ``.j2`` become ``StaticTemplateFile``, all
        others ``StaticFile``. The ignore check is applied to files only, not
        to directories.
        """
        # Explicit stack of directory iterators: depth-first, no recursion.
        stack = [base_dir.iterdir()]
        while stack:
            try:
                path = next(stack[-1])
            except StopIteration:
                stack.pop()
                continue
            if path.is_dir():
                if self.SCAN_FILES_RECURSIVELY:
                    stack.append(path.iterdir())
            elif not self.is_ignored_filename(path):
                if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
                    static_file = StaticTemplateFile(path, self)
                else:
                    static_file = StaticFile(path)
                yield static_file

    @staticmethod
    def is_ignored_filename(path):
        """Return True for names starting with "." (hidden files)."""
        return path.name.startswith(".")

    async def on_get(self, req, resp):
        """Respond to a GET with the content of the file mapped to ``req.path``."""
        resource = self.files_per_uris[req.path]
        resp.content_type = resource.media_type
        resp.data = resource.get().encode()
        resp.status = HTTP_OK


class HubApp(BaseHubApp):
    """Hub app that additionally routes two websocket URIs to a WebSocketHub."""

    def __init__(self, app, base_dir):
        """Register the static files, then the "client" and "master" websockets."""
        super().__init__(app, base_dir)
        self.wsh = WebSocketHub(self)
        # Maps each websocket URI to the route suffix passed to add_route.
        self.ws_uris = {
            self.uri_from(f"ws_{value}"): value for value in ("client", "master")
        }
        for uri, suffix in self.ws_uris.items():
            app.add_route(uri, self.wsh, suffix=suffix)


class RootApp(BaseHubApp):
    """Hub app named "root": mounted at "/" and scanned non-recursively."""

    SCAN_FILES_RECURSIVELY = False

    def __init__(self, app, base_dir):
        super().__init__(app, base_dir, "root")

    @staticmethod
    def is_master_uri(uri_tail):
        """Treat every URI of the root app as a master URI.

        Consequently ``_uri_tail`` passes every root tail to ``app.scramble``.
        """
        # NOTE(review): with this override the "index.html" -> "" branch of
        # _uri_tail is never reached for the root app - confirm that scrambling
        # the root index is intended.
        return True