From 45b6f068413ffb4baedd64616d7d4acb3609fcb7 Mon Sep 17 00:00:00 2001 From: mar77i Date: Tue, 21 Jan 2025 17:57:26 +0100 Subject: [PATCH] import bluetooth, xinput, xrandr --- zenbook_conf/__init__.py | 0 zenbook_conf/bluetooth.py | 25 ++++ zenbook_conf/xinput.py | 123 ++++++++++++++++++++ zenbook_conf/xrandr.py | 233 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 381 insertions(+) create mode 100644 zenbook_conf/__init__.py create mode 100644 zenbook_conf/bluetooth.py create mode 100644 zenbook_conf/xinput.py create mode 100644 zenbook_conf/xrandr.py diff --git a/zenbook_conf/__init__.py b/zenbook_conf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zenbook_conf/bluetooth.py b/zenbook_conf/bluetooth.py new file mode 100644 index 0000000..214fdde --- /dev/null +++ b/zenbook_conf/bluetooth.py @@ -0,0 +1,25 @@ +import os +from subprocess import check_output, run + + +class BluetoothConf: + DEVICE_TYPE = "bluetooth" + + def __init__(self): + self.conf = self.get_config() + + def get_config(self): + output = check_output( + ["rfkill", "-rnoSOFT,HARD", "list", self.DEVICE_TYPE], text=True + ) + state = None + for line in output.strip().split(os.linesep): + new_state = "blocked" not in line.split() + if state is not None and new_state != state: + return None + state = new_state + return state + + def update(self, value): + run(["rfkill", f"{'un' if value else ''}block", self.DEVICE_TYPE]) + self.conf = self.get_config() diff --git a/zenbook_conf/xinput.py b/zenbook_conf/xinput.py new file mode 100644 index 0000000..0dc586a --- /dev/null +++ b/zenbook_conf/xinput.py @@ -0,0 +1,123 @@ +import os +import re +from subprocess import DEVNULL, check_output, run + + +class XinputConf: + DEVICES = { + "ELAN9008:00 04F3:425B": { + "output": "eDP-1", + "type": "touchpad", + }, + "ELAN9008:00 04F3:425B Touchpad": { + "output": "eDP-1", + "type": "touchpad", + }, + "ELAN9008:00 04F3:425B Stylus": { + "output": "eDP-1", + "type": "stylus", + "ignore_result": True, + }, + "ELAN9008:00 04F3:425B Stylus Pen (0)": { + "output": "eDP-1", + "type": "stylus", + }, + "ELAN9009:00 04F3:425A": { + "output": "eDP-2", + "type": "touchpad", + }, + "ELAN9009:00 04F3:425A Touchpad": { + "output": "eDP-2", + "type": "touchpad", + }, + "ELAN9009:00 04F3:425A Stylus": { + "output": "eDP-2", + "type": "stylus", + "ignore_result": True, + }, + "ELAN9009:00 04F3:425A Stylus Pen (0)": { + "output": "eDP-2", + "type": "stylus", + }, + } + DEVICE_PATTERN = re.compile( + "^([\u239c ]\\s*\u21b3|\u223c)\\s+(([^ ]| [^ ])+)\\s*\tid=(\\d+)\t" + ) + + def __init__(self, xrandr_conf): + self.xrandr_conf = xrandr_conf + self.conf = self.get_conf() + + def get_conf(self): + output = check_output(["xinput", "list"], text=True) + devices = [] + for line in output.split(os.linesep): + match = self.DEVICE_PATTERN.match(line) + if match: + if match.group(2) not in self.DEVICES: + continue + devices.append( + { + "name": match.group(2), + "enabled": not match.group(1).startswith("\u223c"), + "id": match.group(4), + **self.DEVICES[match.group(2)], + } + ) + elif line and "core pointer" not in line and "core keyboard" not in line: + print("line", repr(line)) + return devices + + def update_device(self, device, enable=None): + if enable is None: + enable = device["enabled"] + if enable: + run(["xinput", "enable", device["id"]]) + run( + ["xinput", "map-to-output", device["id"], device["output"]], + stderr=DEVNULL if device.get("ignore_result") else None, + ) + else: + run(["xinput", "disable", device["id"]]) + self.conf = self.get_conf() + + def update(self, mode): + enable_second = mode != "single" + for device in self.conf: + if device["output"] == self.xrandr_conf.OUTPUTS[0]: + self.update_device(device) + elif device["output"] == self.xrandr_conf.OUTPUTS[1]: + self.update_device(device, enable_second) + + def update_by_type(self, device_type, state): + outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()} + for device in self.conf: + if device["type"] != device_type: + continue + s = state + if not outputs[device["output"]]["active"]: + s = False + self.update_device(device, s) + return state + + def conf_by_type(self, device_type): + outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()} + state = None + for device in self.conf: + if ( + device["type"] != device_type + or not outputs[device["output"]]["active"] + or device.get("ignore_result") + ): + continue + new_state = device["enabled"] + if state is not None and new_state != state: + return None + state = new_state + return state + + def reapply_by_type(self, device_type): + for device in self.conf: + if device["type"] != device_type: + continue + self.update_device(device, device["enabled"]) diff --git a/zenbook_conf/xrandr.py b/zenbook_conf/xrandr.py new file mode 100644 index 0000000..db135f1 --- /dev/null +++ b/zenbook_conf/xrandr.py @@ -0,0 +1,233 @@ +import os +import re +import subprocess + + +class XrandrConf: + CONNECTED_MAPPING = { + "connected": True, + "disconnected": False, + } + RESOLUTION_PATTERN = re.compile(r"(\d+)x(\d+)\+(\d+)\+(\d+)") + MM_SIZE_PATTERN = re.compile(r"(\d+)mm x (\d+)mm") + DIRECTIONS = ("normal", "left", "inverted", "right", "invalid direction") + REFLECTIONS = ( + "none", "[Xx] axis", "[Yy] axis", "X and Y axis", "invalid reflection" + ) + DIRECTIONS_REFLECTIONS_PATTERN = re.compile( + f"({'|'.join(DIRECTIONS)})( {'|'.join(REFLECTIONS)})? \\(" + ) + AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN = re.compile( + f"{'|'.join(DIRECTIONS)}|{'|'.join(REFLECTIONS)}" + ) + MODE_PATTERN = re.compile(r"^\s{2}([^ ]+) \(0x([0-9A-Fa-f]+)\)\s+([^ ]+)MHz") + MODE_FLAGS_PATTERN = re.compile("[-+][CHV]Sync|Interlace|DoubleScan") + WIDTH_HEIGHT_PATTERN = re.compile(r"^\s{8}(h: width|v: height)\s+(\d+) ") + OUTPUTS = ("eDP-1", "eDP-2") + + def __init__(self): + self.current_conf = self.get_conf() + + @staticmethod + def strip_each(ss): + return (s.strip() for s in ss) + + @staticmethod + def int_tuple(it): + return tuple(int(i) for i in it) + + @classmethod + def parse_screen_line(cls, line): + screen_id, sizes = cls.strip_each(line.split(":", 1)) + return { + "screen": int(screen_id[7:]), + "sizes": [ + { + "name": name, + "size": cls.int_tuple(cls.strip_each(size.split("x"))), + } for name, size in ( + cls.strip_each(p.split(" ", 1)) + for p in cls.strip_each(sizes.split(",")) + ) + ], + "outputs": [], + } + + @classmethod + def parse_output_line(cls, line): + name, connected, rest = cls.strip_each(line.split(" ", 2)) + output = { + "name": name, + "connected": cls.CONNECTED_MAPPING.get( + connected, "unknown connection" + ), + } + if output["connected"] is False: + return output + resolution = cls.RESOLUTION_PATTERN.search(rest) + output["active"] = bool(resolution) + if resolution is None: + return output + parsed = cls.int_tuple(resolution.group(i) for i in range(1, 5)) + direction_reflection = ( + cls.DIRECTIONS_REFLECTIONS_PATTERN.search(rest) + ) + end = direction_reflection.end() + output.update( + { + "size": parsed[:2], + "pos": parsed[2:], + "primary": rest.startswith("primary "), + "direction": direction_reflection.group(1), + "reflection": direction_reflection.group(2) or "none", + "available_rotations_reflections": [ + m.group(0) + for m in cls.AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN.finditer( + rest[end:rest.find(")", end)] + ) + ], + "modes": [] + } + ) + mm_size = cls.MM_SIZE_PATTERN.search(rest) + if mm_size: + output["mm_size"] = ( + int(mm_size.group(1)), int(mm_size.group(2)) + ) + return output + + @classmethod + def parse_mode(cls, line, width_line, height_line): + if line.endswith(" +preferred"): + line = line[:-11] + preferred = True + else: + preferred = False + if line.endswith(" *current"): + line = line[:-9] + current = True + else: + current = False + match = cls.MODE_PATTERN.match(line) + return { + "name": match.group(1), + "width": int(cls.WIDTH_HEIGHT_PATTERN.match(width_line).group(2)), + "height": int(cls.WIDTH_HEIGHT_PATTERN.match(height_line).group(2)), + "mode_id": int(match.group(2), 16), + "refresh_rate": float(match.group(3)), + "preferred": preferred, + "current": current, + "mode_flags": cls.MODE_FLAGS_PATTERN.findall(line), + } + + @classmethod + def get_conf(cls): + screens = [] + current_screen = None + output = subprocess.check_output(["xrandr", "--verbose"], text=True) + lines = output.split(os.linesep) + for i, line in enumerate(lines): + if not line: + continue + elif line.startswith("Screen "): + current_screen = len(screens) + screens.append(cls.parse_screen_line(line)) + elif not line.startswith("\t") and not line.startswith(" "): + screens[current_screen]["outputs"].append(cls.parse_output_line(line)) + elif line.startswith(" "): + output = screens[current_screen]["outputs"][-1] + if line[3] == " " or "modes" not in output: + continue + output["modes"].append(cls.parse_mode(*lines[i:i + 3])) + return screens + + @classmethod + def call(cls, output, *args, **kwargs): + args = [f"--{arg}" for arg in args] + for key, value in kwargs.items(): + args.extend((f"--{key.replace("_", "-")}", value)) + return subprocess.run(["xrandr", "--output", cls.OUTPUTS[output], *args]) + + def update(self, mode): + if mode not in ("single", "double", "vertical"): + raise ValueError(f"unknown mode: {mode}") + self.call(1, "off", "noprimary") + if mode in ("single", "double"): + self.call(0, "auto", "primary", rotate="normal") + if mode == "double": + self.call(1, "auto", rotate="normal", below=self.OUTPUTS[0]) + if mode == "vertical": + self.call(0, "auto", rotate="left") + self.call(1, "auto", rotate="left", left_of=self.OUTPUTS[0]) + self.current_conf = self.get_conf() + + def get_relevant_outputs(self, screen_id=0): + screen = next(s for s in self.current_conf if s["screen"] == screen_id) + outputs = [None, None] + to_finds = list(self.OUTPUTS) + for output in screen["outputs"]: + for i, to_find in enumerate(to_finds): + if output["name"] == to_find: + outputs[self.OUTPUTS.index(to_find)] = output + to_finds.remove(to_find) + if not to_finds: + return outputs + raise ValueError(f"Outputs not found: {', '.join(self.OUTPUTS)}") + + @staticmethod + def compare_output(outputs, expecteds): + for output, expected in zip(outputs, expecteds): + for key, value in expected.items(): + if key == "pos": + for p, q in zip(output[key], value): + if p != q: + return False + elif output[key] != value: + return False + return True + + def is_active(self, mode): + outputs = self.get_relevant_outputs() + expected = { + "single": [ + { + "active": True, + "pos": [0, 0], + "direction": "normal", + }, + { + "active": False, + }, + ], + "double": [ + { + "active": True, + "pos": [0, 0], + "direction": "normal", + }, + { + "active": True, + "pos": [0, outputs[0].get("size", [None])[-1]], + "direction": "normal", + }, + ], + "vertical": [ + { + "active": True, + "pos": [outputs[1].get("size", [None])[0], 0], + "direction": "left", + }, + { + "active": True, + "pos": [0, 0], + "direction": "left", + }, + ], + } + return self.compare_output(outputs, expected[mode]) + + def count_active(self, screen_id=0): + screen = next(s for s in self.current_conf if s["screen"] == screen_id) + return sum( + output["connected"] and output["active"] for output in screen["outputs"] + ) -- 2.51.0