g13: Change Manager's name to DeviceManager

This commit is contained in:
June Tate-Gans 2021-05-08 19:28:07 -05:00
parent 8390c065fe
commit c98bc506e2
3 changed files with 49 additions and 28 deletions

View File

@ -16,7 +16,9 @@ from evdev import ecodes as e
from g13gui.observer.observer import Observer from g13gui.observer.observer import Observer
from g13gui.model.bindings import StickMode from g13gui.model.bindings import StickMode
from g13gui.g13.common import G13NormalKeys from g13gui.g13.common import G13NormalKeys
from g13gui.g13.common import G13AppletKeys
from g13gui.g13.common import G13SpecialKeys from g13gui.g13.common import G13SpecialKeys
from g13gui.applet.manager import AppletManager
class G13Endpoints(enum.Enum): class G13Endpoints(enum.Enum):
@ -48,7 +50,7 @@ class StateError(RuntimeError):
pass pass
class Manager(threading.Thread, Observer): class DeviceManager(threading.Thread, Observer):
class State(enum.Enum): class State(enum.Enum):
DISCOVERING = 0 DISCOVERING = 0
FOUND = 1 FOUND = 1
@ -59,7 +61,7 @@ class Manager(threading.Thread, Observer):
Observer.__init__(self) Observer.__init__(self)
self._prefs = prefs self._prefs = prefs
self._state = Manager.State.DISCOVERING self._state = DeviceManager.State.DISCOVERING
self._device = None self._device = None
self._uinput = UInput(UINPUT_KEYBOARD_CAPS, self._uinput = UInput(UINPUT_KEYBOARD_CAPS,
name='G13 Keyboard', name='G13 Keyboard',
@ -70,6 +72,8 @@ class Manager(threading.Thread, Observer):
self._commandQueue = queue.Queue() self._commandQueue = queue.Queue()
self._lastProfile = None self._lastProfile = None
self._appletManager = AppletManager(self)
self._prefs.registerObserver(self, {'selectedProfile'}) self._prefs.registerObserver(self, {'selectedProfile'})
self._updateProfileRegistration() self._updateProfileRegistration()
self.changeTrigger(self.onSelectedProfileChanged, self.changeTrigger(self.onSelectedProfileChanged,
@ -85,15 +89,14 @@ class Manager(threading.Thread, Observer):
self._lastProfile.registerObserver(self, {'lcdColor'}) self._lastProfile.registerObserver(self, {'lcdColor'})
def onSelectedProfileChanged(self, subject, changeType, key, data): def onSelectedProfileChanged(self, subject, changeType, key, data):
print('onSelectedProfileChanged')
self._updateProfileRegistration() self._updateProfileRegistration()
if self._state == Manager.State.FOUND: if self._state == DeviceManager.State.FOUND:
self._updateLcdColor() self._updateLcdColor()
def onLcdColorChanged(self, subject, changeType, key, data): def onLcdColorChanged(self, subject, changeType, key, data):
print('onLcdColorChanged') print('onLcdColorChanged')
if self._state == Manager.State.FOUND: if self._state == DeviceManager.State.FOUND:
self._updateLcdColor() self._updateLcdColor()
def _updateLcdColor(self): def _updateLcdColor(self):
@ -105,10 +108,6 @@ class Manager(threading.Thread, Observer):
def state(self): def state(self):
return self._state return self._state
def _ensureState(self, state):
if self._state != state:
raise StateError()
def _reset(self): def _reset(self):
try: try:
self._device.reset() self._device.reset()
@ -124,9 +123,9 @@ class Manager(threading.Thread, Observer):
if self._device: if self._device:
self._reset() self._reset()
self._state = Manager.State.DISCOVERING self._state = DeviceManager.State.DISCOVERING
while self._state == Manager.State.DISCOVERING: while self._state == DeviceManager.State.DISCOVERING:
try: try:
while not self._device: while not self._device:
self._device = usb.core.find(idVendor=VENDOR_ID, self._device = usb.core.find(idVendor=VENDOR_ID,
@ -144,7 +143,7 @@ class Manager(threading.Thread, Observer):
traceback.print_exc() traceback.print_exc()
self._reset() self._reset()
else: else:
self._state = Manager.State.FOUND self._state = DeviceManager.State.FOUND
def _readKeys(self, buffer): def _readKeys(self, buffer):
# Apparently an "interrupt" read with the G13 "times out" if no keys # Apparently an "interrupt" read with the G13 "times out" if no keys
@ -173,7 +172,9 @@ class Manager(threading.Thread, Observer):
leds: a bitwise-or'd bitfield of LEDBits. Set is on. leds: a bitwise-or'd bitfield of LEDBits. Set is on.
""" """
self._ensureState(Manager.State.FOUND) if self.state != DeviceManager.State.FOUND:
return
self._commandQueue.put([self._setLedsMode, (leds,)]) self._commandQueue.put([self._setLedsMode, (leds,)])
def _setLedsMode(self, leds): def _setLedsMode(self, leds):
@ -191,7 +192,9 @@ class Manager(threading.Thread, Observer):
r, g, b: byte values between 0-255 r, g, b: byte values between 0-255
""" """
self._ensureState(Manager.State.FOUND) if self.state != DeviceManager.State.FOUND:
return
self._commandQueue.put([self._setBacklightColor, (r, g, b)]) self._commandQueue.put([self._setBacklightColor, (r, g, b)])
def _setBacklightColor(self, r, g, b): def _setBacklightColor(self, r, g, b):
@ -208,7 +211,9 @@ class Manager(threading.Thread, Observer):
Note: buffer must be a byte array containing an LPBM formatted image. Note: buffer must be a byte array containing an LPBM formatted image.
IOW, each byte represents one vertical row of 8 pixels each. IOW, each byte represents one vertical row of 8 pixels each.
""" """
self._ensureState(Manager.State.FOUND) if self.state != DeviceManager.State.FOUND:
return
self._commandQueue.put([self._setLCDBuffer, (buffer,)]) self._commandQueue.put([self._setLCDBuffer, (buffer,)])
def _setLCDBuffer(self, buffer): def _setLCDBuffer(self, buffer):
@ -231,30 +236,33 @@ class Manager(threading.Thread, Observer):
def run(self): def run(self):
reportBuffer = usb.util.create_buffer(REPORT_SIZE) reportBuffer = usb.util.create_buffer(REPORT_SIZE)
while self._state != Manager.State.SHUTDOWN: while self._state != DeviceManager.State.SHUTDOWN:
print('Discovering devices') print('Discovering devices')
self._discover() self._discover()
print('Got device') print('Got device')
self._updateLcdColor() self._updateLcdColor()
self._appletManager.onPresent()
while self._state == Manager.State.FOUND: while self._state == DeviceManager.State.FOUND:
try: try:
count = self._readKeys(reportBuffer) count = self._readKeys(reportBuffer)
if count == REPORT_SIZE: if count == REPORT_SIZE:
self._synthesizeKeys(reportBuffer) self._synthesizeKeys(reportBuffer)
self._signalSpecialKeys(reportBuffer)
self._synthesizeStick(reportBuffer) self._synthesizeStick(reportBuffer)
self._uinput.syn() self._uinput.syn()
self._processCommands() self._processCommands()
except usb.core.USBError as err: except usb.core.USBError as err:
print('Unexpected error occurred: %s' % err) if self._state != DeviceManager.State.SHUTDOWN:
print('Unexpected error occurred: %s' % err)
break break
print('Shutting down') print('Shutting down')
if self._device and self._state == Manager.State.FOUND: if self._device and self._state == DeviceManager.State.FOUND:
self._reset() self._reset()
def _synthesizeStick(self, report): def _synthesizeStick(self, report):
@ -304,16 +312,29 @@ class Manager(threading.Thread, Observer):
self._lastKeyState[key] = nowPressed self._lastKeyState[key] = nowPressed
def signalSpecialKeys(self, report): def _signalSpecialKeys(self, report):
for key in G13AppletKeys:
wasPressed = self._lastKeyState.get(key, False)
nowPressed = key.testReport(report)
# Emit special keypress if and only if it was released
if wasPressed and not nowPressed:
self._appletManager.onKeyReleased(key)
elif not wasPressed and nowPressed:
self._appletManager.onKeyPressed(key)
self._lastKeyState[key] = nowPressed
for key in G13SpecialKeys: for key in G13SpecialKeys:
wasPressed = self._lastKeyState.get(key, False) wasPressed = self._lastKeyState.get(key, False)
nowPressed = key.testReport(report) nowPressed = key.testReport(report)
# Emit special keypress if and only if it was released # Emit special keypress if and only if it was released
if wasPressed and not nowPressed: if wasPressed and not nowPressed:
# check for MR, allow for key record this way
pass pass
self._lastKeyState[key] = nowPressed self._lastKeyState[key] = nowPressed
def shutdown(self): def shutdown(self):
self._state = Manager.State.SHUTDOWN self._state = DeviceManager.State.SHUTDOWN

View File

@ -1,8 +1,8 @@
from g13gui.model.prefs import Preferences from g13gui.model.prefs import Preferences
from g13gui.g13.manager import Manager from g13gui.g13.manager import DeviceManager
if __name__ == '__main__': if __name__ == '__main__':
prefs = Preferences() prefs = Preferences()
manager = Manager(prefs) manager = DeviceManager(prefs)
manager.run() manager.run()

View File

@ -6,20 +6,20 @@ import usb.util
from g13gui.observer.observer import ObserverTestCase from g13gui.observer.observer import ObserverTestCase
from g13gui.model.prefs import Preferences from g13gui.model.prefs import Preferences
from g13gui.g13.manager import Manager from g13gui.g13.manager import DeviceManager
from g13gui.g13.manager import LCD_BUFFER_SIZE from g13gui.g13.manager import LCD_BUFFER_SIZE
class G13ManagerTests(ObserverTestCase): class DeviceManagerTests(ObserverTestCase):
def setUp(self): def setUp(self):
prefs = Preferences() prefs = Preferences()
self.m = Manager(prefs) self.m = DeviceManager(prefs)
self.m.start() self.m.start()
while self.m.state != Manager.State.FOUND: while self.m.state != DeviceManager.State.FOUND:
time.sleep(1) time.sleep(1)
self.assertEqual(self.m.state, Manager.State.FOUND) self.assertEqual(self.m.state, DeviceManager.State.FOUND)
def tearDown(self): def tearDown(self):
self.m.shutdown() self.m.shutdown()