diff options
author | P. J. McDermott <pj@pehjota.net> | 2023-10-01 10:52:07 (EDT) |
---|---|---|
committer | P. J. McDermott <pj@pehjota.net> | 2023-10-01 10:52:07 (EDT) |
commit | 1369c3fcb52c7e6500bf52ecfddf113747f05047 (patch) | |
tree | 2862ad27d6028c1ebcc47b05396c956157059960 /gatt | |
download | gatt-python-upstream/0.2.7.zip gatt-python-upstream/0.2.7.tar.gz gatt-python-upstream/0.2.7.tar.bz2 |
New upstream version 0.2.7upstream/0.2.7upstream/latest
Diffstat (limited to 'gatt')
-rw-r--r-- | gatt/__init__.py | 1 | ||||
-rw-r--r-- | gatt/errors.py | 30 | ||||
-rw-r--r-- | gatt/gatt.py | 11 | ||||
-rw-r--r-- | gatt/gatt_linux.py | 642 | ||||
-rw-r--r-- | gatt/gatt_stubs.py | 14 |
5 files changed, 698 insertions, 0 deletions
diff --git a/gatt/__init__.py b/gatt/__init__.py new file mode 100644 index 0000000..988d547 --- /dev/null +++ b/gatt/__init__.py @@ -0,0 +1 @@ +from .gatt import DeviceManager, Device, Service, Characteristic diff --git a/gatt/errors.py b/gatt/errors.py new file mode 100644 index 0000000..477bb88 --- /dev/null +++ b/gatt/errors.py @@ -0,0 +1,30 @@ +class AccessDenied(Exception): + pass + + +class Failed(Exception): + pass + + +class InProgress(Exception): + pass + + +class InvalidValueLength(Exception): + pass + + +class NotAuthorized(Exception): + pass + + +class NotReady(Exception): + pass + + +class NotPermitted(Exception): + pass + + +class NotSupported(Exception): + pass diff --git a/gatt/gatt.py b/gatt/gatt.py new file mode 100644 index 0000000..d0bdd8e --- /dev/null +++ b/gatt/gatt.py @@ -0,0 +1,11 @@ +import os +import platform + +if platform.system() == 'Linux': + if os.environ.get('LINUX_WITHOUT_DBUS', '0') == '0': + from .gatt_linux import * + else: + from .gatt_stubs import * +else: + # TODO: Add support for more platforms + from .gatt_stubs import * diff --git a/gatt/gatt_linux.py b/gatt/gatt_linux.py new file mode 100644 index 0000000..dcf0b00 --- /dev/null +++ b/gatt/gatt_linux.py @@ -0,0 +1,642 @@ +try: + import dbus + import dbus.mainloop.glib +except ImportError: + import sys + print("Module 'dbus' not found") + print("Please run: sudo apt-get install python3-dbus") + print("See also: https://github.com/getsenic/gatt-python#installing-gatt-sdk-for-python") + sys.exit(1) + +import re + +from gi.repository import GObject + +from . import errors + + +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +dbus.mainloop.glib.threads_init() + + +class DeviceManager: + """ + Entry point for managing BLE GATT devices. + + This class is intended to be subclassed to manage a specific set of GATT devices. + """ + + def __init__(self, adapter_name): + self.listener = None + self.adapter_name = adapter_name + + self._bus = dbus.SystemBus() + try: + adapter_object = self._bus.get_object('org.bluez', '/org/bluez/' + adapter_name) + except dbus.exceptions.DBusException as e: + raise _error_from_dbus_error(e) + object_manager_object = self._bus.get_object("org.bluez", "/") + self._adapter = dbus.Interface(adapter_object, 'org.bluez.Adapter1') + self._adapter_properties = dbus.Interface(self._adapter, 'org.freedesktop.DBus.Properties') + self._object_manager = dbus.Interface(object_manager_object, "org.freedesktop.DBus.ObjectManager") + self._device_path_regex = re.compile('^/org/bluez/' + adapter_name + '/dev((_[A-Z0-9]{2}){6})$') + self._devices = {} + self._discovered_devices = {} + self._interface_added_signal = None + self._properties_changed_signal = None + self._main_loop = None + + self.update_devices() + + @property + def is_adapter_powered(self): + return self._adapter_properties.Get('org.bluez.Adapter1', 'Powered') == 1 + + @is_adapter_powered.setter + def is_adapter_powered(self, powered): + return self._adapter_properties.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(powered)) + + def run(self): + """ + Starts the main loop that is necessary to receive Bluetooth events from the Bluetooth adapter. + + This call blocks until you call `stop()` to stop the main loop. + """ + + if self._main_loop: + return + + self._interface_added_signal = self._bus.add_signal_receiver( + self._interfaces_added, + dbus_interface='org.freedesktop.DBus.ObjectManager', + signal_name='InterfacesAdded') + + # TODO: Also listen to 'interfaces removed' events? + + self._properties_changed_signal = self._bus.add_signal_receiver( + self._properties_changed, + dbus_interface=dbus.PROPERTIES_IFACE, + signal_name='PropertiesChanged', + arg0='org.bluez.Device1', + path_keyword='path') + + def disconnect_signals(): + for device in self._devices.values(): + device.invalidate() + self._properties_changed_signal.remove() + self._interface_added_signal.remove() + + self._main_loop = GObject.MainLoop() + try: + self._main_loop.run() + disconnect_signals() + except Exception: + disconnect_signals() + raise + + def stop(self): + """ + Stops the main loop started with `start()` + """ + if self._main_loop: + self._main_loop.quit() + self._main_loop = None + + def _manage_device(self, device): + existing_device = self._devices.get(device.mac_address) + if existing_device is not None: + existing_device.invalidate() + self._devices[device.mac_address] = device + + def update_devices(self): + managed_objects = self._object_manager.GetManagedObjects().items() + possible_mac_addresses = [self._mac_address(path) for path, _ in managed_objects] + mac_addresses = [m for m in possible_mac_addresses if m is not None] + new_mac_addresses = [m for m in mac_addresses if m not in self._devices] + for mac_address in new_mac_addresses: + self.make_device(mac_address) + # TODO: Remove devices from `_devices` that are no longer managed, i.e. deleted + + def devices(self): + """ + Returns all known Bluetooth devices. + """ + self.update_devices() + return self._devices.values() + + def start_discovery(self, service_uuids=[]): + """Starts a discovery for BLE devices with given service UUIDs. + + :param service_uuids: Filters the search to only return devices with given UUIDs. + """ + + discovery_filter = {'Transport': 'le'} + if service_uuids: # D-Bus doesn't like empty lists, it needs to guess the type + discovery_filter['UUIDs'] = service_uuids + + try: + self._adapter.SetDiscoveryFilter(discovery_filter) + self._adapter.StartDiscovery() + except dbus.exceptions.DBusException as e: + if e.get_dbus_name() == 'org.bluez.Error.NotReady': + raise errors.NotReady( + "Bluetooth adapter not ready. " + "Set `is_adapter_powered` to `True` or run 'echo \"power on\" | sudo bluetoothctl'.") + if e.get_dbus_name() == 'org.bluez.Error.InProgress': + # Discovery was already started - ignore exception + pass + else: + raise _error_from_dbus_error(e) + + def stop_discovery(self): + """ + Stops the discovery started with `start_discovery` + """ + try: + self._adapter.StopDiscovery() + except dbus.exceptions.DBusException as e: + if (e.get_dbus_name() == 'org.bluez.Error.Failed') and (e.get_dbus_message() == 'No discovery started'): + pass + else: + raise _error_from_dbus_error(e) + + def _interfaces_added(self, path, interfaces): + self._device_discovered(path, interfaces) + + def _properties_changed(self, interface, changed, invalidated, path): + # TODO: Handle `changed` and `invalidated` properties and update device + self._device_discovered(path, [interface]) + + def _device_discovered(self, path, interfaces): + if 'org.bluez.Device1' not in interfaces: + return + mac_address = self._mac_address(path) + if not mac_address: + return + device = self._devices.get(mac_address) or self.make_device(mac_address) + if device is not None: + self.device_discovered(device) + + def device_discovered(self, device): + device.advertised() + + def _mac_address(self, device_path): + match = self._device_path_regex.match(device_path) + if not match: + return None + return match.group(1)[1:].replace('_', ':').lower() + + def make_device(self, mac_address): + """ + Makes and returns a `Device` instance with specified MAC address. + + Override this method to return a specific subclass instance of `Device`. + Return `None` if the specified device shall not be supported by this class. + """ + return Device(mac_address=mac_address, manager=self) + + def add_device(self, mac_address): + """ + Adds a device with given MAC address without discovery. + """ + # TODO: Implement + pass + + def remove_device(self, mac_address): + """ + Removes a device with the given MAC address + """ + # TODO: Implement + pass + + def remove_all_devices(self, skip_alias=None): + self.update_devices() + + keys_to_be_deleted = [] + for key, device in self._devices.items(): + if skip_alias and device.alias() == skip_alias: + continue + mac_address = device.mac_address.replace(':', '_').upper() + path = '/org/bluez/%s/dev_%s' % (self.adapter_name, mac_address) + self._adapter.RemoveDevice(path) + keys_to_be_deleted.append(key) + + for key in keys_to_be_deleted: + del self._devices[key] + + self.update_devices() + + + +class Device: + def __init__(self, mac_address, manager, managed=True): + """ + Represents a BLE GATT device. + + This class is intended to be sublcassed with a device-specific implementations + that reflect the device's GATT profile. + + :param mac_address: MAC address of this device + :manager: `DeviceManager` that shall manage this device + :managed: If False, the created device will not be managed by the device manager + Particularly of interest for sub classes of `DeviceManager` who want + to decide on certain device properties if they then create a subclass + instance of that `Device` or not. + """ + + self.mac_address = mac_address + self.manager = manager + self.services = [] + + self._bus = manager._bus + self._object_manager = manager._object_manager + + # TODO: Device needs to be created if it's not yet known to bluetoothd, see "test-device" in bluez-5.43/test/ + self._device_path = '/org/bluez/%s/dev_%s' % (manager.adapter_name, mac_address.replace(':', '_').upper()) + device_object = self._bus.get_object('org.bluez', self._device_path) + self._object = dbus.Interface(device_object, 'org.bluez.Device1') + self._properties = dbus.Interface(self._object, 'org.freedesktop.DBus.Properties') + self._properties_signal = None + self._connect_retry_attempt = None + + if managed: + manager._manage_device(self) + + def advertised(self): + """ + Called when an advertisement package has been received from the device. Requires device discovery to run. + """ + pass + + def is_registered(self): + # TODO: Implement, see __init__ + return False + + def register(self): + # TODO: Implement, see __init__ + return + + def invalidate(self): + self._disconnect_signals() + + def connect(self): + """ + Connects to the device. Blocks until the connection was successful. + """ + self._connect_retry_attempt = 0 + self._connect_signals() + self._connect() + + def _connect(self): + self._connect_retry_attempt += 1 + try: + self._object.Connect() + if not self.services and self.is_services_resolved(): + self.services_resolved() + + except dbus.exceptions.DBusException as e: + if (e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject'): + self.connect_failed(errors.Failed("Device does not exist, check adapter name and MAC address.")) + elif ((e.get_dbus_name() == 'org.bluez.Error.Failed') and + (e.get_dbus_message() == "Operation already in progress")): + pass + elif ((self._connect_retry_attempt < 5) and + (e.get_dbus_name() == 'org.bluez.Error.Failed') and + (e.get_dbus_message() == "Software caused connection abort")): + self._connect() + elif (e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply'): + # TODO: How to handle properly? + # Reproducable when we repeatedly shut off Nuimo immediately after its flashing Bluetooth icon appears + self.connect_failed(_error_from_dbus_error(e)) + else: + self.connect_failed(_error_from_dbus_error(e)) + + def _connect_signals(self): + if self._properties_signal is None: + self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed) + self._connect_service_signals() + + def _connect_service_signals(self): + for service in self.services: + service._connect_signals() + + def connect_succeeded(self): + """ + Will be called when `connect()` has finished connecting to the device. + Will not be called if the device was already connected. + """ + pass + + def connect_failed(self, error): + """ + Called when the connection could not be established. + """ + self._disconnect_signals() + + def disconnect(self): + """ + Disconnects from the device, if connected. + """ + self._object.Disconnect() + + def disconnect_succeeded(self): + """ + Will be called when the device has disconnected. + """ + self._disconnect_signals() + self.services = [] + + def _disconnect_signals(self): + if self._properties_signal is not None: + self._properties_signal.remove() + self._properties_signal = None + self._disconnect_service_signals() + + def _disconnect_service_signals(self): + for service in self.services: + service._disconnect_signals() + + def is_connected(self): + """ + Returns `True` if the device is connected, otherwise `False`. + """ + return self._properties.Get('org.bluez.Device1', 'Connected') == 1 + + def is_services_resolved(self): + """ + Returns `True` is services are discovered, otherwise `False`. + """ + return self._properties.Get('org.bluez.Device1', 'ServicesResolved') == 1 + + def alias(self): + """ + Returns the device's alias (name). + """ + try: + return self._properties.Get('org.bluez.Device1', 'Alias') + except dbus.exceptions.DBusException as e: + if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject': + # BlueZ sometimes doesn't provide an alias, we then simply return `None`. + # Might occur when device was deleted as the following issue points out: + # https://github.com/blueman-project/blueman/issues/460 + return None + else: + raise _error_from_dbus_error(e) + + def properties_changed(self, sender, changed_properties, invalidated_properties): + """ + Called when a device property has changed or got invalidated. + """ + if 'Connected' in changed_properties: + if changed_properties['Connected']: + self.connect_succeeded() + else: + self.disconnect_succeeded() + + if ('ServicesResolved' in changed_properties and changed_properties['ServicesResolved'] == 1 and + not self.services): + self.services_resolved() + + def services_resolved(self): + """ + Called when all device's services and characteristics got resolved. + """ + self._disconnect_service_signals() + + services_regex = re.compile(self._device_path + '/service[0-9abcdef]{4}$') + managed_services = [ + service for service in self._object_manager.GetManagedObjects().items() + if services_regex.match(service[0])] + self.services = [Service( + device=self, + path=service[0], + uuid=service[1]['org.bluez.GattService1']['UUID']) for service in managed_services] + + self._connect_service_signals() + + def characteristic_value_updated(self, characteristic, value): + """ + Called when a characteristic value has changed. + """ + # To be implemented by subclass + pass + + def characteristic_read_value_failed(self, characteristic, error): + """ + Called when a characteristic value read command failed. + """ + # To be implemented by subclass + pass + + def characteristic_write_value_succeeded(self, characteristic): + """ + Called when a characteristic value write command succeeded. + """ + # To be implemented by subclass + pass + + def characteristic_write_value_failed(self, characteristic, error): + """ + Called when a characteristic value write command failed. + """ + # To be implemented by subclass + pass + + def characteristic_enable_notifications_succeeded(self, characteristic): + """ + Called when a characteristic notifications enable command succeeded. + """ + # To be implemented by subclass + pass + + def characteristic_enable_notifications_failed(self, characteristic, error): + """ + Called when a characteristic notifications enable command failed. + """ + # To be implemented by subclass + pass + + +class Service: + """ + Represents a GATT service. + """ + + def __init__(self, device, path, uuid): + # TODO: Don'T requore `path` argument, it can be calculated from device's path and uuid + self.device = device + self.uuid = uuid + self._path = path + self._bus = device._bus + self._object_manager = device._object_manager + self._object = self._bus.get_object('org.bluez', self._path) + self.characteristics = [] + self.characteristics_resolved() + + def _connect_signals(self): + self._connect_characteristic_signals() + + def _connect_characteristic_signals(self): + for characteristic in self.characteristics: + characteristic._connect_signals() + + def _disconnect_signals(self): + self._disconnect_characteristic_signals() + + def _disconnect_characteristic_signals(self): + for characteristic in self.characteristics: + characteristic._disconnect_signals() + + def characteristics_resolved(self): + """ + Called when all service's characteristics got resolved. + """ + self._disconnect_characteristic_signals() + + characteristics_regex = re.compile(self._path + '/char[0-9abcdef]{4}$') + managed_characteristics = [ + char for char in self._object_manager.GetManagedObjects().items() + if characteristics_regex.match(char[0])] + self.characteristics = [Characteristic( + service=self, + path=c[0], + uuid=c[1]['org.bluez.GattCharacteristic1']['UUID']) for c in managed_characteristics] + + self._connect_characteristic_signals() + + +class Characteristic: + """ + Represents a GATT characteristic. + """ + + def __init__(self, service, path, uuid): + # TODO: Don't require `path` parameter, it can be calculated from service's path and uuid + self.service = service + self.uuid = uuid + self._path = path + self._bus = service._bus + self._object_manager = service._object_manager + self._object = self._bus.get_object('org.bluez', self._path) + self._properties = dbus.Interface(self._object, "org.freedesktop.DBus.Properties") + self._properties_signal = None + + def _connect_signals(self): + if self._properties_signal is None: + self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed) + + def _disconnect_signals(self): + if self._properties_signal is not None: + self._properties_signal.remove() + self._properties_signal = None + + def properties_changed(self, properties, changed_properties, invalidated_properties): + value = changed_properties.get('Value') + """ + Called when a Characteristic property has changed. + """ + if value is not None: + self.service.device.characteristic_value_updated(characteristic=self, value=bytes(value)) + + def read_value(self, offset=0): + """ + Reads the value of this characteristic. + + When successful, `characteristic_value_updated()` of the related device will be called, + otherwise `characteristic_read_value_failed()` is invoked. + """ + try: + return self._object.ReadValue( + {'offset': dbus.UInt16(offset, variant_level=1)}, + dbus_interface='org.bluez.GattCharacteristic1') + except dbus.exceptions.DBusException as e: + error = _error_from_dbus_error(e) + self.service.device.characteristic_read_value_failed(self, error=error) + + def write_value(self, value, offset=0): + """ + Attempts to write a value to the characteristic. + + Success or failure will be notified by calls to `write_value_succeeded` or `write_value_failed` respectively. + + :param value: array of bytes to be written + :param offset: offset from where to start writing the bytes (defaults to 0) + """ + bytes = [dbus.Byte(b) for b in value] + + try: + self._object.WriteValue( + bytes, + {'offset': dbus.UInt16(offset, variant_level=1)}, + reply_handler=self._write_value_succeeded, + error_handler=self._write_value_failed, + dbus_interface='org.bluez.GattCharacteristic1') + except dbus.exceptions.DBusException as e: + self._write_value_failed(self, error=e) + + def _write_value_succeeded(self): + """ + Called when the write request has succeeded. + """ + self.service.device.characteristic_write_value_succeeded(characteristic=self) + + def _write_value_failed(self, dbus_error): + """ + Called when the write request has failed. + """ + error = _error_from_dbus_error(dbus_error) + self.service.device.characteristic_write_value_failed(characteristic=self, error=error) + + def enable_notifications(self, enabled=True): + """ + Enables or disables value change notifications. + + Success or failure will be notified by calls to `characteristic_enable_notifications_succeeded` + or `enable_notifications_failed` respectively. + + Each time when the device notifies a new value, `characteristic_value_updated()` of the related + device will be called. + """ + try: + if enabled: + self._object.StartNotify( + reply_handler=self._enable_notifications_succeeded, + error_handler=self._enable_notifications_failed, + dbus_interface='org.bluez.GattCharacteristic1') + else: + self._object.StopNotify( + reply_handler=self._enable_notifications_succeeded, + error_handler=self._enable_notifications_failed, + dbus_interface='org.bluez.GattCharacteristic1') + except dbus.exceptions.DBusException as e: + self._enable_notifications_failed(error=e) + + def _enable_notifications_succeeded(self): + """ + Called when notification enabling has succeeded. + """ + self.service.device.characteristic_enable_notifications_succeeded(characteristic=self) + + def _enable_notifications_failed(self, dbus_error): + """ + Called when notification enabling has failed. + """ + if ((dbus_error.get_dbus_name() == 'org.bluez.Error.Failed') and + ((dbus_error.get_dbus_message() == "Already notifying") or + (dbus_error.get_dbus_message() == "No notify session started"))): + # Ignore cases where notifications where already enabled or already disabled + return + error = _error_from_dbus_error(dbus_error) + self.service.device.characteristic_enable_notifications_failed(characteristic=self, error=error) + + +def _error_from_dbus_error(e): + return { + 'org.bluez.Error.Failed': errors.Failed(e.get_dbus_message()), + 'org.bluez.Error.InProgress': errors.InProgress(e.get_dbus_message()), + 'org.bluez.Error.InvalidValueLength': errors.InvalidValueLength(e.get_dbus_message()), + 'org.bluez.Error.NotAuthorized': errors.NotAuthorized(e.get_dbus_message()), + 'org.bluez.Error.NotPermitted': errors.NotPermitted(e.get_dbus_message()), + 'org.bluez.Error.NotSupported': errors.NotSupported(e.get_dbus_message()), + 'org.freedesktop.DBus.Error.AccessDenied': errors.AccessDenied("Root permissions required") + }.get(e.get_dbus_name(), errors.Failed(e.get_dbus_message())) diff --git a/gatt/gatt_stubs.py b/gatt/gatt_stubs.py new file mode 100644 index 0000000..54d67c2 --- /dev/null +++ b/gatt/gatt_stubs.py @@ -0,0 +1,14 @@ +class DeviceManager: + pass + + +class Device: + pass + + +class Service: + pass + + +class Characteristic: + pass |