From bab4f35fc5c13341fb9cc96a7cb863c5cd5c3f53 Mon Sep 17 00:00:00 2001 From: P. J. McDermott Date: Sun, 01 Oct 2023 16:32:49 -0400 Subject: New upstream version 0.9.9 --- (limited to 'src/ble_dfu.py') diff --git a/src/ble_dfu.py b/src/ble_dfu.py new file mode 100644 index 0000000..83d20d6 --- /dev/null +++ b/src/ble_dfu.py @@ -0,0 +1,334 @@ +from array import array +import gatt +import os +from .util import * +import math +from struct import unpack + +class InfiniTimeDFU(gatt.Device): + # Class constants + UUID_DFU_SERVICE = "00001530-1212-efde-1523-785feabcd123" + UUID_CTRL_POINT = "00001531-1212-efde-1523-785feabcd123" + UUID_PACKET = "00001532-1212-efde-1523-785feabcd123" + UUID_VERSION = "00001534-1212-efde-1523-785feabcd123" + + def __init__(self, mac_address, manager, window, firmware_path, datfile_path, verbose): + self.firmware_path = firmware_path + self.datfile_path = datfile_path + self.target_mac = mac_address + self.window = window + self.verbose = verbose + self.current_step = 0 + self.pkt_receipt_interval = 10 + self.pkt_payload_size = 20 + self.size_per_receipt = self.pkt_payload_size * self.pkt_receipt_interval + self.done = False + self.packet_recipt_count = 0 + self.total_receipt_size = 0 + self.update_in_progress = False + self.caffeinator = Caffeinator() + self.success = False + + super().__init__(mac_address, manager) + + def connect(self): + self.successful_connection = True + super().connect() + + def input_setup(self): + """Bin: read binfile into bin_array""" + print( + "preparing " + + os.path.split(self.firmware_path)[1] + + " for " + + self.target_mac + ) + + if self.firmware_path == None: + raise Exception("input invalid") + + name, extent = os.path.splitext(self.firmware_path) + + if extent == ".bin": + self.bin_array = array("B", open(self.firmware_path, "rb").read()) + + self.image_size = len(self.bin_array) + print("Binary image size: %d" % self.image_size) + print( + "Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)) + ) + return + raise Exception("input invalid") + + def connect_succeeded(self): + super().connect_succeeded() + print("[%s] Connected" % (self.mac_address)) + + def connect_failed(self, error): + super().connect_failed(error) + self.successful_connection = False + print("[%s] Connection failed: %s" % (self.mac_address, str(error))) + + def disconnect_succeeded(self): + super().disconnect_succeeded() + if not self.success: + self.on_failure() + print("[%s] Disconnected" % (self.mac_address)) + + def characteristic_enable_notifications_succeeded(self, characteristic): + if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: + print("Notification Enable succeeded for Control Point Characteristic") + self.step_one() + + def characteristic_write_value_succeeded(self, characteristic): + if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: + print( + "Characteristic value was written successfully for Control Point Characteristic" + ) + if self.verbose and characteristic.uuid == self.UUID_PACKET: + print( + "Characteristic value was written successfully for Packet Characteristic" + ) + if self.current_step == 1: + self.step_two() + elif self.current_step == 3: + self.step_four() + elif self.current_step == 5: + self.step_six() + elif self.current_step == 6: + print("Begin DFU") + self.caffeinator.caffeinate() + self.step_seven() + + def characteristic_write_value_failed(self, characteristic, error): + print("[WARN ] write value failed", str(error)) + self.update_in_progress = True + self.disconnect() + + def characteristic_value_updated(self, characteristic, value): + if self.verbose: + if characteristic.uuid == self.UUID_CTRL_POINT: + print( + "Characteristic value was updated for Control Point Characteristic" + ) + if characteristic.uuid == self.UUID_PACKET: + print("Characteristic value was updated for Packet Characteristic") + print("New value is:", value) + + hexval = array_to_hex_string(value) + + if hexval[:4] == "1001": + # Response::StartDFU + if hexval[4:] == "01": + self.step_three() + else: + print("[WARN ] StartDFU failed") + self.disconnect() + elif hexval[:4] == "1002": + # Response::InitDFUParameters + if hexval[4:] == "01": + self.step_five() + else: + print("[WARN ] InitDFUParameters failed") + self.disconnect() + elif hexval[:2] == "11": + # PacketReceiptNotification + self.packet_recipt_count += 1 + self.total_receipt_size += self.size_per_receipt + # verify that the returned size correspond to what was sent + ack_size = unpack('