summaryrefslogtreecommitdiffstats
path: root/src/ble_dfu.py
diff options
context:
space:
mode:
authorP. J. McDermott <pj@pehjota.net>2023-10-01 16:32:49 (EDT)
committer P. J. McDermott <pj@pehjota.net>2023-10-01 16:32:49 (EDT)
commitbab4f35fc5c13341fb9cc96a7cb863c5cd5c3f53 (patch)
tree75d9463a02eb5984e1f33aadba830f490f2b103c /src/ble_dfu.py
downloadsiglo-bab4f35fc5c13341fb9cc96a7cb863c5cd5c3f53.zip
siglo-bab4f35fc5c13341fb9cc96a7cb863c5cd5c3f53.tar.gz
siglo-bab4f35fc5c13341fb9cc96a7cb863c5cd5c3f53.tar.bz2
New upstream version 0.9.9upstream/0.9.9upstream/latest
Diffstat (limited to 'src/ble_dfu.py')
-rw-r--r--src/ble_dfu.py334
1 files changed, 334 insertions, 0 deletions
diff --git a/src/ble_dfu.py b/src/ble_dfu.py
new file mode 100644
index 0000000..83d20d6
--- /dev/null
+++ b/src/ble_dfu.py
@@ -0,0 +1,334 @@
+from array import array
+import gatt
+import os
+from .util import *
+import math
+from struct import unpack
+
+class InfiniTimeDFU(gatt.Device):
+ # Class constants
+ UUID_DFU_SERVICE = "00001530-1212-efde-1523-785feabcd123"
+ UUID_CTRL_POINT = "00001531-1212-efde-1523-785feabcd123"
+ UUID_PACKET = "00001532-1212-efde-1523-785feabcd123"
+ UUID_VERSION = "00001534-1212-efde-1523-785feabcd123"
+
+ def __init__(self, mac_address, manager, window, firmware_path, datfile_path, verbose):
+ self.firmware_path = firmware_path
+ self.datfile_path = datfile_path
+ self.target_mac = mac_address
+ self.window = window
+ self.verbose = verbose
+ self.current_step = 0
+ self.pkt_receipt_interval = 10
+ self.pkt_payload_size = 20
+ self.size_per_receipt = self.pkt_payload_size * self.pkt_receipt_interval
+ self.done = False
+ self.packet_recipt_count = 0
+ self.total_receipt_size = 0
+ self.update_in_progress = False
+ self.caffeinator = Caffeinator()
+ self.success = False
+
+ super().__init__(mac_address, manager)
+
+ def connect(self):
+ self.successful_connection = True
+ super().connect()
+
+ def input_setup(self):
+ """Bin: read binfile into bin_array"""
+ print(
+ "preparing "
+ + os.path.split(self.firmware_path)[1]
+ + " for "
+ + self.target_mac
+ )
+
+ if self.firmware_path == None:
+ raise Exception("input invalid")
+
+ name, extent = os.path.splitext(self.firmware_path)
+
+ if extent == ".bin":
+ self.bin_array = array("B", open(self.firmware_path, "rb").read())
+
+ self.image_size = len(self.bin_array)
+ print("Binary image size: %d" % self.image_size)
+ print(
+ "Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array))
+ )
+ return
+ raise Exception("input invalid")
+
+ def connect_succeeded(self):
+ super().connect_succeeded()
+ print("[%s] Connected" % (self.mac_address))
+
+ def connect_failed(self, error):
+ super().connect_failed(error)
+ self.successful_connection = False
+ print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
+
+ def disconnect_succeeded(self):
+ super().disconnect_succeeded()
+ if not self.success:
+ self.on_failure()
+ print("[%s] Disconnected" % (self.mac_address))
+
+ def characteristic_enable_notifications_succeeded(self, characteristic):
+ if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT:
+ print("Notification Enable succeeded for Control Point Characteristic")
+ self.step_one()
+
+ def characteristic_write_value_succeeded(self, characteristic):
+ if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT:
+ print(
+ "Characteristic value was written successfully for Control Point Characteristic"
+ )
+ if self.verbose and characteristic.uuid == self.UUID_PACKET:
+ print(
+ "Characteristic value was written successfully for Packet Characteristic"
+ )
+ if self.current_step == 1:
+ self.step_two()
+ elif self.current_step == 3:
+ self.step_four()
+ elif self.current_step == 5:
+ self.step_six()
+ elif self.current_step == 6:
+ print("Begin DFU")
+ self.caffeinator.caffeinate()
+ self.step_seven()
+
+ def characteristic_write_value_failed(self, characteristic, error):
+ print("[WARN ] write value failed", str(error))
+ self.update_in_progress = True
+ self.disconnect()
+
+ def characteristic_value_updated(self, characteristic, value):
+ if self.verbose:
+ if characteristic.uuid == self.UUID_CTRL_POINT:
+ print(
+ "Characteristic value was updated for Control Point Characteristic"
+ )
+ if characteristic.uuid == self.UUID_PACKET:
+ print("Characteristic value was updated for Packet Characteristic")
+ print("New value is:", value)
+
+ hexval = array_to_hex_string(value)
+
+ if hexval[:4] == "1001":
+ # Response::StartDFU
+ if hexval[4:] == "01":
+ self.step_three()
+ else:
+ print("[WARN ] StartDFU failed")
+ self.disconnect()
+ elif hexval[:4] == "1002":
+ # Response::InitDFUParameters
+ if hexval[4:] == "01":
+ self.step_five()
+ else:
+ print("[WARN ] InitDFUParameters failed")
+ self.disconnect()
+ elif hexval[:2] == "11":
+ # PacketReceiptNotification
+ self.packet_recipt_count += 1
+ self.total_receipt_size += self.size_per_receipt
+ # verify that the returned size correspond to what was sent
+ ack_size = unpack('<I', value[1:])[0]
+ if ack_size != self.total_receipt_size:
+ print("[WARN ] PacketReceiptNotification failed")
+ print(" acknowledged {} : expected {}".format(ack_size, self.total_receipt_size))
+ self.disconnect()
+ self.window.update_progress_bar()
+ if self.verbose:
+ print("[INFO ] receipt count", str(self.packet_recipt_count))
+ print("[INFO ] receipt size", self.total_receipt_size, "out of", self.image_size)
+ print("[INFO ] progress:", (self.total_receipt_size / self.image_size)*100, "%")
+ if self.done != True:
+ self.i += self.pkt_payload_size
+ self.step_seven()
+ elif hexval[:4] == "1003":
+ # Response::ReceiveFirmwareImage::NoError
+ if hexval[4:] == "01":
+ self.step_eight()
+ else:
+ print("[WARN ] ReceiveFirmwareImage failed")
+ self.disconnect()
+ elif hexval[:4] == "1004":
+ # Response::ValidateFirmware
+ if hexval[4:] == "01":
+ self.step_nine()
+ else:
+ print("[WARN ] ValidateFirmware failed")
+ self.disconnect()
+
+ def services_resolved(self):
+ super().services_resolved()
+ self.update_in_progress = True
+
+ print("[%s] Resolved services" % (self.mac_address))
+ ble_dfu_serv = next(s for s in self.services if s.uuid == self.UUID_DFU_SERVICE)
+ self.ctrl_point_char = next(
+ c for c in ble_dfu_serv.characteristics if c.uuid == self.UUID_CTRL_POINT
+ )
+ self.packet_char = next(
+ c for c in ble_dfu_serv.characteristics if c.uuid == self.UUID_PACKET
+ )
+
+ if self.verbose:
+ print("[INFO ] Enabling notifications for Control Point Characteristic")
+ self.ctrl_point_char.enable_notifications()
+
+ def step_one(self):
+ self.current_step = 1
+ if self.verbose:
+ print(
+ "[INFO ] Sending ('Start DFU' (0x01), 'Application' (0x04)) to DFU Control Point"
+ )
+ self.ctrl_point_char.write_value(bytearray.fromhex("01 04"))
+
+ def step_two(self):
+ self.current_step = 2
+ if self.verbose:
+ print("[INFO ] Sending Image size to the DFU Packet characteristic")
+ x = len(self.bin_array)
+ hex_size_array_lsb = uint32_to_bytes_le(x)
+ zero_pad_array_le(hex_size_array_lsb, 8)
+ self.packet_char.write_value(bytearray(hex_size_array_lsb))
+ print("[INFO ] Waiting for Image Size notification")
+
+ def step_three(self):
+ self.current_step = 3
+ if self.verbose:
+ print("[INFO ] Sending 'INIT DFU' + Init Packet Command")
+ self.ctrl_point_char.write_value(bytearray.fromhex("02 00"))
+
+ def step_four(self):
+ self.current_step = 4
+ if self.verbose:
+ print("[INFO ] Sending the Init image (DAT)")
+ self.packet_char.write_value(bytearray(self.get_init_bin_array()))
+ if self.verbose:
+ print("[INFO ] Send 'INIT DFU' + Init Packet Complete Command")
+ self.ctrl_point_char.write_value(bytearray.fromhex("02 01"))
+ print("[INFO ] Waiting for INIT DFU notification")
+
+ def step_five(self):
+ self.current_step = 5
+ if self.verbose:
+ print("Setting pkt receipt notification interval")
+ self.ctrl_point_char.write_value(bytearray.fromhex("08 0A"))
+
+ def step_six(self):
+ self.current_step = 6
+ if self.verbose:
+ print(
+ "[INFO ] Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state"
+ )
+ self.ctrl_point_char.write_value(bytearray.fromhex("03"))
+ self.segment_count = 0
+ self.i = 0
+ self.segment_total = int(
+ math.ceil(self.image_size / float(self.pkt_payload_size))
+ )
+
+ def step_seven(self):
+ self.current_step = 7
+ # Send bin_array contents as as series of packets (burst mode).
+ # Each segment is pkt_payload_size bytes long.
+ # For every pkt_receipt_interval sends, wait for notification.
+ segment = self.bin_array[self.i : self.i + self.pkt_payload_size]
+ self.packet_char.write_value(segment)
+ self.segment_count += 1
+ if self.segment_count == self.segment_total:
+ self.done = True
+ elif (self.segment_count % self.pkt_receipt_interval) != 0:
+ self.i += self.pkt_payload_size
+ self.step_seven()
+ else:
+ if self.verbose:
+ print("[INFO ] Waiting for Packet Receipt Notifiation")
+
+ def step_eight(self):
+ self.current_step = 8
+ print("[INFO ] Sending Validate command")
+ self.ctrl_point_char.write_value(bytearray.fromhex("04"))
+
+ def step_nine(self):
+ self.current_step = 9
+ print("[INFO ] Activate and reset")
+ self.ctrl_point_char.write_value(bytearray.fromhex("05"))
+ self.update_in_progress = False
+ self.success = True
+ self.on_success()
+ self.disconnect()
+ self.caffeinator.decaffeinate()
+
+ def get_init_bin_array(self):
+ # Open the DAT file and create array of its contents
+ init_bin_array = array("B", open(self.datfile_path, "rb").read())
+ return init_bin_array
+
+class Caffeinator():
+ def __init__(self):
+ try:
+ from gi.repository import Gio
+ self.gio = Gio
+
+ self.gnome_session = self.safe_lookup(
+ "org.gnome.desktop.session",
+ "GNOME session not found, you're on your own for idle timeouts"
+ )
+ if self.gnome_session:
+ self.idle_delay = self.gnome_session.get_uint("idle-delay")
+
+ self.gnome_power = self.safe_lookup(
+ "org.gnome.settings-daemon.plugins.power",
+ "GNOME power settings not found, you're on your own for system sleep"
+ )
+ if self.gnome_power:
+ self.sleep_inactive_battery_timeout = self.gnome_power.get_int("sleep-inactive-battery-timeout")
+ self.sleep_inactive_ac_timeout = self.gnome_power.get_int("sleep-inactive-ac-timeout")
+ self.idle_dim = self.gnome_power.get_boolean("idle-dim")
+ except ImportError:
+ print("[INFO ] GIO not found, disabling caffeine")
+ except AttributeError:
+ print("[INFO ] Unable to load GIO schemas, disabling caffeine")
+
+ # Look up a Gio Settings schema without crashing if it doesn't exist
+ def safe_lookup(self, path, failmsg=None):
+ try:
+ exists = self.gio.SettingsSchema.lookup(path)
+ except AttributeError:
+ # SettingsSchema is new, if it doesn't exist
+ # then fall back to legacy schema lookup
+ exists = (path in self.gio.Settings.list_schemas())
+
+ if exists:
+ return self.gio.Settings.new(path)
+ else:
+ if failmsg:
+ print("[INFO ] {}".format(failmsg))
+ return None
+
+ def caffeinate(self):
+ if self.gnome_session:
+ print("[INFO ] Disabling GNOME idle timeout")
+ self.gnome_session.set_uint("idle-delay", 0)
+ if self.gnome_power:
+ print("[INFO ] Disabling GNOME inactivity sleeping")
+ self.gnome_power.set_int("sleep-inactive-battery-timeout", 0)
+ self.gnome_power.set_int("sleep-inactive-ac-timeout", 0)
+ self.gnome_power.set_boolean("idle-dim", False)
+
+ def decaffeinate(self):
+ if self.gnome_session:
+ print("[INFO ] Restoring GNOME idle timeout")
+ self.gnome_session.set_uint("idle-delay", self.idle_delay)
+ if self.gnome_power:
+ print("[INFO ] Restoring GNOME inactivity sleeping")
+ self.gnome_power.set_int("sleep-inactive-battery-timeout", self.sleep_inactive_battery_timeout)
+ self.gnome_power.set_int("sleep-inactive-ac-timeout", self.sleep_inactive_ac_timeout)
+ self.gnome_power.set_boolean("idle-dim", self.idle_dim)