summaryrefslogtreecommitdiffstats
path: root/gatt/gatt_linux.py
diff options
context:
space:
mode:
authorP. J. McDermott <pj@pehjota.net>2023-10-01 10:52:07 (EDT)
committer P. J. McDermott <pj@pehjota.net>2023-10-01 10:52:07 (EDT)
commit1369c3fcb52c7e6500bf52ecfddf113747f05047 (patch)
tree2862ad27d6028c1ebcc47b05396c956157059960 /gatt/gatt_linux.py
downloadgatt-python-1369c3fcb52c7e6500bf52ecfddf113747f05047.zip
gatt-python-1369c3fcb52c7e6500bf52ecfddf113747f05047.tar.gz
gatt-python-1369c3fcb52c7e6500bf52ecfddf113747f05047.tar.bz2
New upstream version 0.2.7upstream/0.2.7upstream/latest
Diffstat (limited to 'gatt/gatt_linux.py')
-rw-r--r--gatt/gatt_linux.py642
1 files changed, 642 insertions, 0 deletions
diff --git a/gatt/gatt_linux.py b/gatt/gatt_linux.py
new file mode 100644
index 0000000..dcf0b00
--- /dev/null
+++ b/gatt/gatt_linux.py
@@ -0,0 +1,642 @@
+try:
+ import dbus
+ import dbus.mainloop.glib
+except ImportError:
+ import sys
+ print("Module 'dbus' not found")
+ print("Please run: sudo apt-get install python3-dbus")
+ print("See also: https://github.com/getsenic/gatt-python#installing-gatt-sdk-for-python")
+ sys.exit(1)
+
+import re
+
+from gi.repository import GObject
+
+from . import errors
+
+
+dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+dbus.mainloop.glib.threads_init()
+
+
+class DeviceManager:
+ """
+ Entry point for managing BLE GATT devices.
+
+ This class is intended to be subclassed to manage a specific set of GATT devices.
+ """
+
+ def __init__(self, adapter_name):
+ self.listener = None
+ self.adapter_name = adapter_name
+
+ self._bus = dbus.SystemBus()
+ try:
+ adapter_object = self._bus.get_object('org.bluez', '/org/bluez/' + adapter_name)
+ except dbus.exceptions.DBusException as e:
+ raise _error_from_dbus_error(e)
+ object_manager_object = self._bus.get_object("org.bluez", "/")
+ self._adapter = dbus.Interface(adapter_object, 'org.bluez.Adapter1')
+ self._adapter_properties = dbus.Interface(self._adapter, 'org.freedesktop.DBus.Properties')
+ self._object_manager = dbus.Interface(object_manager_object, "org.freedesktop.DBus.ObjectManager")
+ self._device_path_regex = re.compile('^/org/bluez/' + adapter_name + '/dev((_[A-Z0-9]{2}){6})$')
+ self._devices = {}
+ self._discovered_devices = {}
+ self._interface_added_signal = None
+ self._properties_changed_signal = None
+ self._main_loop = None
+
+ self.update_devices()
+
+ @property
+ def is_adapter_powered(self):
+ return self._adapter_properties.Get('org.bluez.Adapter1', 'Powered') == 1
+
+ @is_adapter_powered.setter
+ def is_adapter_powered(self, powered):
+ return self._adapter_properties.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(powered))
+
+ def run(self):
+ """
+ Starts the main loop that is necessary to receive Bluetooth events from the Bluetooth adapter.
+
+ This call blocks until you call `stop()` to stop the main loop.
+ """
+
+ if self._main_loop:
+ return
+
+ self._interface_added_signal = self._bus.add_signal_receiver(
+ self._interfaces_added,
+ dbus_interface='org.freedesktop.DBus.ObjectManager',
+ signal_name='InterfacesAdded')
+
+ # TODO: Also listen to 'interfaces removed' events?
+
+ self._properties_changed_signal = self._bus.add_signal_receiver(
+ self._properties_changed,
+ dbus_interface=dbus.PROPERTIES_IFACE,
+ signal_name='PropertiesChanged',
+ arg0='org.bluez.Device1',
+ path_keyword='path')
+
+ def disconnect_signals():
+ for device in self._devices.values():
+ device.invalidate()
+ self._properties_changed_signal.remove()
+ self._interface_added_signal.remove()
+
+ self._main_loop = GObject.MainLoop()
+ try:
+ self._main_loop.run()
+ disconnect_signals()
+ except Exception:
+ disconnect_signals()
+ raise
+
+ def stop(self):
+ """
+ Stops the main loop started with `start()`
+ """
+ if self._main_loop:
+ self._main_loop.quit()
+ self._main_loop = None
+
+ def _manage_device(self, device):
+ existing_device = self._devices.get(device.mac_address)
+ if existing_device is not None:
+ existing_device.invalidate()
+ self._devices[device.mac_address] = device
+
+ def update_devices(self):
+ managed_objects = self._object_manager.GetManagedObjects().items()
+ possible_mac_addresses = [self._mac_address(path) for path, _ in managed_objects]
+ mac_addresses = [m for m in possible_mac_addresses if m is not None]
+ new_mac_addresses = [m for m in mac_addresses if m not in self._devices]
+ for mac_address in new_mac_addresses:
+ self.make_device(mac_address)
+ # TODO: Remove devices from `_devices` that are no longer managed, i.e. deleted
+
+ def devices(self):
+ """
+ Returns all known Bluetooth devices.
+ """
+ self.update_devices()
+ return self._devices.values()
+
+ def start_discovery(self, service_uuids=[]):
+ """Starts a discovery for BLE devices with given service UUIDs.
+
+ :param service_uuids: Filters the search to only return devices with given UUIDs.
+ """
+
+ discovery_filter = {'Transport': 'le'}
+ if service_uuids: # D-Bus doesn't like empty lists, it needs to guess the type
+ discovery_filter['UUIDs'] = service_uuids
+
+ try:
+ self._adapter.SetDiscoveryFilter(discovery_filter)
+ self._adapter.StartDiscovery()
+ except dbus.exceptions.DBusException as e:
+ if e.get_dbus_name() == 'org.bluez.Error.NotReady':
+ raise errors.NotReady(
+ "Bluetooth adapter not ready. "
+ "Set `is_adapter_powered` to `True` or run 'echo \"power on\" | sudo bluetoothctl'.")
+ if e.get_dbus_name() == 'org.bluez.Error.InProgress':
+ # Discovery was already started - ignore exception
+ pass
+ else:
+ raise _error_from_dbus_error(e)
+
+ def stop_discovery(self):
+ """
+ Stops the discovery started with `start_discovery`
+ """
+ try:
+ self._adapter.StopDiscovery()
+ except dbus.exceptions.DBusException as e:
+ if (e.get_dbus_name() == 'org.bluez.Error.Failed') and (e.get_dbus_message() == 'No discovery started'):
+ pass
+ else:
+ raise _error_from_dbus_error(e)
+
+ def _interfaces_added(self, path, interfaces):
+ self._device_discovered(path, interfaces)
+
+ def _properties_changed(self, interface, changed, invalidated, path):
+ # TODO: Handle `changed` and `invalidated` properties and update device
+ self._device_discovered(path, [interface])
+
+ def _device_discovered(self, path, interfaces):
+ if 'org.bluez.Device1' not in interfaces:
+ return
+ mac_address = self._mac_address(path)
+ if not mac_address:
+ return
+ device = self._devices.get(mac_address) or self.make_device(mac_address)
+ if device is not None:
+ self.device_discovered(device)
+
+ def device_discovered(self, device):
+ device.advertised()
+
+ def _mac_address(self, device_path):
+ match = self._device_path_regex.match(device_path)
+ if not match:
+ return None
+ return match.group(1)[1:].replace('_', ':').lower()
+
+ def make_device(self, mac_address):
+ """
+ Makes and returns a `Device` instance with specified MAC address.
+
+ Override this method to return a specific subclass instance of `Device`.
+ Return `None` if the specified device shall not be supported by this class.
+ """
+ return Device(mac_address=mac_address, manager=self)
+
+ def add_device(self, mac_address):
+ """
+ Adds a device with given MAC address without discovery.
+ """
+ # TODO: Implement
+ pass
+
+ def remove_device(self, mac_address):
+ """
+ Removes a device with the given MAC address
+ """
+ # TODO: Implement
+ pass
+
+ def remove_all_devices(self, skip_alias=None):
+ self.update_devices()
+
+ keys_to_be_deleted = []
+ for key, device in self._devices.items():
+ if skip_alias and device.alias() == skip_alias:
+ continue
+ mac_address = device.mac_address.replace(':', '_').upper()
+ path = '/org/bluez/%s/dev_%s' % (self.adapter_name, mac_address)
+ self._adapter.RemoveDevice(path)
+ keys_to_be_deleted.append(key)
+
+ for key in keys_to_be_deleted:
+ del self._devices[key]
+
+ self.update_devices()
+
+
+
+class Device:
+ def __init__(self, mac_address, manager, managed=True):
+ """
+ Represents a BLE GATT device.
+
+ This class is intended to be sublcassed with a device-specific implementations
+ that reflect the device's GATT profile.
+
+ :param mac_address: MAC address of this device
+ :manager: `DeviceManager` that shall manage this device
+ :managed: If False, the created device will not be managed by the device manager
+ Particularly of interest for sub classes of `DeviceManager` who want
+ to decide on certain device properties if they then create a subclass
+ instance of that `Device` or not.
+ """
+
+ self.mac_address = mac_address
+ self.manager = manager
+ self.services = []
+
+ self._bus = manager._bus
+ self._object_manager = manager._object_manager
+
+ # TODO: Device needs to be created if it's not yet known to bluetoothd, see "test-device" in bluez-5.43/test/
+ self._device_path = '/org/bluez/%s/dev_%s' % (manager.adapter_name, mac_address.replace(':', '_').upper())
+ device_object = self._bus.get_object('org.bluez', self._device_path)
+ self._object = dbus.Interface(device_object, 'org.bluez.Device1')
+ self._properties = dbus.Interface(self._object, 'org.freedesktop.DBus.Properties')
+ self._properties_signal = None
+ self._connect_retry_attempt = None
+
+ if managed:
+ manager._manage_device(self)
+
+ def advertised(self):
+ """
+ Called when an advertisement package has been received from the device. Requires device discovery to run.
+ """
+ pass
+
+ def is_registered(self):
+ # TODO: Implement, see __init__
+ return False
+
+ def register(self):
+ # TODO: Implement, see __init__
+ return
+
+ def invalidate(self):
+ self._disconnect_signals()
+
+ def connect(self):
+ """
+ Connects to the device. Blocks until the connection was successful.
+ """
+ self._connect_retry_attempt = 0
+ self._connect_signals()
+ self._connect()
+
+ def _connect(self):
+ self._connect_retry_attempt += 1
+ try:
+ self._object.Connect()
+ if not self.services and self.is_services_resolved():
+ self.services_resolved()
+
+ except dbus.exceptions.DBusException as e:
+ if (e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject'):
+ self.connect_failed(errors.Failed("Device does not exist, check adapter name and MAC address."))
+ elif ((e.get_dbus_name() == 'org.bluez.Error.Failed') and
+ (e.get_dbus_message() == "Operation already in progress")):
+ pass
+ elif ((self._connect_retry_attempt < 5) and
+ (e.get_dbus_name() == 'org.bluez.Error.Failed') and
+ (e.get_dbus_message() == "Software caused connection abort")):
+ self._connect()
+ elif (e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply'):
+ # TODO: How to handle properly?
+ # Reproducable when we repeatedly shut off Nuimo immediately after its flashing Bluetooth icon appears
+ self.connect_failed(_error_from_dbus_error(e))
+ else:
+ self.connect_failed(_error_from_dbus_error(e))
+
+ def _connect_signals(self):
+ if self._properties_signal is None:
+ self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed)
+ self._connect_service_signals()
+
+ def _connect_service_signals(self):
+ for service in self.services:
+ service._connect_signals()
+
+ def connect_succeeded(self):
+ """
+ Will be called when `connect()` has finished connecting to the device.
+ Will not be called if the device was already connected.
+ """
+ pass
+
+ def connect_failed(self, error):
+ """
+ Called when the connection could not be established.
+ """
+ self._disconnect_signals()
+
+ def disconnect(self):
+ """
+ Disconnects from the device, if connected.
+ """
+ self._object.Disconnect()
+
+ def disconnect_succeeded(self):
+ """
+ Will be called when the device has disconnected.
+ """
+ self._disconnect_signals()
+ self.services = []
+
+ def _disconnect_signals(self):
+ if self._properties_signal is not None:
+ self._properties_signal.remove()
+ self._properties_signal = None
+ self._disconnect_service_signals()
+
+ def _disconnect_service_signals(self):
+ for service in self.services:
+ service._disconnect_signals()
+
+ def is_connected(self):
+ """
+ Returns `True` if the device is connected, otherwise `False`.
+ """
+ return self._properties.Get('org.bluez.Device1', 'Connected') == 1
+
+ def is_services_resolved(self):
+ """
+ Returns `True` is services are discovered, otherwise `False`.
+ """
+ return self._properties.Get('org.bluez.Device1', 'ServicesResolved') == 1
+
+ def alias(self):
+ """
+ Returns the device's alias (name).
+ """
+ try:
+ return self._properties.Get('org.bluez.Device1', 'Alias')
+ except dbus.exceptions.DBusException as e:
+ if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject':
+ # BlueZ sometimes doesn't provide an alias, we then simply return `None`.
+ # Might occur when device was deleted as the following issue points out:
+ # https://github.com/blueman-project/blueman/issues/460
+ return None
+ else:
+ raise _error_from_dbus_error(e)
+
+ def properties_changed(self, sender, changed_properties, invalidated_properties):
+ """
+ Called when a device property has changed or got invalidated.
+ """
+ if 'Connected' in changed_properties:
+ if changed_properties['Connected']:
+ self.connect_succeeded()
+ else:
+ self.disconnect_succeeded()
+
+ if ('ServicesResolved' in changed_properties and changed_properties['ServicesResolved'] == 1 and
+ not self.services):
+ self.services_resolved()
+
+ def services_resolved(self):
+ """
+ Called when all device's services and characteristics got resolved.
+ """
+ self._disconnect_service_signals()
+
+ services_regex = re.compile(self._device_path + '/service[0-9abcdef]{4}$')
+ managed_services = [
+ service for service in self._object_manager.GetManagedObjects().items()
+ if services_regex.match(service[0])]
+ self.services = [Service(
+ device=self,
+ path=service[0],
+ uuid=service[1]['org.bluez.GattService1']['UUID']) for service in managed_services]
+
+ self._connect_service_signals()
+
+ def characteristic_value_updated(self, characteristic, value):
+ """
+ Called when a characteristic value has changed.
+ """
+ # To be implemented by subclass
+ pass
+
+ def characteristic_read_value_failed(self, characteristic, error):
+ """
+ Called when a characteristic value read command failed.
+ """
+ # To be implemented by subclass
+ pass
+
+ def characteristic_write_value_succeeded(self, characteristic):
+ """
+ Called when a characteristic value write command succeeded.
+ """
+ # To be implemented by subclass
+ pass
+
+ def characteristic_write_value_failed(self, characteristic, error):
+ """
+ Called when a characteristic value write command failed.
+ """
+ # To be implemented by subclass
+ pass
+
+ def characteristic_enable_notifications_succeeded(self, characteristic):
+ """
+ Called when a characteristic notifications enable command succeeded.
+ """
+ # To be implemented by subclass
+ pass
+
+ def characteristic_enable_notifications_failed(self, characteristic, error):
+ """
+ Called when a characteristic notifications enable command failed.
+ """
+ # To be implemented by subclass
+ pass
+
+
+class Service:
+ """
+ Represents a GATT service.
+ """
+
+ def __init__(self, device, path, uuid):
+ # TODO: Don'T requore `path` argument, it can be calculated from device's path and uuid
+ self.device = device
+ self.uuid = uuid
+ self._path = path
+ self._bus = device._bus
+ self._object_manager = device._object_manager
+ self._object = self._bus.get_object('org.bluez', self._path)
+ self.characteristics = []
+ self.characteristics_resolved()
+
+ def _connect_signals(self):
+ self._connect_characteristic_signals()
+
+ def _connect_characteristic_signals(self):
+ for characteristic in self.characteristics:
+ characteristic._connect_signals()
+
+ def _disconnect_signals(self):
+ self._disconnect_characteristic_signals()
+
+ def _disconnect_characteristic_signals(self):
+ for characteristic in self.characteristics:
+ characteristic._disconnect_signals()
+
+ def characteristics_resolved(self):
+ """
+ Called when all service's characteristics got resolved.
+ """
+ self._disconnect_characteristic_signals()
+
+ characteristics_regex = re.compile(self._path + '/char[0-9abcdef]{4}$')
+ managed_characteristics = [
+ char for char in self._object_manager.GetManagedObjects().items()
+ if characteristics_regex.match(char[0])]
+ self.characteristics = [Characteristic(
+ service=self,
+ path=c[0],
+ uuid=c[1]['org.bluez.GattCharacteristic1']['UUID']) for c in managed_characteristics]
+
+ self._connect_characteristic_signals()
+
+
+class Characteristic:
+ """
+ Represents a GATT characteristic.
+ """
+
+ def __init__(self, service, path, uuid):
+ # TODO: Don't require `path` parameter, it can be calculated from service's path and uuid
+ self.service = service
+ self.uuid = uuid
+ self._path = path
+ self._bus = service._bus
+ self._object_manager = service._object_manager
+ self._object = self._bus.get_object('org.bluez', self._path)
+ self._properties = dbus.Interface(self._object, "org.freedesktop.DBus.Properties")
+ self._properties_signal = None
+
+ def _connect_signals(self):
+ if self._properties_signal is None:
+ self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed)
+
+ def _disconnect_signals(self):
+ if self._properties_signal is not None:
+ self._properties_signal.remove()
+ self._properties_signal = None
+
+ def properties_changed(self, properties, changed_properties, invalidated_properties):
+ value = changed_properties.get('Value')
+ """
+ Called when a Characteristic property has changed.
+ """
+ if value is not None:
+ self.service.device.characteristic_value_updated(characteristic=self, value=bytes(value))
+
+ def read_value(self, offset=0):
+ """
+ Reads the value of this characteristic.
+
+ When successful, `characteristic_value_updated()` of the related device will be called,
+ otherwise `characteristic_read_value_failed()` is invoked.
+ """
+ try:
+ return self._object.ReadValue(
+ {'offset': dbus.UInt16(offset, variant_level=1)},
+ dbus_interface='org.bluez.GattCharacteristic1')
+ except dbus.exceptions.DBusException as e:
+ error = _error_from_dbus_error(e)
+ self.service.device.characteristic_read_value_failed(self, error=error)
+
+ def write_value(self, value, offset=0):
+ """
+ Attempts to write a value to the characteristic.
+
+ Success or failure will be notified by calls to `write_value_succeeded` or `write_value_failed` respectively.
+
+ :param value: array of bytes to be written
+ :param offset: offset from where to start writing the bytes (defaults to 0)
+ """
+ bytes = [dbus.Byte(b) for b in value]
+
+ try:
+ self._object.WriteValue(
+ bytes,
+ {'offset': dbus.UInt16(offset, variant_level=1)},
+ reply_handler=self._write_value_succeeded,
+ error_handler=self._write_value_failed,
+ dbus_interface='org.bluez.GattCharacteristic1')
+ except dbus.exceptions.DBusException as e:
+ self._write_value_failed(self, error=e)
+
+ def _write_value_succeeded(self):
+ """
+ Called when the write request has succeeded.
+ """
+ self.service.device.characteristic_write_value_succeeded(characteristic=self)
+
+ def _write_value_failed(self, dbus_error):
+ """
+ Called when the write request has failed.
+ """
+ error = _error_from_dbus_error(dbus_error)
+ self.service.device.characteristic_write_value_failed(characteristic=self, error=error)
+
+ def enable_notifications(self, enabled=True):
+ """
+ Enables or disables value change notifications.
+
+ Success or failure will be notified by calls to `characteristic_enable_notifications_succeeded`
+ or `enable_notifications_failed` respectively.
+
+ Each time when the device notifies a new value, `characteristic_value_updated()` of the related
+ device will be called.
+ """
+ try:
+ if enabled:
+ self._object.StartNotify(
+ reply_handler=self._enable_notifications_succeeded,
+ error_handler=self._enable_notifications_failed,
+ dbus_interface='org.bluez.GattCharacteristic1')
+ else:
+ self._object.StopNotify(
+ reply_handler=self._enable_notifications_succeeded,
+ error_handler=self._enable_notifications_failed,
+ dbus_interface='org.bluez.GattCharacteristic1')
+ except dbus.exceptions.DBusException as e:
+ self._enable_notifications_failed(error=e)
+
+ def _enable_notifications_succeeded(self):
+ """
+ Called when notification enabling has succeeded.
+ """
+ self.service.device.characteristic_enable_notifications_succeeded(characteristic=self)
+
+ def _enable_notifications_failed(self, dbus_error):
+ """
+ Called when notification enabling has failed.
+ """
+ if ((dbus_error.get_dbus_name() == 'org.bluez.Error.Failed') and
+ ((dbus_error.get_dbus_message() == "Already notifying") or
+ (dbus_error.get_dbus_message() == "No notify session started"))):
+ # Ignore cases where notifications where already enabled or already disabled
+ return
+ error = _error_from_dbus_error(dbus_error)
+ self.service.device.characteristic_enable_notifications_failed(characteristic=self, error=error)
+
+
+def _error_from_dbus_error(e):
+ return {
+ 'org.bluez.Error.Failed': errors.Failed(e.get_dbus_message()),
+ 'org.bluez.Error.InProgress': errors.InProgress(e.get_dbus_message()),
+ 'org.bluez.Error.InvalidValueLength': errors.InvalidValueLength(e.get_dbus_message()),
+ 'org.bluez.Error.NotAuthorized': errors.NotAuthorized(e.get_dbus_message()),
+ 'org.bluez.Error.NotPermitted': errors.NotPermitted(e.get_dbus_message()),
+ 'org.bluez.Error.NotSupported': errors.NotSupported(e.get_dbus_message()),
+ 'org.freedesktop.DBus.Error.AccessDenied': errors.AccessDenied("Root permissions required")
+ }.get(e.get_dbus_name(), errors.Failed(e.get_dbus_message()))