+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
from pathlib import Path
+from secrets import token_urlsafe
from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
from falcon.status_codes import HTTP_OK
from jinja2 import Template
+from redis.asyncio import StrictRedis
-from .websocket import WebSocketHub
+from .websocket import WebSocketApp
MEDIA_CSS = "text/css"
+
class StaticFile:
+ """
+ Basic static file wrapper.
+ """
media_types_per_suffix = {
".html": MEDIA_HTML,
".js": MEDIA_JS,
def __init__(self, path):
self.path = path
+ self.name = path.name
self.media_type = self.get_media_type(path)
self.mtime = None
self.content = None
return f"<{type(self).__name__} {self.path}>"
+class TreeFileApp:
+ """
+ Map a directory tree base_dir to a base_uri and serve it statically.
+ Map index.html files to their relative roots:
+ index.html to "/" (root uri) to index.html and
+ "/a" (directory uri) to a/index.html
+ """
+ @staticmethod
+ def is_ignored_filename(path):
+ return path.name.startswith(".")
+
+ @classmethod
+ def scan_files(cls, base_dir):
+ stack = [base_dir.iterdir()]
+ while len(stack):
+ try:
+ path = next(stack[-1])
+ except StopIteration:
+ stack.pop()
+ continue
+ if path.is_dir():
+ stack.append(path.iterdir())
+ elif not cls.is_ignored_filename(path):
+ yield path
+
+ def get_file(self, path: Path) -> StaticFile:
+ return StaticFile(path)
+
+ def uri_tail(self, path: Path) -> str:
+ """
+ Return the "local" path, relative to self.base_dir, if applicable
+ """
+ assert isinstance(path, Path)
+ if path.is_absolute():
+ path = path.relative_to(self.base_dir)
+ return str(path)
+
+ def uri(self, path: Path) -> str:
+ uri_tail = self.uri_tail(path)
+ if uri_tail == "index.html":
+ return self.base_uri or "/"
+ index_suffix = "/index.html"
+ if uri_tail.endswith(index_suffix):
+ uri_tail = uri_tail[:-len(index_suffix)]
+ return f"{self.base_uri}/{uri_tail.lstrip('/')}"
+
+ def __init__(self, app, base_dir, base_uri="/"):
+ self.app = app
+ self.base_dir = base_dir
+ self.base_uri = base_uri.rstrip("/")
+ self.name = self.base_uri.replace("/", ".").strip(".") or "root"
+ self.files_per_uris = {}
+ for path in self.scan_files(base_dir):
+ static_file = self.get_file(path)
+ uri = self.uri(static_file.path)
+ self.files_per_uris[uri] = static_file
+ app.add_route(uri, self)
+
+ async def on_get(self, req, resp):
+ resource = self.files_per_uris[req.path]
+ resp.content_type = resource.media_type
+ resp.data = resource.get().encode()
+ resp.status = HTTP_OK
+
+
class StaticTemplateFile(StaticFile):
TEMPLATE_SUFFIX = ".j2"
content: Template | None
def __init__(self, path, hubapp):
super().__init__(path)
+ self.name = path.stem
self.hubapp = hubapp
- self.context = {"hubapp": self.hubapp}
+ self.context = {"static_file": self}
def get(self) -> str:
mtime = self.path.stat().st_mtime
return cls.media_types_per_suffix[path.suffixes[-2]]
-class BaseHubApp:
- SCAN_FILES_RECURSIVELY = True
-
- def __init__(self, app, base_dir, name=None):
- self.app = app
- self.base_dir = base_dir
- self.name = name if name is not None else base_dir.name
- self.app.hubapps[self.name] = self
- self.files_per_uris = {
- self.uri_from(file.path): file for file in self.scan_files(base_dir)
- }
- for uri in self.files_per_uris:
- app.add_route(uri, self)
+class TemplateTreeFileApp(TreeFileApp):
+ def get_file(self, path: Path) -> StaticFile:
+ if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
+ return StaticTemplateFile(path, self)
+ return super().get_file(path)
- @staticmethod
- def is_master_uri(uri_tail):
- slash = "/"
- start = uri_tail.rfind(slash)
- pos = uri_tail.find(".", start if start >= 0 else 0)
- return (
- uri_tail[start + len(slash):pos] == "master"
- or "master" in uri_tail[:start].split(slash)
- )
-
- def _uri_tail(self, path, suffix):
- if isinstance(path, Path):
- if path.is_absolute():
- path = path.relative_to(self.base_dir)
- uri_tail = str(path)
- else:
- uri_tail = str(path)
- if uri_tail.endswith(suffix):
- uri_tail = uri_tail[:-len(suffix)]
-
- if self.is_master_uri(uri_tail):
- return self.app.scramble(uri_tail)
- elif uri_tail == "index.html":
- return ""
+ def uri_tail(self, path: Path) -> str:
+ uri_tail = super().uri_tail(path)
+ if uri_tail.endswith(StaticTemplateFile.TEMPLATE_SUFFIX):
+ uri_tail = uri_tail[:-len(StaticTemplateFile.TEMPLATE_SUFFIX)]
return uri_tail
- def uri_from(self, path) -> str | Path:
- uri_tail = self._uri_tail(path, StaticTemplateFile.TEMPLATE_SUFFIX)
- name = self.name
- if name == "root":
- name = ""
- return f"/{name}{'/' if name and uri_tail else ''}{uri_tail}"
-
- def scan_files(self, base_dir):
- stack = [base_dir.iterdir()]
- while len(stack):
- try:
- path = next(stack[-1])
- except StopIteration:
- stack.pop()
- continue
- if path.is_dir():
- if self.SCAN_FILES_RECURSIVELY:
- stack.append(path.iterdir())
- elif not self.is_ignored_filename(path):
- if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
- static_file = StaticTemplateFile(path, self)
- else:
- static_file = StaticFile(path)
- yield static_file
-
- @staticmethod
- def is_ignored_filename(path):
- return path.name.startswith(".")
-
- async def on_get(self, req, resp):
- resource = self.files_per_uris[req.path]
- resp.content_type = resource.media_type
- resp.data = resource.get().encode()
- resp.status = HTTP_OK
-
-class HubApp(BaseHubApp):
- def __init__(self, app, base_dir):
- super().__init__(app, base_dir)
- self.wsh = WebSocketHub(self)
- self.ws_uris = {self.uri_from(f"ws_{value}"): value for value in ("client", "master")}
+class RootApp(TemplateTreeFileApp):
+ @classmethod
+ def scan_files(cls, base_dir):
+ for path in base_dir.iterdir():
+ if not path.is_dir() and not cls.is_ignored_filename(path):
+ yield path
+
+ def __init__(self, app, base_dir, base_uri="/", secret=None):
+ from .utils import get_redis_pass
+
+ self.secret = secret or token_urlsafe(64)
+ super().__init__(app, base_dir, base_uri)
+ self.conn = StrictRedis(username="default", password=get_redis_pass("/etc/redis/redis.conf"))
+
+ async def setup(self):
+ await self.conn.set("client_id", 0)
+
+ def scramble(self, value):
+ if isinstance(value, str):
+ value = value.encode()
+ secret = self.secret
+ if isinstance(secret, str):
+ secret = secret.encode()
+ return urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, secret, 221100)
+ ).rstrip(b"=").decode("ascii")
+
+ def uri_tail(self, path: Path) -> str:
+ return self.scramble(super().uri_tail(path))
+
+
+class HubApp(TemplateTreeFileApp):
+ def __init__(self, app, base_dir, base_uri="/"):
+ super().__init__(app, base_dir, base_uri)
+ self.ws_app = WebSocketApp(self)
+ self.ws_uris = {
+ self.uri(Path(f"ws_{value}")): value for value in ("client", "master")
+ }
for uri, suffix in self.ws_uris.items():
- app.add_route(uri, self.wsh, suffix=suffix)
-
-
-class RootApp(BaseHubApp):
- SCAN_FILES_RECURSIVELY = False
-
- def __init__(self, app, base_dir):
- super().__init__(app, base_dir, "root")
+ app.add_route(uri, self.ws_app, suffix=suffix)
@staticmethod
- def is_master_uri(uri_tail):
- return True
+ def is_master_uri(path: Path) -> bool:
+ assert isinstance(path, Path)
+ basename = path.name
+ pos = basename.find(".")
+ if pos != -1:
+ basename = basename[:pos]
+ return basename in "master"
+
+ def uri(self, path: Path) -> str:
+ uri = super().uri(path)
+ if self.is_master_uri(path):
+ pos = uri.rstrip("/").rfind("/")
+ scrambled_uri = self.app.hubapps["root"].scramble(uri)
+ if pos == -1:
+ uri = scrambled_uri
+ else:
+ uri = f"{uri[:pos]}/{scrambled_uri}"
+ return uri