+from base64 import urlsafe_b64encode
+from hashlib import pbkdf2_hmac
from pathlib import Path
+from secrets import token_urlsafe
from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
from falcon.status_codes import HTTP_OK
+from jinja2 import Template
+from redis.asyncio import StrictRedis
-from .websocket import WebSocketHub
-from .utils import scramble
+from .websocket import WebSocketApp
MEDIA_CSS = "text/css"
class StaticFile:
+ """
+ Basic static file wrapper.
+ """
media_types_per_suffix = {
".html": MEDIA_HTML,
".js": MEDIA_JS,
".css": MEDIA_CSS,
".txt": MEDIA_TEXT,
}
+ content: str | None
def __init__(self, path):
self.path = path
+ self.name = path.name
self.media_type = self.get_media_type(path)
self.mtime = None
self.content = None
return f"<{type(self).__name__} {self.path}>"
-class StaticTemplateFile(StaticFile):
- TEMPLATE_SUFFIX = ".j2"
-
- def __init__(self, path, hubapp):
- super().__init__(path)
- self.hubapp = hubapp
- self.context = {"hubapp": self.hubapp}
- self.template = hubapp.app.env.get_template(
- str(path.relative_to(hubapp.app.env.loader.searchpath[0]))
- )
-
- def get(self):
- return self.template.render(self.context)
+class TreeFileApp:
+ """
+ Map a directory tree base_dir to a base_uri and serve it statically.
+ Map index.html files to their relative roots:
+ index.html to "/" (root uri) to index.html and
+ "/a" (directory uri) to a/index.html
+ """
+ @staticmethod
+ def is_ignored_filename(path):
+ return path.name.startswith(".")
@classmethod
- def get_media_type(cls, path):
- assert path.suffix == cls.TEMPLATE_SUFFIX
- return cls.media_types_per_suffix[path.suffixes[-2]]
-
-
-class BaseHubApp:
- SCAN_FILES_RECURSIVELY = True
-
- def __init__(self, app, base_dir, name=None):
- self.app = app
- self.base_dir = base_dir
- self.name = name if name is not None else base_dir.name
- self.app.hubapps[self.name] = self
- self.files_per_uris = {
- self.uri_from(file.path): file for file in self.scan_files(base_dir)
- }
- for uri in self.files_per_uris:
- app.add_route(uri, self)
-
- @staticmethod
- def is_master_uri(uri_tail):
- slash = "/"
- start = uri_tail.rfind(slash)
- pos = uri_tail.find(".", start if start >= 0 else 0)
- return (
- uri_tail[start + len(slash):pos] == "master"
- or "master" in uri_tail[:start].split(slash)
- )
-
- def uri_from(self, path):
- if isinstance(path, Path) and path.is_absolute():
- uri_tail = str(path.relative_to(self.base_dir))
- else:
- uri_tail = str(path)
- suffix = StaticTemplateFile.TEMPLATE_SUFFIX
- if uri_tail.endswith(suffix):
- uri_tail = uri_tail[:-len(suffix)]
-
- if self.is_master_uri(uri_tail):
- uri_tail = scramble(self.app.secret, uri_tail)
- elif uri_tail == "index.html":
- uri_tail = ""
- name = self.name
- if name == "root":
- name = ""
- if name and uri_tail:
- name = f"{name}/"
- return f"/{name}{uri_tail}"
-
- def scan_files(self, base_dir):
+ def scan_files(cls, base_dir):
stack = [base_dir.iterdir()]
while len(stack):
try:
stack.pop()
continue
if path.is_dir():
- if self.SCAN_FILES_RECURSIVELY:
- stack.append(path.iterdir())
- elif not self.is_ignored_filename(path):
- if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
- static_file = StaticTemplateFile(path, self)
- else:
- static_file = StaticFile(path)
- yield static_file
-
- @staticmethod
- def is_ignored_filename(path):
- return path.name.startswith(".")
+ stack.append(path.iterdir())
+ elif not cls.is_ignored_filename(path):
+ yield path
+
+ def get_file(self, path: Path) -> StaticFile:
+ return StaticFile(path)
+
+ def uri_tail(self, path: Path) -> str:
+ """
+ Return the "local" path, relative to self.base_dir, if applicable
+ """
+ assert isinstance(path, Path)
+ if path.is_absolute():
+ path = path.relative_to(self.base_dir)
+ return str(path)
+
+ def uri(self, path: Path) -> str:
+ uri_tail = self.uri_tail(path)
+ if uri_tail == "index.html":
+ return self.base_uri or "/"
+ index_suffix = "/index.html"
+ if uri_tail.endswith(index_suffix):
+ uri_tail = uri_tail[:-len(index_suffix)]
+ return f"{self.base_uri}/{uri_tail.lstrip('/')}"
+
+ def __init__(self, app, base_dir, base_uri="/"):
+ self.app = app
+ self.base_dir = base_dir
+ self.base_uri = base_uri.rstrip("/")
+ self.name = self.base_uri.replace("/", ".").strip(".") or "root"
+ self.files_per_uris = {}
+ for path in self.scan_files(base_dir):
+ static_file = self.get_file(path)
+ uri = self.uri(static_file.path)
+ self.files_per_uris[uri] = static_file
+ app.add_route(uri, self)
async def on_get(self, req, resp):
resource = self.files_per_uris[req.path]
resp.status = HTTP_OK
-class HubApp(BaseHubApp):
- def __init__(self, app, base_dir):
- super().__init__(app, base_dir)
- self.wsh = WebSocketHub(self)
- self.ws_uris = {self.uri_from(f"ws_{value}"): value for value in ("client", "master")}
- for uri, suffix in self.ws_uris.items():
- app.add_route(uri, self.wsh, suffix=suffix)
+class StaticTemplateFile(StaticFile):
+ TEMPLATE_SUFFIX = ".j2"
+ content: Template | None
+ def __init__(self, path, hubapp):
+ super().__init__(path)
+ self.name = path.stem
+ self.hubapp = hubapp
+ self.context = {"static_file": self}
-class RootApp(BaseHubApp):
- SCAN_FILES_RECURSIVELY = False
+ def get(self) -> str:
+ mtime = self.path.stat().st_mtime
+ if mtime != self.mtime:
+ env = self.hubapp.app.env
+ self.content = env.get_template(
+ str(self.path.relative_to(env.loader.searchpath[0]))
+ )
+ self.mtime = mtime
+ return self.content.render(self.context)
- def __init__(self, app, base_dir):
- super().__init__(app, base_dir, "root")
+ @classmethod
+ def get_media_type(cls, path):
+ assert path.suffix == cls.TEMPLATE_SUFFIX
+ return cls.media_types_per_suffix[path.suffixes[-2]]
+
+
+class TemplateTreeFileApp(TreeFileApp):
+ def get_file(self, path: Path) -> StaticFile:
+ if path.suffix == StaticTemplateFile.TEMPLATE_SUFFIX:
+ return StaticTemplateFile(path, self)
+ return super().get_file(path)
+
+ def uri_tail(self, path: Path) -> str:
+ uri_tail = super().uri_tail(path)
+ if uri_tail.endswith(StaticTemplateFile.TEMPLATE_SUFFIX):
+ uri_tail = uri_tail[:-len(StaticTemplateFile.TEMPLATE_SUFFIX)]
+ return uri_tail
+
+
+class RootApp(TemplateTreeFileApp):
+ @classmethod
+ def scan_files(cls, base_dir):
+ for path in base_dir.iterdir():
+ if not path.is_dir() and not cls.is_ignored_filename(path):
+ yield path
+
+ def __init__(self, app, base_dir, base_uri="/", secret=None):
+ from .utils import get_redis_pass
+
+ self.secret = secret or token_urlsafe(64)
+ super().__init__(app, base_dir, base_uri)
+ self.conn = StrictRedis(username="default", password=get_redis_pass("/etc/redis/redis.conf"))
+
+ async def setup(self):
+ await self.conn.set("client_id", 0)
+
+ def scramble(self, value):
+ if isinstance(value, str):
+ value = value.encode()
+ secret = self.secret
+ if isinstance(secret, str):
+ secret = secret.encode()
+ return urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, secret, 221100)
+ ).rstrip(b"=").decode("ascii")
+
+ def uri_tail(self, path: Path) -> str:
+ return self.scramble(super().uri_tail(path))
+
+
+class HubApp(TemplateTreeFileApp):
+ def __init__(self, app, base_dir, base_uri="/"):
+ super().__init__(app, base_dir, base_uri)
+ self.ws_app = WebSocketApp(self)
+ self.ws_uris = {
+ self.uri(Path(f"ws_{value}")): value for value in ("client", "master")
+ }
+ for uri, suffix in self.ws_uris.items():
+ app.add_route(uri, self.ws_app, suffix=suffix)
@staticmethod
- def is_master_uri(uri_tail):
- return True
+ def is_master_uri(path: Path) -> bool:
+ assert isinstance(path, Path)
+ basename = path.name
+ pos = basename.find(".")
+ if pos != -1:
+ basename = basename[:pos]
+ return basename in "master"
+
+ def uri(self, path: Path) -> str:
+ uri = super().uri(path)
+ if self.is_master_uri(path):
+ pos = uri.rstrip("/").rfind("/")
+ scrambled_uri = self.app.hubapps["root"].scramble(uri)
+ if pos == -1:
+ uri = scrambled_uri
+ else:
+ uri = f"{uri[:pos]}/{scrambled_uri}"
+ return uri