+from pathlib import Path
+
+from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
+from falcon.status_codes import HTTP_OK
+
+from .websocket import WebSocketHub
+from .utils import scramble
+
+MEDIA_CSS = "text/css"
+
+
+class StaticFile:
+ media_types_per_suffix = {
+ ".html": MEDIA_HTML,
+ ".js": MEDIA_JS,
+ ".css": MEDIA_CSS,
+ ".txt": MEDIA_TEXT,
+ }
+
+ def __init__(self, path):
+ self.path = path
+ self.media_type = self.get_media_type(path)
+ self.mtime = None
+ self.content = None
+
+ @classmethod
+ def get_media_type(cls, path):
+ return cls.media_types_per_suffix[path.suffix]
+
+ def get(self):
+ mtime = self.path.stat().st_mtime
+ if mtime != self.mtime:
+ with open(self.path) as fh:
+ self.content = fh.read()
+ self.mtime = mtime
+ return self.content
+
+ def __repr__(self):
+ return f"<{type(self).__name__} {self.path}>"
+
+
+class StaticTemplateFile(StaticFile):
+ TEMPLATE_SUFFIX = ".j2"
+
+ def __init__(self, path, hubapp):
+ super().__init__(path)
+ self.hubapp = hubapp
+ self.context = {"hubapp": self.hubapp}
+ self.template = hubapp.app.env.get_template(
+ str(path.relative_to(hubapp.app.env.loader.searchpath[0]))
+ )
+
+ def get(self):
+ return self.template.render(self.context)
+
+ @classmethod
+ def get_media_type(cls, path):
+ assert path.suffix == cls.TEMPLATE_SUFFIX
+ return cls.media_types_per_suffix[path.suffixes[-2]]
+
+
+class BaseHubApp:
+ SCAN_FILES_RECURSIVELY = True
+
+ def __init__(self, app, base_dir, name=None):
+ self.app = app
+ self.base_dir = base_dir
+ self.name = name if name is not None else base_dir.name
+ self.app.hubapps[self.name] = self
+ self.files_per_uris = {
+ self.uri_from(file.path): file for file in self.scan_files(base_dir)
+ }
+ for uri in self.files_per_uris:
+ app.add_route(uri, self)
+
+ @staticmethod
+ def is_master_uri(uri_tail):
+ slash = "/"
+ start = uri_tail.rfind(slash)
+ pos = uri_tail.find(".", start if start >= 0 else 0)
+ return (
+ uri_tail[start + len(slash):pos] == "master"
+ or "master" in uri_tail[:start].split(slash)
+ )
+
+ def uri_from(self, path):
+ if isinstance(path, Path) and path.is_absolute():
+ uri_tail = str(path.relative_to(self.base_dir))
+ else:
+ uri_tail = str(path)
+ suffix = StaticTemplateFile.TEMPLATE_SUFFIX
+ if uri_tail.endswith(suffix):
+ uri_tail = uri_tail[:-len(suffix)]
+
+ if self.is_master_uri(uri_tail):
+ uri_tail = scramble(self.app.secret, uri_tail)
+ elif uri_tail == "index.html":
+ uri_tail = ""
+ name = self.name
+ if name and uri_tail:
+ name = f"{name}/"
+ return f"/{name}{uri_tail}"
+
+ def scan_files(self, base_dir):
+ stack = [base_dir.iterdir()]
+ while len(stack):
+ try:
+ path = next(stack[-1])
+ except StopIteration:
+ stack.pop()
+ continue
+ if path.is_dir():
+ if self.SCAN_FILES_RECURSIVELY:
+ stack.append(path.iterdir())
+ elif not self.is_ignored_filename(path):
+ if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
+ static_file = StaticTemplateFile(path, self)
+ else:
+ static_file = StaticFile(path)
+ yield static_file
+
+ @staticmethod
+ def is_ignored_filename(path):
+ return path.name.startswith(".")
+
+ async def on_get(self, req, resp):
+ resource = self.files_per_uris[req.path]
+ resp.content_type = resource.media_type
+ resp.data = resource.get().encode()
+ resp.status = HTTP_OK
+
+
+class HubApp(BaseHubApp):
+ def __init__(self, app, base_dir):
+ super().__init__(app, base_dir)
+ self.wsh = WebSocketHub(self)
+ self.ws_uris = {self.uri_from(f"ws_{value}"): value for value in ("client", "master")}
+ for uri, suffix in self.ws_uris.items():
+ app.add_route(uri, self.wsh, suffix=suffix)
+
+
+class RootApp(BaseHubApp):
+ SCAN_FILES_RECURSIVELY = False
+
+ def __init__(self, app, base_dir):
+ super().__init__(app, base_dir, "")
+
+ @staticmethod
+ def is_master_uri(uri_tail):
+ return True