]> git.mar77i.info Git - zenbook_gui/commitdiff
import bluetooth, xinput, xrandr
authormar77i <mar77i@protonmail.ch>
Tue, 21 Jan 2025 16:57:26 +0000 (17:57 +0100)
committermar77i <mar77i@protonmail.ch>
Tue, 21 Jan 2025 16:57:26 +0000 (17:57 +0100)
zenbook_conf/__init__.py [new file with mode: 0644]
zenbook_conf/bluetooth.py [new file with mode: 0644]
zenbook_conf/xinput.py [new file with mode: 0644]
zenbook_conf/xrandr.py [new file with mode: 0644]

diff --git a/zenbook_conf/__init__.py b/zenbook_conf/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/zenbook_conf/bluetooth.py b/zenbook_conf/bluetooth.py
new file mode 100644 (file)
index 0000000..214fdde
--- /dev/null
@@ -0,0 +1,25 @@
+import os
+from subprocess import check_output, run
+
+
+class BluetoothConf:
+    DEVICE_TYPE = "bluetooth"
+
+    def __init__(self):
+        self.conf = self.get_config()
+
+    def get_config(self):
+        output = check_output(
+            ["rfkill", "-rnoSOFT,HARD", "list", self.DEVICE_TYPE], text=True
+        )
+        state = None
+        for line in output.strip().split(os.linesep):
+            new_state = "blocked" not in line.split()
+            if state is not None and new_state != state:
+                return None
+            state = new_state
+        return state
+
+    def update(self, value):
+        run(["rfkill", f"{'un' if value else ''}block", self.DEVICE_TYPE])
+        self.conf = self.get_config()
diff --git a/zenbook_conf/xinput.py b/zenbook_conf/xinput.py
new file mode 100644 (file)
index 0000000..0dc586a
--- /dev/null
@@ -0,0 +1,123 @@
+import os
+import re
+from subprocess import DEVNULL, check_output, run
+
+
+class XinputConf:
+    DEVICES = {
+        "ELAN9008:00 04F3:425B": {
+            "output": "eDP-1",
+            "type": "touchpad",
+        },
+        "ELAN9008:00 04F3:425B Touchpad": {
+            "output": "eDP-1",
+            "type": "touchpad",
+        },
+        "ELAN9008:00 04F3:425B Stylus": {
+            "output": "eDP-1",
+            "type": "stylus",
+            "ignore_result": True,
+        },
+        "ELAN9008:00 04F3:425B Stylus Pen (0)": {
+            "output": "eDP-1",
+            "type": "stylus",
+        },
+        "ELAN9009:00 04F3:425A": {
+            "output": "eDP-2",
+            "type": "touchpad",
+        },
+        "ELAN9009:00 04F3:425A Touchpad": {
+            "output": "eDP-2",
+            "type": "touchpad",
+        },
+        "ELAN9009:00 04F3:425A Stylus": {
+            "output": "eDP-2",
+            "type": "stylus",
+            "ignore_result": True,
+        },
+        "ELAN9009:00 04F3:425A Stylus Pen (0)": {
+            "output": "eDP-2",
+            "type": "stylus",
+        },
+    }
+    DEVICE_PATTERN = re.compile(
+        "^([\u239c ]\\s*\u21b3|\u223c)\\s+(([^ ]| [^ ])+)\\s*\tid=(\\d+)\t"
+    )
+
+    def __init__(self, xrandr_conf):
+        self.xrandr_conf = xrandr_conf
+        self.conf = self.get_conf()
+
+    def get_conf(self):
+        output = check_output(["xinput", "list"], text=True)
+        devices = []
+        for line in output.split(os.linesep):
+            match = self.DEVICE_PATTERN.match(line)
+            if match:
+                if match.group(2) not in self.DEVICES:
+                    continue
+                devices.append(
+                    {
+                        "name": match.group(2),
+                        "enabled": not match.group(1).startswith("\u223c"),
+                        "id": match.group(4),
+                        **self.DEVICES[match.group(2)],
+                    }
+                )
+            elif line and "core pointer" not in line and "core keyboard" not in line:
+                print("line", repr(line))
+        return devices
+
+    def update_device(self, device, enable=None):
+        if enable is None:
+            enable = device["enabled"]
+        if enable:
+            run(["xinput", "enable", device["id"]])
+            run(
+                ["xinput", "map-to-output", device["id"], device["output"]],
+                stderr=DEVNULL if device.get("ignore_result") else None,
+            )
+        else:
+            run(["xinput", "disable", device["id"]])
+        self.conf = self.get_conf()
+
+    def update(self, mode):
+        enable_second = mode != "single"
+        for device in self.conf:
+            if device["output"] == self.xrandr_conf.OUTPUTS[0]:
+                self.update_device(device)
+            elif device["output"] == self.xrandr_conf.OUTPUTS[1]:
+                self.update_device(device, enable_second)
+
+    def update_by_type(self, device_type, state):
+        outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()}
+        for device in self.conf:
+            if device["type"] != device_type:
+                continue
+            s = state
+            if not outputs[device["output"]]["active"]:
+                s = False
+            self.update_device(device, s)
+        return state
+
+    def conf_by_type(self, device_type):
+        outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()}
+        state = None
+        for device in self.conf:
+            if (
+                device["type"] != device_type
+                or not outputs[device["output"]]["active"]
+                or device.get("ignore_result")
+            ):
+                continue
+            new_state = device["enabled"]
+            if state is not None and new_state != state:
+                return None
+            state = new_state
+        return state
+
+    def reapply_by_type(self, device_type):
+        for device in self.conf:
+            if device["type"] != device_type:
+                continue
+            self.update_device(device, device["enabled"])
diff --git a/zenbook_conf/xrandr.py b/zenbook_conf/xrandr.py
new file mode 100644 (file)
index 0000000..db135f1
--- /dev/null
@@ -0,0 +1,233 @@
+import os
+import re
+import subprocess
+
+
+class XrandrConf:
+    CONNECTED_MAPPING = {
+        "connected": True,
+        "disconnected": False,
+    }
+    RESOLUTION_PATTERN = re.compile(r"(\d+)x(\d+)\+(\d+)\+(\d+)")
+    MM_SIZE_PATTERN = re.compile(r"(\d+)mm x (\d+)mm")
+    DIRECTIONS = ("normal", "left", "inverted", "right", "invalid direction")
+    REFLECTIONS = (
+        "none", "[Xx] axis", "[Yy] axis", "X and Y axis", "invalid reflection"
+    )
+    DIRECTIONS_REFLECTIONS_PATTERN = re.compile(
+        f"({'|'.join(DIRECTIONS)})( {'|'.join(REFLECTIONS)})? \\("
+    )
+    AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN = re.compile(
+        f"{'|'.join(DIRECTIONS)}|{'|'.join(REFLECTIONS)}"
+    )
+    MODE_PATTERN = re.compile(r"^\s{2}([^ ]+) \(0x([0-9A-Fa-f]+)\)\s+([^ ]+)MHz")
+    MODE_FLAGS_PATTERN = re.compile("[-+][CHV]Sync|Interlace|DoubleScan")
+    WIDTH_HEIGHT_PATTERN = re.compile(r"^\s{8}(h: width|v: height)\s+(\d+) ")
+    OUTPUTS = ("eDP-1", "eDP-2")
+
+    def __init__(self):
+        self.current_conf = self.get_conf()
+
+    @staticmethod
+    def strip_each(ss):
+        return (s.strip() for s in ss)
+
+    @staticmethod
+    def int_tuple(it):
+        return tuple(int(i) for i in it)
+
+    @classmethod
+    def parse_screen_line(cls, line):
+        screen_id, sizes = cls.strip_each(line.split(":", 1))
+        return {
+            "screen": int(screen_id[7:]),
+            "sizes": [
+                {
+                    "name": name,
+                    "size": cls.int_tuple(cls.strip_each(size.split("x"))),
+                } for name, size in (
+                    cls.strip_each(p.split(" ", 1))
+                    for p in cls.strip_each(sizes.split(","))
+                )
+            ],
+            "outputs": [],
+        }
+
+    @classmethod
+    def parse_output_line(cls, line):
+        name, connected, rest = cls.strip_each(line.split(" ", 2))
+        output = {
+            "name": name,
+            "connected": cls.CONNECTED_MAPPING.get(
+                connected, "unknown connection"
+            ),
+        }
+        if output["connected"] is False:
+            return output
+        resolution = cls.RESOLUTION_PATTERN.search(rest)
+        output["active"] = bool(resolution)
+        if resolution is None:
+            return output
+        parsed = cls.int_tuple(resolution.group(i) for i in range(1, 5))
+        direction_reflection = (
+            cls.DIRECTIONS_REFLECTIONS_PATTERN.search(rest)
+        )
+        end = direction_reflection.end()
+        output.update(
+            {
+                "size": parsed[:2],
+                "pos": parsed[2:],
+                "primary": rest.startswith("primary "),
+                "direction": direction_reflection.group(1),
+                "reflection": direction_reflection.group(2) or "none",
+                "available_rotations_reflections": [
+                    m.group(0)
+                    for m in cls.AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN.finditer(
+                        rest[end:rest.find(")", end)]
+                    )
+                ],
+                "modes": []
+            }
+        )
+        mm_size = cls.MM_SIZE_PATTERN.search(rest)
+        if mm_size:
+            output["mm_size"] = (
+                int(mm_size.group(1)), int(mm_size.group(2))
+            )
+        return output
+
+    @classmethod
+    def parse_mode(cls, line, width_line, height_line):
+        if line.endswith(" +preferred"):
+            line = line[:-11]
+            preferred = True
+        else:
+            preferred = False
+        if line.endswith(" *current"):
+            line = line[:-9]
+            current = True
+        else:
+            current = False
+        match = cls.MODE_PATTERN.match(line)
+        return {
+            "name": match.group(1),
+            "width": int(cls.WIDTH_HEIGHT_PATTERN.match(width_line).group(2)),
+            "height": int(cls.WIDTH_HEIGHT_PATTERN.match(height_line).group(2)),
+            "mode_id": int(match.group(2), 16),
+            "refresh_rate": float(match.group(3)),
+            "preferred": preferred,
+            "current": current,
+            "mode_flags": cls.MODE_FLAGS_PATTERN.findall(line),
+        }
+
+    @classmethod
+    def get_conf(cls):
+        screens = []
+        current_screen = None
+        output = subprocess.check_output(["xrandr", "--verbose"], text=True)
+        lines = output.split(os.linesep)
+        for i, line in enumerate(lines):
+            if not line:
+                continue
+            elif line.startswith("Screen "):
+                current_screen = len(screens)
+                screens.append(cls.parse_screen_line(line))
+            elif not line.startswith("\t") and not line.startswith(" "):
+                screens[current_screen]["outputs"].append(cls.parse_output_line(line))
+            elif line.startswith("  "):
+                output = screens[current_screen]["outputs"][-1]
+                if line[3] == " " or "modes" not in output:
+                    continue
+                output["modes"].append(cls.parse_mode(*lines[i:i + 3]))
+        return screens
+
+    @classmethod
+    def call(cls, output, *args, **kwargs):
+        args = [f"--{arg}" for arg in args]
+        for key, value in kwargs.items():
+            args.extend((f"--{key.replace("_", "-")}", value))
+        return subprocess.run(["xrandr", "--output", cls.OUTPUTS[output], *args])
+
+    def update(self, mode):
+        if mode not in ("single", "double", "vertical"):
+            raise ValueError(f"unknown mode: {mode}")
+        self.call(1, "off", "noprimary")
+        if mode in ("single", "double"):
+            self.call(0, "auto", "primary", rotate="normal")
+        if mode == "double":
+            self.call(1, "auto", rotate="normal", below=self.OUTPUTS[0])
+        if mode == "vertical":
+            self.call(0, "auto", rotate="left")
+            self.call(1, "auto", rotate="left", left_of=self.OUTPUTS[0])
+        self.current_conf = self.get_conf()
+
+    def get_relevant_outputs(self, screen_id=0):
+        screen = next(s for s in self.current_conf if s["screen"] == screen_id)
+        outputs = [None, None]
+        to_finds = list(self.OUTPUTS)
+        for output in screen["outputs"]:
+            for i, to_find in enumerate(to_finds):
+                if output["name"] == to_find:
+                    outputs[self.OUTPUTS.index(to_find)] = output
+                    to_finds.remove(to_find)
+            if not to_finds:
+                return outputs
+        raise ValueError(f"Outputs not found: {', '.join(self.OUTPUTS)}")
+
+    @staticmethod
+    def compare_output(outputs, expecteds):
+        for output, expected in zip(outputs, expecteds):
+            for key, value in expected.items():
+                if key == "pos":
+                    for p, q in zip(output[key], value):
+                        if p != q:
+                            return False
+                elif output[key] != value:
+                    return False
+        return True
+
+    def is_active(self, mode):
+        outputs = self.get_relevant_outputs()
+        expected = {
+            "single": [
+                {
+                    "active": True,
+                    "pos": [0, 0],
+                    "direction": "normal",
+                },
+                {
+                    "active": False,
+                },
+            ],
+            "double": [
+                {
+                    "active": True,
+                    "pos": [0, 0],
+                    "direction": "normal",
+                },
+                {
+                    "active": True,
+                    "pos": [0, outputs[0].get("size", [None])[-1]],
+                    "direction": "normal",
+                },
+            ],
+            "vertical": [
+                {
+                    "active": True,
+                    "pos": [outputs[1].get("size", [None])[0], 0],
+                    "direction": "left",
+                },
+                {
+                    "active": True,
+                    "pos": [0, 0],
+                    "direction": "left",
+                },
+            ],
+        }
+        return self.compare_output(outputs, expected[mode])
+
+    def count_active(self, screen_id=0):
+        screen = next(s for s in self.current_conf if s["screen"] == screen_id)
+        return sum(
+            output["connected"] and output["active"] for output in screen["outputs"]
+        )