class App(TemplateTreeFileApp, FalconApp):
@classmethod
- def scan_files(cls, base_dir: Path) -> Generator[Path]:
+ def scan_files(cls, base_dir: Path) -> Generator[Path, None, None]:
for path in base_dir.iterdir():
if not path.is_dir() and not cls.is_ignored_filename(path):
yield path
def scramble(self, value: str | bytes) -> str:
if isinstance(value, str):
value = value.encode()
- secret = self.secret
- if isinstance(secret, str):
- secret = secret.encode()
return (
- urlsafe_b64encode(pbkdf2_hmac("sha512", value, secret, 221100))
+ urlsafe_b64encode(
+ pbkdf2_hmac("sha512", value, self.secret.encode(), 221100)
+ )
.rstrip(b"=")
.decode("ascii")
)
def uri_tail(self, path: Path) -> str:
return self.scramble(super().uri_tail(path))
- def __init__(self, base_dir: Path, secret: str, **kwargs):
+ def __init__(self, base_dir: Path, base_uri: str, secret: str, **kwargs):
from .utils import get_redis_pass
- self.secret = secret or token_urlsafe(64)
+ self.secret: str = secret or token_urlsafe(64)
kwargs.setdefault("media_type", MEDIA_HTML)
FalconApp.__init__(self, **kwargs)
- TemplateTreeFileApp.__init__(self, self, base_dir, "/derp", "root")
self.env = Environment(
- loader=FileSystemLoader(self.base_dir),
+ loader=FileSystemLoader(base_dir),
autoescape=select_autoescape(),
extensions=["hub.utils.StaticTag"],
)
+ TemplateTreeFileApp.__init__(self, self, base_dir, base_uri, "root")
self.hubapps = {}
self.conn = StrictRedis(
username="default", password=get_redis_pass("/etc/redis/redis.conf")
for base_dir in self.base_dir.iterdir():
if base_dir.is_dir() and not self.is_ignored_filename(base_dir):
self.hubapps[base_dir.name] = HubApp(
- self, base_dir, base_uri=f"/derp/{base_dir.name}"
+ self, base_dir, base_uri=f"{base_uri.rstrip('/')}/{base_dir.name}"
)
self.add_error_handler(Exception, self.print_exception)
ap.add_argument("--secret")
ap.add_argument("--browser", default="xdg-open")
args = ap.parse_args()
- app = App(Path(__file__).parents[1] / "webroot", args.secret)
+ app = App(Path(__file__).parents[1] / "webroot", "/", args.secret)
await app.conn.set("client_id", 0)
config = Config(app, port=5000, log_level="info")
config.setup_event_loop()
from pathlib import Path
+from typing import TYPE_CHECKING, cast
from falcon.constants import MEDIA_HTML, MEDIA_JS, MEDIA_TEXT
from falcon.request import Request
from falcon.response import Response
from falcon.status_codes import HTTP_OK
-from jinja2 import Template
+from jinja2 import FileSystemLoader
from .websocket import WebSocketApp
+if TYPE_CHECKING:
+ from jinja2.environment import Template
+
+ from .app import App
+
MEDIA_CSS = "text/css"
".txt": MEDIA_TEXT,
}
+ def _get_mtime(self):
+ return self.path.stat().st_mtime
+
+ def _get_content(self) -> bytes:
+ with open(self.path, "rb") as fh:
+ return fh.read()
+
def __init__(self, path: Path):
self.path = path
self.name = path.name
self.media_type = self.get_media_type(path)
- self.mtime: float | None = None
- self.content: bytes | None = None
+ self.mtime = self._get_mtime()
+ self.content: bytes = self._get_content()
@classmethod
def get_media_type(cls, path: Path):
return cls.media_types_per_suffix[path.suffix]
def get(self) -> bytes:
- mtime = self.path.stat().st_mtime
+ mtime = self._get_mtime()
if mtime != self.mtime:
- with open(self.path, "rb") as fh:
- self.content = fh.read()
self.mtime = mtime
+ self.content = self._get_content()
return self.content
def __repr__(self):
base_uri = base_uri[len(root_base_uri) :]
return base_uri.replace("/", ".").strip(".")
- def __init__(self, root, base_dir, base_uri="/", name=None):
+ def __init__(self, root: "App", base_dir: Path, base_uri: str = "/", name=None):
self.root = root
self.base_dir = base_dir
self.base_uri = base_uri.rstrip("/")
class StaticTemplateFile(StaticFile):
TEMPLATE_SUFFIX = ".j2"
- content: Template | None
+
+ def _get_template(self) -> "Template":
+ env = self.hubapp.root.env
+ return env.get_template(
+ str(self.path.relative_to(cast(FileSystemLoader, env.loader).searchpath[0]))
+ )
def __init__(self, path: Path, hubapp: "TemplateTreeFileApp"):
- super().__init__(path)
+ self.path = path
self.name = path.stem
+ self.media_type = self.get_media_type(path)
+ self.mtime = self._get_mtime()
self.hubapp = hubapp
self.context = {"static_file": self}
+ self.template = self._get_template()
def get(self) -> bytes:
- mtime = self.path.stat().st_mtime
+ mtime = self._get_mtime()
if mtime != self.mtime:
- env = self.hubapp.root.env
- self.content = env.get_template(
- str(self.path.relative_to(env.loader.searchpath[0]))
- )
self.mtime = mtime
- return self.content.render(self.context).encode()
+ self.template = self._get_template()
+ return self.template.render(self.context).encode()
@classmethod
def get_media_type(cls, path: Path) -> str:
from asyncio.exceptions import CancelledError
from functools import partial
from traceback import print_exception
+from typing import TYPE_CHECKING
from falcon import WebSocketDisconnected
-from .static import TreeFileApp
+if TYPE_CHECKING:
+ from .static import TreeFileApp
class WebSocketApp:
- def __init__(self, hubapp: TreeFileApp):
+ def __init__(self, hubapp: "TreeFileApp"):
self.name = hubapp.name
self.conn = hubapp.root.conn
--- /dev/null
+[tool.mypy]
+python_version = "3.12"
+warn_return_any = true
+warn_unused_configs = true
+
+[[tool.mypy.overrides]]
+module = ["falcon.*", "jinja2_simple_tags"]
+ignore_missing_imports = true
Jinja2==3.1.4
jinja2-simple-tags==0.6.1
MarkupSafe==2.1.5
+mypy==1.11.1
mypy-extensions==1.0.0
packaging==24.1
pathspec==0.12.1
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.8
+ruff==0.5.6
sniffio==1.3.1
+typing_extensions==4.12.2
uvicorn==0.30.5
uvloop==0.19.0
watchfiles==0.22.0
(function () {
- var domconf_event_prefix = "event_";
- function domconf(domobj, obj) {
- var key;
- for (key in obj) {
- if (key.substring(0, domconf_event_prefix.length) == domconf_event_prefix) {
- domobj.addEventListener(
- key.substring(domconf_event_prefix.length), obj[key]
- );
- } else {
- domobj.setAttribute(key, obj[key])
- }
- }
- return domobj;
- }
-
- function tag(tag_name, obj) {
- return domconf(document.createElement(tag_name), obj);
- }
-
+ "use strict";
function HubClient(ws_uri, open, close, message) {
this.close = close.bind(this);
this.message = message.bind(this);
this.open = open.bind(this);
- this.ws = domconf(
- new WebSocket(
- {"http:": "ws:", "https:": "wss:"}[window.location.protocol] +
- "//" +
- window.location.host +
- ws_uri
- ),
- {
- "event_close": this.close,
- "event_message": this.message,
- "event_open": this.open,
- },
- );
+ this.ws = new WebSocket(
+ {"http:": "ws:", "https:": "wss:"}[window.location.protocol] +
+ "//" +
+ window.location.host +
+ ws_uri
+ )
+ this.ws.addEventListener("close", this.close);
+ this.ws.addEventListener("message", this.message);
+ this.ws.addEventListener("open", this.open);
this.send = this.ws.send.bind(this.ws);
}
- function write(output, msg) {
+ function write(msg) {
+ var output = document.getElementById("output");
if (output.childNodes.length > 0) {
- output.append(tag("br"));
+ output.append(document.createElement("br"));
}
if (typeof msg !== "string") {
msg = String(msg);
}
function setup(callback) {
- domconf(
- document,
- {
- "event_readystatechange": function (event) {
- if (!event) {
- event = window.event;
- }
- if (event.target.readyState !== "complete") {
- return;
- }
- callback(event);
+ document.addEventListener(
+ "readystatechange",
+ function (event) {
+ if (!event) {
+ event = window.event;
}
- },
+ if (event.target.readyState !== "complete") {
+ return;
+ }
+ callback(event);
+ }
);
}
- function make_key_event(callback) {
- var wrapper = function (event) {
- event = event || window.event;
- callback(event, event.target || event.srcElement, event.code || event.key);
- };
- return wrapper;
- }
-
- function input_with_keydown_event(callback) {
- return common.tag("input", {"event_keydown": make_key_event(callback)});
- }
-
window.common = {
- "tag": tag,
"HubClient": HubClient,
"write": write,
"setup": setup,
- "input_with_keydown_event": input_with_keydown_event,
};
}());
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
- <title>Chat client</title>
+ <title>Chat</title>
<link rel="stylesheet" href="{% static 'style.css' %}">
</head>
<body>
</script>
<script src="{% static 'index.js' %}"></script>
<h1>Chat</h1>
+ <div id="output"></div>
+ <div id="input">
+ <input type="text" />
+ <button>change name</button>
+ </div>
+ <div id="change_name" style="display:none">
+ Enter name: <input type="text" />
+ </div>
</body>
</html>
(function () {
"use strict";
- var input_div = null;
+ var input_div;
+ var change_name_div;
- function write(msg) {
- common.write(
- input_div === null ? document.body : input_div.previousSibling, msg
- );
- }
-
- function change_name_button(hub) {
- var btn = common.tag(
- "button",
- {
- "event_click": function () {
- setup_change_name(hub);
+ function setup_chat(hub) {
+ var inp = input_div.getElementsByTagName("input")[0];
+ inp.disabled = false;
+ inp.addEventListener(
+ "keydown",
+ function (event) {
+ if (!event) {
+ event = window.event;
+ }
+ var target = event.target || event.srcElement;
+ var keycode = event.code || event.key;
+ if (keycode !== "Enter" && keycode !== "NumpadEnter") {
+ return;
}
+ hub.send(JSON.stringify({"data": target.value}));
+ target.value = "";
}
);
- btn.append(document.createTextNode("change name"));
- return btn;
- }
-
- function setup_chat(hub) {
- input_div = common.tag("div");
- input_div.append(
- common.input_with_keydown_event(
- function (event, target, keycode) {
- if (keycode !== "Enter" && keycode !== "NumpadEnter") {
- return;
- }
- hub.send(JSON.stringify({"data": target.value}));
- target.value = "";
- }
- )
+ input_div.getElementsByTagName("button")[0].addEventListener(
+ "click",
+ function () {
+ change_name_div.getElementsByTagName("input")[0].focus();
+ change_name_div.style.display = "";
+ input_div.style.display = "none";
+ }
);
- input_div.append(document.createTextNode(" "));
- input_div.append(change_name_button(hub));
-
- document.body.append(common.tag("div"));
- document.body.append(input_div);
- }
-
- function setup_change_name(hub) {
- var div = common.tag("div");
- div.append(document.createTextNode("Enter name: "));
- div.append(
- common.input_with_keydown_event(
- function (event, target, keycode) {
- if (keycode !== "Enter" && keycode !== "NumpadEnter") {
- return;
- }
- if (input_div.childNodes[0] !== input_div.children[0]) {
- input_div.childNodes[0].remove();
- }
- input_div.insertBefore(
- document.createTextNode(target.value + " "), input_div.children[0]
- );
- hub.send(JSON.stringify({"action": "set_name", "name": target.value}));
- div.remove();
- input_div.style.display = "";
- input_div.children[0].focus();
+ change_name_div.getElementsByTagName("input")[0].addEventListener(
+ "keydown",
+ function (event) {
+ if (!event) {
+ event = window.event;
+ }
+ var target = event.target || event.srcElement;
+ var keycode = event.code || event.key;
+ if (keycode !== "Enter" && keycode !== "NumpadEnter") {
+ return;
}
- )
- )
- document.body.append(div);
- input_div.style.display = "none";
- div.children[0].focus();
+ while (input_div.childNodes[0] !== input_div.children[0]) {
+ input_div.childNodes[0].remove();
+ }
+ input_div.insertBefore(
+ document.createTextNode(target.value + " "),
+ input_div.children[0]
+ );
+ hub.send(
+ JSON.stringify({"action": "set_name", "name": target.value})
+ );
+ target.value = "";
+ input_div.style.display = "";
+ change_name_div.style.display = "none";
+ input_div.children[0].focus();
+ }
+ );
}
function open() {
+ input_div = document.getElementById("input");
+ change_name_div = document.getElementById("change_name");
setup_chat(this);
- write("connected to " + ws_uri);
- setup_change_name(this);
+ common.write("connected to " + ws_uri);
+ input_div.getElementsByTagName("button")[0].click();
}
function close() {
- write("connection lost");
- if (input_div !== null) {
- input_div.remove();
- input_div = null;
- }
+ common.write("connection lost");
+ input_div.getElementsByTagName("input")[0].disabled = true;
}
function message(msg) {
var obj = JSON.parse(msg.data);
if (obj.action === "join" || obj.action === "leave") {
- write("[action]: " + obj.name + " " + obj.action + "s");
- } else if (obj.action == "set_name") {
- write(
+ common.write("[action]: " + obj.name + " " + obj.action + "s");
+ } else if (obj.action === "set_name") {
+ common.write(
"[action]: " +
obj.old_name +
" changes name to " +
obj.name
);
} else if (obj.hasOwnProperty("data")) {
- write(obj.name + ": " + obj.data);
+ common.write(obj.name + ": " + obj.data);
}
}
</script>
<script src="{% static 'master.js' %}"></script>
<h1>Chat Master</h1>
+ <ul id="clients_list"></ul>
+ <div id="output"></div>
+ <div id="input">MASTER <input disabled /></div>
</body>
</html>
(function () {
"use strict";
- var input_div = null;
- var clients_list = null;
-
- function write(msg) {
- common.write(input_div.previousSibling, msg);
- }
-
- function master_send(hub, client_ids, client_id, obj) {
- obj.client_ids = client_ids;
- obj.client_id = client_id;
- hub.send(JSON.stringify(obj));
- }
-
- function ClientsList() {
- function Client(clients_list, client_id, name) {
- var client_label;
+ var input_div;
+ var clients_list;
+ function ClientsList(ul) {
+ this.ul = ul;
+ function Client(clients_list, client_id) {
this.clients_list = clients_list;
this.id = client_id;
this.get_label = function () {
return "client-" + this.id.toString();
};
- client_label = this.get_label();
- this.name = name || client_label;
-
- this.li = common.tag("li")
- this.checkbox = common.tag(
- "input",
- {
- "type": "checkbox",
- "id": "checkbox-" + client_label,
- "checked": "",
- },
- );
- this.li.append(this.checkbox);
+ this.name = this.get_label();
+ this.li = document.createElement("li");
+ this.cb = document.createElement("input");
+ this.cb.setAttribute("type", "checkbox");
+ this.cb.setAttribute("checked", "");
+ this.li.append(this.cb);
+ this.li.append(document.createTextNode(this.name));
clients_list.ul.append(this.li);
this.set_name = function (new_name) {
var old_name = this.name;
this.name = new_name;
- while (this.checkbox.nextSibling)
- this.checkbox.nextSibling.remove();
- this.checkbox.parentNode.append(document.createTextNode(new_name));
+ if (this.cb.nextSibling !== null) {
+ this.cb.nextSibling.remove();
+ }
+ this.li.append(document.createTextNode(new_name));
return old_name;
};
- this.set_name(client_label);
}
this.clients = {};
- this.ul = common.tag("ul");
- document.body.append(this.ul);
this.append = function (client_id) {
var client = new Client(this, client_id);
var result = [];
for (key in this.clients) {
client = this.clients[key];
- if (client.checkbox.checked) {
+ if (client.cb.checked) {
result.push(client.id);
}
}
};
}
- function open() {
- var hub = this;
- clients_list = new ClientsList();
- input_div = common.tag("div");
- input_div.append(document.createTextNode("MASTER "));
- input_div.append(
- common.input_with_keydown_event(
- function (event, target, keycode) {
- if (keycode !== "Enter" && keycode !== "NumpadEnter") {
- return;
- }
- hub.send(JSON.stringify({
- "client_ids": clients_list.selected(),
- "name": "MASTER",
- "data": target.value,
- }));
- write("MASTER: " + target.value);
- target.value = "";
+ function setup_chat(hub) {
+ var inp = input_div.getElementsByTagName("input")[0];
+ inp.disabled = false;
+ inp.addEventListener(
+ "keydown",
+ function (event) {
+ if (!event) {
+ event = window.event;
}
- )
+ var target = event.target || event.srcElement;
+ var keycode = event.code || event.key;
+ if (keycode !== "Enter" && keycode !== "NumpadEnter") {
+ return;
+ }
+ hub.send(
+ JSON.stringify(
+ {
+ "client_ids": clients_list.selected(),
+ "name": "MASTER",
+ "data": target.value,
+ }
+ )
+ );
+ common.write("MASTER: " + target.value);
+ target.value = "";
+ }
);
+ }
- document.body.append(common.tag("div")); // output
- document.body.append(input_div);
- write("connected to ws_master");
+ function open() {
+ input_div = document.getElementById("input");
+ clients_list = new ClientsList(document.getElementById("clients_list"));
+ setup_chat(this);
+ common.write("connected to ws_master");
}
function close() {
- write("connection lost");
- input_div.remove();
+ common.write("connection lost");
+ input_div.getElementsByTagName("input")[0].disabled = true;
}
function join_leave_broadcast(hub, client, action) {
if (!client) {
return;
}
- write("[action]: " + client.name + " " + action + "s");
- master_send(hub, clients_list.all(), client.id, {
- "action": action,
- "name": client.name,
- });
+ common.write("[action]: " + client.name + " " + action + "s");
+ hub.send(
+ JSON.stringify(
+ {
+ "client_ids": clients_list.all(),
+ "client_id": client.id,
+ "action": action,
+ "name": client.name,
+ }
+ )
+ );
}
function message(msg) {
}
client = clients_list.clients[obj.client_id.toString()];
- if (obj.action == "set_name") {
- write(
- "[action]: " +
- client.name +
- " changes name to " +
- obj.name
+ if (obj.action === "set_name") {
+ common.write("[action]: " + client.name + " changes name to " + obj.name);
+ this.send(
+ JSON.stringify(
+ {
+ "client_ids": clients_list.all(),
+ "client_id": client.id,
+ "action": "set_name",
+ "name": obj.name,
+ "old_name": client.name,
+ }
+ )
);
- master_send(this, clients_list.all(), client.id, {
- "action": "set_name",
- "old_name": client.name,
- "name": obj.name,
- });
client.set_name(obj.name);
} else if (obj.hasOwnProperty("data")) {
- write(client.name + " [" + client.id + "]: " + obj.data);
- master_send(this, clients_list.all(), client.id, {
- "name": client.name,
- "data": obj.data,
- });
+ common.write(client.name + " [" + client.id + "]: " + obj.data);
+ this.send(
+ JSON.stringify(
+ {
+ "client_ids": clients_list.all(),
+ "client_id": client.id,
+ "name": client.name,
+ "data": obj.data,
+ }
+ )
+ );
}
}