+import socket
+import sys
from argparse import ArgumentParser
from itertools import chain
+from pathlib import Path
from secrets import token_urlsafe
-import socket
from subprocess import run
-import sys
from traceback import print_exception
-from typing import List, Optional
from urllib.parse import urlunsplit
-from falcon.asgi import App
+from falcon.asgi import App as FalconApp
from falcon.constants import MEDIA_HTML
+from jinja2 import Environment, FileSystemLoader, select_autoescape
from uvicorn import Config, Server
-from .staticresource import StaticResource
-from .hub import Hub
+from .hubapp import HubApp, RootApp
-class HubApp(App):
- def __init__(self, secret, browser, hubapp, *args, **kwargs):
+class App(FalconApp):
+ def __init__(self, secret, **kwargs):
kwargs.setdefault("media_type", MEDIA_HTML)
- super().__init__(*args, **kwargs)
+ super().__init__(**kwargs)
+ self.base_dir = Path(__file__).parents[1] / "webroot"
+ self.env = Environment(
+ loader=FileSystemLoader(self.base_dir),
+ autoescape=select_autoescape,
+ extensions=["hub.utils.StaticTag"],
+ )
self.secret = secret or token_urlsafe(64)
- self.hub = Hub(self.secret)
- self.sr = StaticResource(self.secret, hubapp, self.hub.master_ws_uri)
- self.browser = browser
- self.hub.add_routes(self)
- self.sr.add_routes(self)
+ self.hubapps = {}
+ RootApp(self, self.base_dir)
+ for base_dir in self.base_dir.iterdir():
+ if not base_dir.is_dir() or HubApp.is_ignored_filename(base_dir):
+ continue
+ HubApp(self, base_dir)
self.add_error_handler(Exception, self.print_exception)
+ def get_hubapp_by_name(self, name):
+ return self.hubapps[name]
+
async def print_exception(self, req, resp, ex, params):
print_exception(*sys.exc_info())
-def get_app():
- ap = ArgumentParser()
- ap.add_argument("--secret")
- ap.add_argument("--browser", default="firefox")
- ap.add_argument("--hubapp", default="first")
- args = ap.parse_args()
- return HubApp(args.secret, args.browser, args.hubapp)
-
-
class HubServer(Server):
- async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
+ def __init__(self, *args, browser, **kwargs):
+ self.browser = browser
+ super().__init__(*args, **kwargs)
+
+ async def startup(self, sockets: list[socket.socket] | None = None) -> None:
await super().startup(sockets)
config = self.config
protocol_name = "https" if config.ssl else "http"
if port == 0:
try:
port = next(
- chain.from_iterable((server.sockets for server in getattr(self, "servers", ())))
+ chain.from_iterable(
+ (server.sockets for server in getattr(self, "servers", ()))
+ )
).getsockname()[1]
except StopIteration:
pass
if {"http": 80, "https": 443}[protocol_name] != port:
host = f"{host}:{port}"
app = config.loaded_app.app
- print("master_uri", app.sr.master_uri)
- master_url = urlunsplit((protocol_name, host, app.sr.master_uri, "", ""))
- print("secret:", app.secret)
- if not app.browser:
- print("master url", master_url)
- else:
- run([app.browser, master_url])
+ print("Secret:", app.secret)
+ for key, value in app.hubapps["root"].files_per_uris.items():
+ if Path(value.path.name).stem == "index.html":
+ url = urlunsplit((protocol_name, host, key, "", ""))
+ print("URL:", url)
+ if self.browser:
+ run([self.browser, url])
+
+app: App
-def main():
- HubServer(
- Config("hub.app:get_app", factory=True, port=5000, log_level="info")
- ).run()
+
+async def main():
+ global app
+ ap = ArgumentParser()
+ ap.add_argument("--secret")
+ ap.add_argument("--browser", default="firefox")
+ args = ap.parse_args()
+ app = App(args.secret)
+ config = Config("hub.app:app", port=5000, log_level="info")
+ config.setup_event_loop()
+ hs = HubServer(config, browser=args.browser)
+ await hs.serve()