input: Restructure to use GObject and io_watches

This isn't ideal, since opening all of the devices in /dev/input is slow, and
this runs on the UI thread, but it works for now. Ultimately, InputReader should
probably be restructured as a threading.Thread and use GLib.idle_add to send the
signals out.
This commit is contained in:
June Tate-Gans 2021-05-03 18:09:08 -05:00
parent 65c2580e45
commit ce8a106ea3

View File

@ -1,56 +1,78 @@
import threading
import gi
import glob
import asyncio
import evdev
import threading
from evdev import InputDevice
from evdev import KeyEvent
from evdev import ecodes
gi.require_version('Gtk', '3.0')
from gi.repository import GObject
import evdev
from evdev import InputDevice
from evdev import ecodes
from select import select
from gi.repository import GObject, GLib
class InputReader(threading.Thread, GObject.GObject):
class InputReader(GObject.Object):
INPUT_PATH = '/dev/input'
def __init__(self):
threading.Thread.__init__(self, daemon=True)
GObject.GObject.__init__(self)
self.reset()
GObject.Object.__init__(self)
def reset(self):
self._isRunning = False
self._capturing = False
self._watches = []
self._keyStates = {}
@GObject.Signal(name='evdev-key-released')
def keyReleased(self, keyCode, time):
print('key released: %d')
def capture(self):
if self._capturing:
return False
@GObject.Signal(name='evdev-key-pressed')
def keyPressed(self, keyCode, time):
print('key pressed: %d')
self._capturing = True
def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._keyStates = {}
devices = [InputDevice(path) for path in evdev.list_devices()]
keyboards = [dev for dev in devices if ecodes.EV_KEY in dev.capabilities()]
print('Have keyboards: %s' % (keyboards))
self._keyboards = [dev for dev in devices
if ecodes.EV_KEY in dev.capabilities()]
self._keyboards = {dev.fd: dev for dev in self._keyboards}
self._watches = []
for fd, kbd in self._keyboards.items():
watch = GLib.io_add_watch(fd,
GLib.PRIORITY_HIGH,
GLib.IO_IN,
self.handleEvent)
self._watches.append(watch)
while self._isRunning:
r, w, x = select(keyboards, [], [])
print('keyboards listening!')
for fd in r:
for event in keyboards[fd].read():
return False
def stop(self):
if not self._capturing:
return False
for i in self._watches:
GLib.source_remove(i)
for fd, kbd in self._keyboards.items():
del kbd
del fd
self._keyboards = None
self._watches = []
self._keyStates = {}
self._capturing = False
return False
def handleEvent(self, fd, condition, *args):
for event in self._keyboards[fd].read():
if event.type != ecodes.EV_KEY:
continue
keyCode = event.keycode
keyDown = event.keystate == event.key_down
lastKeyState = self._keystates.get(keyCode, False)
event = KeyEvent(event)
if 'BTN_MOUSE' in event.keycode:
continue
keyCode = event.scancode
keyDown = event.keystate in (
event.key_down, event.key_hold)
lastKeyState = self._keyStates.get(keyCode, False)
if keyDown and not lastKeyState:
self.emit('evdev-key-pressed',
@ -59,5 +81,14 @@ class InputReader(threading.Thread, GObject.GObject):
self.emit('evdev-key-released',
keyCode, event.event.timestamp())
def shutdown(self):
self._isRunning = False
self._keyStates[keyCode] = keyDown
return True
@GObject.Signal(name='evdev-key-released', arg_types=[int, float])
def keyReleased(self, keyCode, time):
pass
@GObject.Signal(name='evdev-key-pressed', arg_types=[int, float])
def keyPressed(self, keyCode, time):
pass