from array import array import gatt import os from .util import * import math from struct import unpack class InfiniTimeDFU(gatt.Device): # Class constants UUID_DFU_SERVICE = "00001530-1212-efde-1523-785feabcd123" UUID_CTRL_POINT = "00001531-1212-efde-1523-785feabcd123" UUID_PACKET = "00001532-1212-efde-1523-785feabcd123" UUID_VERSION = "00001534-1212-efde-1523-785feabcd123" def __init__(self, mac_address, manager, window, firmware_path, datfile_path, verbose): self.firmware_path = firmware_path self.datfile_path = datfile_path self.target_mac = mac_address self.window = window self.verbose = verbose self.current_step = 0 self.pkt_receipt_interval = 10 self.pkt_payload_size = 20 self.size_per_receipt = self.pkt_payload_size * self.pkt_receipt_interval self.done = False self.packet_recipt_count = 0 self.total_receipt_size = 0 self.update_in_progress = False self.caffeinator = Caffeinator() self.success = False super().__init__(mac_address, manager) def connect(self): self.successful_connection = True super().connect() def input_setup(self): """Bin: read binfile into bin_array""" print( "preparing " + os.path.split(self.firmware_path)[1] + " for " + self.target_mac ) if self.firmware_path == None: raise Exception("input invalid") name, extent = os.path.splitext(self.firmware_path) if extent == ".bin": self.bin_array = array("B", open(self.firmware_path, "rb").read()) self.image_size = len(self.bin_array) print("Binary image size: %d" % self.image_size) print( "Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)) ) return raise Exception("input invalid") def connect_succeeded(self): super().connect_succeeded() print("[%s] Connected" % (self.mac_address)) def connect_failed(self, error): super().connect_failed(error) self.successful_connection = False print("[%s] Connection failed: %s" % (self.mac_address, str(error))) def disconnect_succeeded(self): super().disconnect_succeeded() if not self.success: self.on_failure() print("[%s] Disconnected" % (self.mac_address)) def characteristic_enable_notifications_succeeded(self, characteristic): if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: print("Notification Enable succeeded for Control Point Characteristic") self.step_one() def characteristic_write_value_succeeded(self, characteristic): if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: print( "Characteristic value was written successfully for Control Point Characteristic" ) if self.verbose and characteristic.uuid == self.UUID_PACKET: print( "Characteristic value was written successfully for Packet Characteristic" ) if self.current_step == 1: self.step_two() elif self.current_step == 3: self.step_four() elif self.current_step == 5: self.step_six() elif self.current_step == 6: print("Begin DFU") self.caffeinator.caffeinate() self.step_seven() def characteristic_write_value_failed(self, characteristic, error): print("[WARN ] write value failed", str(error)) self.update_in_progress = True self.disconnect() def characteristic_value_updated(self, characteristic, value): if self.verbose: if characteristic.uuid == self.UUID_CTRL_POINT: print( "Characteristic value was updated for Control Point Characteristic" ) if characteristic.uuid == self.UUID_PACKET: print("Characteristic value was updated for Packet Characteristic") print("New value is:", value) hexval = array_to_hex_string(value) if hexval[:4] == "1001": # Response::StartDFU if hexval[4:] == "01": self.step_three() else: print("[WARN ] StartDFU failed") self.disconnect() elif hexval[:4] == "1002": # Response::InitDFUParameters if hexval[4:] == "01": self.step_five() else: print("[WARN ] InitDFUParameters failed") self.disconnect() elif hexval[:2] == "11": # PacketReceiptNotification self.packet_recipt_count += 1 self.total_receipt_size += self.size_per_receipt # verify that the returned size correspond to what was sent ack_size = unpack('