From ce8a106ea30c0c7c5640946e48bd9883b666d9be Mon Sep 17 00:00:00 2001 From: June Tate-Gans Date: Mon, 3 May 2021 18:09:08 -0500 Subject: [PATCH] input: Restructure to use GObject and io_watches This isn't ideal, since opening all of the devices in /dev/input is slow, and this runs on the UI thread, but it works for now. Ultimately, InputReader should probably be restructured as a threading.Thread and use GLib.idle_add to send the signals out. --- g13gui/g13gui/input.py | 119 ++++++++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 44 deletions(-) diff --git a/g13gui/g13gui/input.py b/g13gui/g13gui/input.py index 570602a..4f871d3 100644 --- a/g13gui/g13gui/input.py +++ b/g13gui/g13gui/input.py @@ -1,63 +1,94 @@ -import threading import gi -import glob -import asyncio +import evdev +import threading + +from evdev import InputDevice +from evdev import KeyEvent +from evdev import ecodes gi.require_version('Gtk', '3.0') -from gi.repository import GObject - -import evdev -from evdev import InputDevice -from evdev import ecodes -from select import select +from gi.repository import GObject, GLib -class InputReader(threading.Thread, GObject.GObject): +class InputReader(GObject.Object): INPUT_PATH = '/dev/input' def __init__(self): - threading.Thread.__init__(self, daemon=True) - GObject.GObject.__init__(self) - self.reset() + GObject.Object.__init__(self) - def reset(self): - self._isRunning = False + self._capturing = False + self._watches = [] self._keyStates = {} - @GObject.Signal(name='evdev-key-released') - def keyReleased(self, keyCode, time): - print('key released: %d') + def capture(self): + if self._capturing: + return False - @GObject.Signal(name='evdev-key-pressed') - def keyPressed(self, keyCode, time): - print('key pressed: %d') + self._capturing = True - def run(self): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + self._keyStates = {} devices = [InputDevice(path) for path in evdev.list_devices()] - keyboards = [dev for dev in devices if ecodes.EV_KEY in dev.capabilities()] - print('Have keyboards: %s' % (keyboards)) + self._keyboards = [dev for dev in devices + if ecodes.EV_KEY in dev.capabilities()] + self._keyboards = {dev.fd: dev for dev in self._keyboards} + self._watches = [] + for fd, kbd in self._keyboards.items(): + watch = GLib.io_add_watch(fd, + GLib.PRIORITY_HIGH, + GLib.IO_IN, + self.handleEvent) + self._watches.append(watch) - while self._isRunning: - r, w, x = select(keyboards, [], []) - print('keyboards listening!') - for fd in r: - for event in keyboards[fd].read(): - if event.type != ecodes.EV_KEY: - continue + return False - keyCode = event.keycode - keyDown = event.keystate == event.key_down - lastKeyState = self._keystates.get(keyCode, False) + def stop(self): + if not self._capturing: + return False - if keyDown and not lastKeyState: - self.emit('evdev-key-pressed', - keyCode, event.event.timestamp()) - elif not keyDown and lastKeyState: - self.emit('evdev-key-released', - keyCode, event.event.timestamp()) + for i in self._watches: + GLib.source_remove(i) - def shutdown(self): - self._isRunning = False + for fd, kbd in self._keyboards.items(): + del kbd + del fd + + self._keyboards = None + self._watches = [] + self._keyStates = {} + self._capturing = False + + return False + + def handleEvent(self, fd, condition, *args): + for event in self._keyboards[fd].read(): + if event.type != ecodes.EV_KEY: + continue + + event = KeyEvent(event) + if 'BTN_MOUSE' in event.keycode: + continue + + keyCode = event.scancode + keyDown = event.keystate in ( + event.key_down, event.key_hold) + lastKeyState = self._keyStates.get(keyCode, False) + + if keyDown and not lastKeyState: + self.emit('evdev-key-pressed', + keyCode, event.event.timestamp()) + elif not keyDown and lastKeyState: + self.emit('evdev-key-released', + keyCode, event.event.timestamp()) + + self._keyStates[keyCode] = keyDown + + return True + + @GObject.Signal(name='evdev-key-released', arg_types=[int, float]) + def keyReleased(self, keyCode, time): + pass + + @GObject.Signal(name='evdev-key-pressed', arg_types=[int, float]) + def keyPressed(self, keyCode, time): + pass