diff options
Diffstat (limited to 'src/window.py')
-rw-r--r-- | src/window.py | 381 |
1 files changed, 381 insertions, 0 deletions
diff --git a/src/window.py b/src/window.py new file mode 100644 index 0000000..939299c --- /dev/null +++ b/src/window.py @@ -0,0 +1,381 @@ +import subprocess +import configparser +import threading +import urllib.request +from pathlib import Path +import gatt +from gi.repository import Gtk, GObject, GLib +from .bluetooth import ( + InfiniTimeDevice, + InfiniTimeManager, + BluetoothDisabled, + NoAdapterFound, +) +from .ble_dfu import InfiniTimeDFU +from .unpacker import Unpacker +from .quick_deploy import * +from .config import config + + +class ConnectionThread(threading.Thread): + def __init__(self, manager, mac, callback): + threading.Thread.__init__(self) + self.mac = mac + self.manager = manager + self.callback = callback + self.device = None + + def run(self): + self.device = InfiniTimeDevice( + manager=self.manager, mac_address=self.mac, thread=True + ) + self.device.services_done = self.data_received + self.device.connect() + + def data_received(self): + firmware = bytes(self.device.firmware).decode() + if self.device.battery == -1: + battery = "n/a" + else: + battery = "{}%".format(self.device.battery) + GLib.idle_add(self.callback, [firmware, battery]) + + +@Gtk.Template(resource_path="/com/github/theironrobin/siglo/window.ui") +class SigloWindow(Gtk.ApplicationWindow): + __gtype_name__ = "SigloWindow" + # Navigation + main_stack = Gtk.Template.Child() + header_stack = Gtk.Template.Child() + + # Watches view + watches_listbox = Gtk.Template.Child() + + # Watch view + watch_name = Gtk.Template.Child() + watch_address = Gtk.Template.Child() + watch_firmware = Gtk.Template.Child() + watch_battery = Gtk.Template.Child() + ota_pick_tag_combobox = Gtk.Template.Child() + ota_pick_asset_combobox = Gtk.Template.Child() + firmware_run = Gtk.Template.Child() + firmware_file = Gtk.Template.Child() + firmware_run_file = Gtk.Template.Child() + keep_paired_switch = Gtk.Template.Child() + + # Flasher + dfu_stack = Gtk.Template.Child() + dfu_progress_bar = Gtk.Template.Child() + dfu_progress_text = Gtk.Template.Child() + + def __init__(self, **kwargs): + self.ble_dfu = None + self.ota_file = None + self.manager = None + self.current_mac = None + self.asset = None + self.asset_download_url = None + self.tag = None + self.conf = config() + super().__init__(**kwargs) + GObject.threads_init() + self.full_list = get_quick_deploy_list() + GObject.signal_new( + "flash-signal", + self, + GObject.SIGNAL_RUN_LAST, + GObject.TYPE_PYOBJECT, + (GObject.TYPE_PYOBJECT,), + ) + + def disconnect_paired_device(self): + try: + devices = self.manager.devices() + for d in devices: + if d.mac_address == self.manager.get_mac_address() and d.is_connected(): + d.disconnect() + finally: + self.conf.set_property("paired", "False") + + def destroy_manager(self): + if self.manager: + self.manager.stop() + self.manager = None + + def make_watch_row(self, name, mac): + row = Gtk.ListBoxRow() + grid = Gtk.Grid() + grid.set_hexpand(True) + grid.set_row_spacing(8) + grid.set_column_spacing(8) + grid.set_margin_top(8) + grid.set_margin_bottom(8) + grid.set_margin_left(8) + grid.set_margin_right(8) + row.add(grid) + + icon = Gtk.Image.new_from_resource("/com/github/theironrobin/siglo/watch-icon.svg") + grid.attach(icon, 0, 0, 1, 2) + + label_alias = Gtk.Label(label="Name", xalign=1.0) + label_alias.get_style_context().add_class("dim-label") + grid.attach(label_alias, 1, 0, 1, 1) + value_alias = Gtk.Label(label=name, xalign=0.0) + value_alias.set_hexpand(True) + grid.attach(value_alias, 2, 0, 1, 1) + + label_mac = Gtk.Label(label="Address", xalign=1.0) + label_mac.get_style_context().add_class("dim-label") + grid.attach(label_mac, 1, 1, 1, 1) + value_mac = Gtk.Label(label=mac, xalign=0.0) + grid.attach(value_mac, 2, 1, 1, 1) + + arrow = Gtk.Image.new_from_icon_name("go-next-symbolic", Gtk.IconSize.BUTTON) + grid.attach(arrow, 4, 0, 1, 2) + + row.show_all() + return row + + def do_scanning(self): + print("Start scanning") + self.main_stack.set_visible_child_name("scan") + self.header_stack.set_visible_child_name("scan") + if not self.manager: + # create manager if not present yet + try: + self.manager = InfiniTimeManager() + except (gatt.errors.NotReady, BluetoothDisabled): + print("Bluetooth is disabled") + self.main_stack.set_visible_child_name("nodevice") + except NoAdapterFound: + print("No bluetooth adapter found") + self.main_stack.set_visible_child_name("nodevice") + if not self.manager: + return + + if self.conf.get_property("paired"): + self.disconnect_paired_device() + + self.depopulate_listbox() + self.manager.scan_result = False + try: + self.manager.scan_for_infinitime() + except (gatt.errors.NotReady, gatt.errors.Failed) as e: + print(e) + self.main_stack.set_visible_child_name("nodevice") + self.destroy_manager() + try: + if len(self.manager.get_device_set()) > 0: + self.main_stack.set_visible_child_name("watches") + self.header_stack.set_visible_child_name("watches") + else: + self.main_stack.set_visible_child_name("nodevice") + for mac in self.manager.get_device_set(): + print("Found {}".format(mac)) + row = self.make_watch_row(self.manager.aliases[mac], mac) + row.mac = mac + row.alias = self.manager.aliases[mac] + self.watches_listbox.add(row) + except AttributeError as e: + print(e) + self.main_stack.set_visible_child_name("nodevice") + self.destroy_manager() + self.populate_tagbox() + + def depopulate_listbox(self): + children = self.watches_listbox.get_children() + for child in children: + self.watches_listbox.remove(child) + + def populate_tagbox(self): + self.ota_pick_tag_combobox.remove_all() + for tag in get_tags(self.full_list): + self.ota_pick_tag_combobox.append_text(tag) + + def populate_assetbox(self): + self.ota_pick_asset_combobox.remove_all() + for asset in get_assets_by_tag(self.tag, self.full_list): + self.ota_pick_asset_combobox.append_text(asset) + + def callback_device_connect(self, data): + firmware, battery = data + + self.watch_firmware.set_text(firmware) + self.watch_battery.set_text(battery) + + @Gtk.Template.Callback() + def on_watches_listbox_row_activated(self, widget, row): + mac = row.mac + self.current_mac = mac + alias = row.alias + + if self.keep_paired_switch.get_active(): + # Start daemon + subprocess.Popen(["systemctl", "--user", "start", "siglo"]) + self.conf.set_property("paired", "True") + + if self.manager is not None: + thread = ConnectionThread(self.manager, mac, self.callback_device_connect) + thread.daemon = True + thread.start() + + self.watch_name.set_text(alias) + self.watch_address.set_text(mac) + self.main_stack.set_visible_child_name("watch") + self.header_stack.set_visible_child_name("watch") + + @Gtk.Template.Callback() + def on_back_to_devices_clicked(self, *args): + self.main_stack.set_visible_child_name("watches") + self.header_stack.set_visible_child_name("watches") + + @Gtk.Template.Callback() + def ota_pick_tag_combobox_changed_cb(self, widget): + self.tag = self.ota_pick_tag_combobox.get_active_text() + self.populate_assetbox() + + @Gtk.Template.Callback() + def ota_pick_asset_combobox_changed_cb(self, widget): + self.asset = self.ota_pick_asset_combobox.get_active_text() + if self.asset is not None: + self.firmware_run.set_sensitive(True) + self.asset_download_url = get_download_url( + self.asset, self.tag, self.full_list + ) + else: + self.firmware_run.set_sensitive(False) + self.asset_download_url = None + + @Gtk.Template.Callback() + def firmware_file_file_set_cb(self, widget): + print("File set!") + filename = widget.get_filename() + self.ota_file = filename + self.firmware_run_file.set_sensitive(True) + + @Gtk.Template.Callback() + def rescan_button_clicked(self, widget): + self.do_scanning() + + @Gtk.Template.Callback() + def on_bluetooth_settings_clicked(self, widget): + subprocess.Popen(["gnome-control-center", "bluetooth"]) + + @Gtk.Template.Callback() + def ota_file_selected(self, widget): + filename = widget.get_filename() + self.ota_file = filename + self.main_info.set_text("File: " + filename.split("/")[-1]) + self.ota_picked_box.set_visible(True) + self.ota_selection_box.set_visible(False) + self.ota_picked_box.set_sensitive(True) + + @Gtk.Template.Callback() + def firmware_run_file_clicked_cb(self, widget): + self.dfu_stack.set_visible_child_name("ok") + self.main_stack.set_visible_child_name("firmware") + + self.firmware_mode = "manual" + + self.start_flash() + + @Gtk.Template.Callback() + def on_firmware_run_clicked(self, widget): + self.dfu_stack.set_visible_child_name("ok") + self.main_stack.set_visible_child_name("firmware") + + self.firmware_mode = "auto" + + file_name = "/tmp/" + self.asset + + print("Downloading {}".format(self.asset_download_url)) + + local_filename, headers = urllib.request.urlretrieve( + self.asset_download_url, file_name + ) + self.ota_file = local_filename + + self.start_flash() + + def start_flash(self): + unpacker = Unpacker() + try: + binfile, datfile = unpacker.unpack_zipfile(self.ota_file) + except Exception as e: + print("ERR") + print(e) + pass + + self.ble_dfu = InfiniTimeDFU( + mac_address=self.current_mac, + manager=self.manager, + window=self, + firmware_path=binfile, + datfile_path=datfile, + verbose=False, + ) + self.ble_dfu.on_failure = self.on_flash_failed + self.ble_dfu.on_success = self.on_flash_done + self.ble_dfu.input_setup() + self.dfu_progress_text.set_text(self.get_prog_text()) + self.ble_dfu.connect() + + def on_flash_failed(self): + self.dfu_stack.set_visible_child_name("fail") + + def on_flash_done(self): + self.dfu_stack.set_visible_child_name("done") + + @Gtk.Template.Callback() + def on_dfu_retry_clicked(self, widget): + if self.firmware_mode == "auto": + self.on_firmware_run_clicked(widget) + + @Gtk.Template.Callback() + def flash_it_button_clicked(self, widget): + if self.deploy_type == "quick": + file_name = "/tmp/" + self.asset + local_filename, headers = urllib.request.urlretrieve( + self.asset_download_url, file_name + ) + self.ota_file = local_filename + + @Gtk.Template.Callback() + def deploy_type_toggled(self, widget): + if ( + self.conf.get_property("deploy_type") == "manual" + and self.auto_switch_deploy_type + ): + self.auto_switch_deploy_type = False + else: + if self.conf.get_property("deploy_type") == "quick": + self.conf.set_property("deploy_type", "manual") + else: + self.conf.set_property("deploy_type", "quick") + self.rescan_button.emit("clicked") + + def update_progress_bar(self): + self.dfu_progress_bar.set_fraction( + self.ble_dfu.total_receipt_size / self.ble_dfu.image_size + ) + self.dfu_progress_text.set_text(self.get_prog_text()) + + def get_prog_text(self): + return ( + str(self.ble_dfu.total_receipt_size) + + " / " + + str(self.ble_dfu.image_size) + + " bytes received" + ) + + def show_complete(self, success): + if success: + self.rescan_button.set_sensitive("True") + self.main_info.set_text("OTA Update Complete") + else: + self.main_info.set_text("OTA Update Failed") + self.bt_spinner.set_visible(False) + self.dfu_progress_box.set_visible(False) + self.ota_picked_box.set_visible(True) + if self.conf.get_property("deploy_type") == "quick": + self.auto_bbox_scan_pass.set_visible(True) |