--- /dev/null
+import os
+import re
+from subprocess import DEVNULL, check_output, run
+
+
+class XinputConf:
+ DEVICES = {
+ "ELAN9008:00 04F3:425B": {
+ "output": "eDP-1",
+ "type": "touchpad",
+ },
+ "ELAN9008:00 04F3:425B Touchpad": {
+ "output": "eDP-1",
+ "type": "touchpad",
+ },
+ "ELAN9008:00 04F3:425B Stylus": {
+ "output": "eDP-1",
+ "type": "stylus",
+ "ignore_result": True,
+ },
+ "ELAN9008:00 04F3:425B Stylus Pen (0)": {
+ "output": "eDP-1",
+ "type": "stylus",
+ },
+ "ELAN9009:00 04F3:425A": {
+ "output": "eDP-2",
+ "type": "touchpad",
+ },
+ "ELAN9009:00 04F3:425A Touchpad": {
+ "output": "eDP-2",
+ "type": "touchpad",
+ },
+ "ELAN9009:00 04F3:425A Stylus": {
+ "output": "eDP-2",
+ "type": "stylus",
+ "ignore_result": True,
+ },
+ "ELAN9009:00 04F3:425A Stylus Pen (0)": {
+ "output": "eDP-2",
+ "type": "stylus",
+ },
+ }
+ DEVICE_PATTERN = re.compile(
+ "^([\u239c ]\\s*\u21b3|\u223c)\\s+(([^ ]| [^ ])+)\\s*\tid=(\\d+)\t"
+ )
+
+ def __init__(self, xrandr_conf):
+ self.xrandr_conf = xrandr_conf
+ self.conf = self.get_conf()
+
+ def get_conf(self):
+ output = check_output(["xinput", "list"], text=True)
+ devices = []
+ for line in output.split(os.linesep):
+ match = self.DEVICE_PATTERN.match(line)
+ if match:
+ if match.group(2) not in self.DEVICES:
+ continue
+ devices.append(
+ {
+ "name": match.group(2),
+ "enabled": not match.group(1).startswith("\u223c"),
+ "id": match.group(4),
+ **self.DEVICES[match.group(2)],
+ }
+ )
+ elif line and "core pointer" not in line and "core keyboard" not in line:
+ print("line", repr(line))
+ return devices
+
+ def update_device(self, device, enable=None):
+ if enable is None:
+ enable = device["enabled"]
+ if enable:
+ run(["xinput", "enable", device["id"]])
+ run(
+ ["xinput", "map-to-output", device["id"], device["output"]],
+ stderr=DEVNULL if device.get("ignore_result") else None,
+ )
+ else:
+ run(["xinput", "disable", device["id"]])
+ self.conf = self.get_conf()
+
+ def update(self, mode):
+ enable_second = mode != "single"
+ for device in self.conf:
+ if device["output"] == self.xrandr_conf.OUTPUTS[0]:
+ self.update_device(device)
+ elif device["output"] == self.xrandr_conf.OUTPUTS[1]:
+ self.update_device(device, enable_second)
+
+ def update_by_type(self, device_type, state):
+ outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()}
+ for device in self.conf:
+ if device["type"] != device_type:
+ continue
+ s = state
+ if not outputs[device["output"]]["active"]:
+ s = False
+ self.update_device(device, s)
+ return state
+
+ def conf_by_type(self, device_type):
+ outputs = {o["name"]: o for o in self.xrandr_conf.get_relevant_outputs()}
+ state = None
+ for device in self.conf:
+ if (
+ device["type"] != device_type
+ or not outputs[device["output"]]["active"]
+ or device.get("ignore_result")
+ ):
+ continue
+ new_state = device["enabled"]
+ if state is not None and new_state != state:
+ return None
+ state = new_state
+ return state
+
+ def reapply_by_type(self, device_type):
+ for device in self.conf:
+ if device["type"] != device_type:
+ continue
+ self.update_device(device, device["enabled"])
--- /dev/null
+import os
+import re
+import subprocess
+
+
+class XrandrConf:
+ CONNECTED_MAPPING = {
+ "connected": True,
+ "disconnected": False,
+ }
+ RESOLUTION_PATTERN = re.compile(r"(\d+)x(\d+)\+(\d+)\+(\d+)")
+ MM_SIZE_PATTERN = re.compile(r"(\d+)mm x (\d+)mm")
+ DIRECTIONS = ("normal", "left", "inverted", "right", "invalid direction")
+ REFLECTIONS = (
+ "none", "[Xx] axis", "[Yy] axis", "X and Y axis", "invalid reflection"
+ )
+ DIRECTIONS_REFLECTIONS_PATTERN = re.compile(
+ f"({'|'.join(DIRECTIONS)})( {'|'.join(REFLECTIONS)})? \\("
+ )
+ AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN = re.compile(
+ f"{'|'.join(DIRECTIONS)}|{'|'.join(REFLECTIONS)}"
+ )
+ MODE_PATTERN = re.compile(r"^\s{2}([^ ]+) \(0x([0-9A-Fa-f]+)\)\s+([^ ]+)MHz")
+ MODE_FLAGS_PATTERN = re.compile("[-+][CHV]Sync|Interlace|DoubleScan")
+ WIDTH_HEIGHT_PATTERN = re.compile(r"^\s{8}(h: width|v: height)\s+(\d+) ")
+ OUTPUTS = ("eDP-1", "eDP-2")
+
+ def __init__(self):
+ self.current_conf = self.get_conf()
+
+ @staticmethod
+ def strip_each(ss):
+ return (s.strip() for s in ss)
+
+ @staticmethod
+ def int_tuple(it):
+ return tuple(int(i) for i in it)
+
+ @classmethod
+ def parse_screen_line(cls, line):
+ screen_id, sizes = cls.strip_each(line.split(":", 1))
+ return {
+ "screen": int(screen_id[7:]),
+ "sizes": [
+ {
+ "name": name,
+ "size": cls.int_tuple(cls.strip_each(size.split("x"))),
+ } for name, size in (
+ cls.strip_each(p.split(" ", 1))
+ for p in cls.strip_each(sizes.split(","))
+ )
+ ],
+ "outputs": [],
+ }
+
+ @classmethod
+ def parse_output_line(cls, line):
+ name, connected, rest = cls.strip_each(line.split(" ", 2))
+ output = {
+ "name": name,
+ "connected": cls.CONNECTED_MAPPING.get(
+ connected, "unknown connection"
+ ),
+ }
+ if output["connected"] is False:
+ return output
+ resolution = cls.RESOLUTION_PATTERN.search(rest)
+ output["active"] = bool(resolution)
+ if resolution is None:
+ return output
+ parsed = cls.int_tuple(resolution.group(i) for i in range(1, 5))
+ direction_reflection = (
+ cls.DIRECTIONS_REFLECTIONS_PATTERN.search(rest)
+ )
+ end = direction_reflection.end()
+ output.update(
+ {
+ "size": parsed[:2],
+ "pos": parsed[2:],
+ "primary": rest.startswith("primary "),
+ "direction": direction_reflection.group(1),
+ "reflection": direction_reflection.group(2) or "none",
+ "available_rotations_reflections": [
+ m.group(0)
+ for m in cls.AVAILABLE_DIRECTIONS_REFLECTIONS_PATTERN.finditer(
+ rest[end:rest.find(")", end)]
+ )
+ ],
+ "modes": []
+ }
+ )
+ mm_size = cls.MM_SIZE_PATTERN.search(rest)
+ if mm_size:
+ output["mm_size"] = (
+ int(mm_size.group(1)), int(mm_size.group(2))
+ )
+ return output
+
+ @classmethod
+ def parse_mode(cls, line, width_line, height_line):
+ if line.endswith(" +preferred"):
+ line = line[:-11]
+ preferred = True
+ else:
+ preferred = False
+ if line.endswith(" *current"):
+ line = line[:-9]
+ current = True
+ else:
+ current = False
+ match = cls.MODE_PATTERN.match(line)
+ return {
+ "name": match.group(1),
+ "width": int(cls.WIDTH_HEIGHT_PATTERN.match(width_line).group(2)),
+ "height": int(cls.WIDTH_HEIGHT_PATTERN.match(height_line).group(2)),
+ "mode_id": int(match.group(2), 16),
+ "refresh_rate": float(match.group(3)),
+ "preferred": preferred,
+ "current": current,
+ "mode_flags": cls.MODE_FLAGS_PATTERN.findall(line),
+ }
+
+ @classmethod
+ def get_conf(cls):
+ screens = []
+ current_screen = None
+ output = subprocess.check_output(["xrandr", "--verbose"], text=True)
+ lines = output.split(os.linesep)
+ for i, line in enumerate(lines):
+ if not line:
+ continue
+ elif line.startswith("Screen "):
+ current_screen = len(screens)
+ screens.append(cls.parse_screen_line(line))
+ elif not line.startswith("\t") and not line.startswith(" "):
+ screens[current_screen]["outputs"].append(cls.parse_output_line(line))
+ elif line.startswith(" "):
+ output = screens[current_screen]["outputs"][-1]
+ if line[3] == " " or "modes" not in output:
+ continue
+ output["modes"].append(cls.parse_mode(*lines[i:i + 3]))
+ return screens
+
+ @classmethod
+ def call(cls, output, *args, **kwargs):
+ args = [f"--{arg}" for arg in args]
+ for key, value in kwargs.items():
+ args.extend((f"--{key.replace("_", "-")}", value))
+ return subprocess.run(["xrandr", "--output", cls.OUTPUTS[output], *args])
+
+ def update(self, mode):
+ if mode not in ("single", "double", "vertical"):
+ raise ValueError(f"unknown mode: {mode}")
+ self.call(1, "off", "noprimary")
+ if mode in ("single", "double"):
+ self.call(0, "auto", "primary", rotate="normal")
+ if mode == "double":
+ self.call(1, "auto", rotate="normal", below=self.OUTPUTS[0])
+ if mode == "vertical":
+ self.call(0, "auto", rotate="left")
+ self.call(1, "auto", rotate="left", left_of=self.OUTPUTS[0])
+ self.current_conf = self.get_conf()
+
+ def get_relevant_outputs(self, screen_id=0):
+ screen = next(s for s in self.current_conf if s["screen"] == screen_id)
+ outputs = [None, None]
+ to_finds = list(self.OUTPUTS)
+ for output in screen["outputs"]:
+ for i, to_find in enumerate(to_finds):
+ if output["name"] == to_find:
+ outputs[self.OUTPUTS.index(to_find)] = output
+ to_finds.remove(to_find)
+ if not to_finds:
+ return outputs
+ raise ValueError(f"Outputs not found: {', '.join(self.OUTPUTS)}")
+
+ @staticmethod
+ def compare_output(outputs, expecteds):
+ for output, expected in zip(outputs, expecteds):
+ for key, value in expected.items():
+ if key == "pos":
+ for p, q in zip(output[key], value):
+ if p != q:
+ return False
+ elif output[key] != value:
+ return False
+ return True
+
+ def is_active(self, mode):
+ outputs = self.get_relevant_outputs()
+ expected = {
+ "single": [
+ {
+ "active": True,
+ "pos": [0, 0],
+ "direction": "normal",
+ },
+ {
+ "active": False,
+ },
+ ],
+ "double": [
+ {
+ "active": True,
+ "pos": [0, 0],
+ "direction": "normal",
+ },
+ {
+ "active": True,
+ "pos": [0, outputs[0].get("size", [None])[-1]],
+ "direction": "normal",
+ },
+ ],
+ "vertical": [
+ {
+ "active": True,
+ "pos": [outputs[1].get("size", [None])[0], 0],
+ "direction": "left",
+ },
+ {
+ "active": True,
+ "pos": [0, 0],
+ "direction": "left",
+ },
+ ],
+ }
+ return self.compare_output(outputs, expected[mode])
+
+ def count_active(self, screen_id=0):
+ screen = next(s for s in self.current_conf if s["screen"] == screen_id)
+ return sum(
+ output["connected"] and output["active"] for output in screen["outputs"]
+ )