Refactor for better separation of concerns

This commit is contained in:
Djuri Baars 2025-01-05 17:04:44 +01:00
parent 8beee8edf1
commit dca24f91d3
No known key found for this signature in database
GPG Key ID: 61B9B2DDE5AA3AC1
4 changed files with 501 additions and 120 deletions

11
app.py
View File

@ -1,8 +1,15 @@
from app.main import BTClockOTAUpdater
import wx
from app.gui.main_window import MainWindow
from app.controller import AppController
if __name__ == "__main__":
app = wx.App(False)
frame = BTClockOTAUpdater(None, 'BTClock OTA updater')
# Create the main window first
window = MainWindow(None, 'BTClock OTA updater')
# Then create the controller and set it in the window
controller = AppController(window)
window.set_controller(controller)
app.MainLoop()

151
app/controller.py Normal file
View File

@ -0,0 +1,151 @@
import concurrent.futures
import os
import wx
from wx import ID_OK, MessageBox, ICON_ERROR
from app.api import ApiHandler
from app.fw_updater import FwUpdater
from app.release_checker import ReleaseChecker
from app.utils import get_app_data_folder
from app.zeroconf_listener import ZeroconfListener
from zeroconf import ServiceBrowser, Zeroconf
from app.gui.serial_flash_dialog import SerialFlashDialog
class AppController:
def __init__(self, main_window):
self.main_window = main_window
self.releaseChecker = ReleaseChecker()
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(
self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
self.fw_updater = FwUpdater(self.call_progress, self.call_event)
# Start fetching the latest release
wx.CallAfter(self.fetch_latest_release_async)
wx.YieldIfNeeded()
def call_progress(self, progress):
progressPerc = int(progress*100)
self.main_window.update_status(f"Progress: {progressPerc}%")
wx.CallAfter(self.main_window.update_progress, progress)
def call_event(self, message):
self.main_window.update_status(message)
def on_zeroconf_state_change(self, type, name, state, info):
if state == "Added":
deviceSettings = self.api_handler.get_settings(
info.parsed_addresses()[0])
self.main_window.update_device_list(type, name, state, info, deviceSettings)
elif state == "Removed":
self.main_window.update_device_list(type, name, state, info, None)
def fetch_latest_release_async(self):
app_folder = get_app_data_folder()
if not os.path.exists(app_folder):
os.makedirs(app_folder)
executor = concurrent.futures.ThreadPoolExecutor()
future = executor.submit(self.releaseChecker.fetch_latest_release)
future.add_done_callback(self.handle_latest_release)
def handle_latest_release(self, future):
try:
self.latest_release = future.result()
self.main_window.update_firmware_label(
self.latest_release,
self.releaseChecker.commit_hash
)
except Exception as e:
self.main_window.update_firmware_label(
"Error",
str(e)
)
def handle_serial_flash(self):
if not hasattr(self, 'latest_release'):
wx.MessageBox("Please wait for firmware to be downloaded", "Error", wx.OK | wx.ICON_ERROR)
return
dlg = SerialFlashDialog(self.main_window, self.fw_updater, self.latest_release)
if dlg.ShowModal() == wx.ID_OK:
port = dlg.get_selected_port()
if port:
hw_rev = dlg.get_selected_revision()
preserve_nvs = dlg.get_preserve_nvs()
self.fw_updater.start_serial_firmware_update(
self.latest_release,
port,
hw_rev,
preserve_nvs
)
dlg.Destroy()
def handle_firmware_update(self):
selected_index = self.main_window.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.main_window.device_list.GetItemText(selected_index, 0)
hw_rev = self.main_window.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.fw_updater.start_firmware_update(self.latest_release, address, hw_rev)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
def handle_fs_update(self):
selected_index = self.main_window.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.main_window.device_list.GetItemText(selected_index, 0)
hw_rev = self.main_window.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.fw_updater.start_fs_update(self.latest_release, address, hw_rev)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
def handle_identify(self):
selected_index = self.main_window.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.main_window.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.api_handler.identify_btclock(address)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox(
"Please select a device to identify", "Error", wx.ICON_ERROR)
def handle_open_webui(self):
selected_index = self.main_window.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.main_window.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
import webbrowser
import threading
thread = threading.Thread(
target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox(
"Please select a device to open WebUI", "Error", wx.ICON_ERROR)

View File

@ -1,127 +1,55 @@
import threading
import webbrowser
from app.api import ApiHandler
from app.gui.devices_panel import DevicesPanel
from app.zeroconf_listener import ZeroconfListener
import wx
from wx import (
Panel, Button, BoxSizer, HORIZONTAL, ALL
)
class ActionButtonPanel(wx.Panel):
currentlyUpdating = False
class ActionButtonPanel(Panel):
def __init__(self, parent, app_controller):
Panel.__init__(self, parent)
self.app_controller = app_controller
def __init__(self, parent:wx.Panel, parent_frame:wx.Frame, *args, **kwargs):
super(ActionButtonPanel, self).__init__(parent, *args, **kwargs)
# Create buttons
self.update_fw_btn = Button(self, label="Update Firmware")
self.update_fs_btn = Button(self, label="Update WebUI")
self.identify_btn = Button(self, label="Identify")
self.open_webui_btn = Button(self, label="Open WebUI")
self.parent = parent
self.parent_frame = parent_frame
self.api_handler:ApiHandler = parent_frame.api_handler
self.device_list:DevicesPanel = parent_frame.device_list
self.listener:ZeroconfListener = parent_frame.listener
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED,
self.on_item_deselected)
self.InitUI()
def InitUI(self):
sizer = wx.BoxSizer(wx.HORIZONTAL)
self.update_button = wx.Button(self, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(self, label="Update WebUI")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(self, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(self, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
sizer.Add(self.update_button)
sizer.Add(self.update_fs_button)
sizer.Add(self.identify_button)
sizer.Add(self.open_webif_button)
# Add buttons to a sizer
sizer = BoxSizer(HORIZONTAL)
sizer.Add(self.update_fw_btn, 0, ALL, 5)
sizer.Add(self.update_fs_btn, 0, ALL, 5)
sizer.Add(self.identify_btn, 0, ALL, 5)
sizer.Add(self.open_webui_btn, 0, ALL, 5)
self.SetSizer(sizer)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
# Initially disable buttons until device is selected
self.update_fw_btn.Disable()
self.update_fs_btn.Disable()
self.identify_btn.Disable()
self.open_webui_btn.Disable()
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
self.parent_frame.fw_updater.start_firmware_update(self.parent_frame.releaseChecker.release_name, address, hw_rev)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
# Bind events
self.update_fw_btn.Bind(wx.EVT_BUTTON, self.on_update_firmware)
self.update_fs_btn.Bind(wx.EVT_BUTTON, self.on_update_fs)
self.identify_btn.Bind(wx.EVT_BUTTON, self.on_identify)
self.open_webui_btn.Bind(wx.EVT_BUTTON, self.on_open_webui)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(
target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def on_update_firmware(self, event):
self.app_controller.handle_firmware_update()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if self.currentlyUpdating:
wx.MessageBox("Please wait, already updating",
"Error", wx.ICON_ERROR)
return
def on_update_fs(self, event):
self.app_controller.handle_fs_update()
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
self.parent_frame.fw_updater.start_fs_update(self.parent_frame.releaseChecker.release_name, address, hw_rev)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox(
"Please select a device to make an API call", "Error", wx.ICON_ERROR)
def on_item_selected(self, event):
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_identify(self, event):
self.app_controller.handle_identify()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
def on_open_webui(self, event):
self.app_controller.handle_open_webui()
def enable_device_buttons(self, enable=True):
"""Enable or disable buttons that require a device to be selected"""
self.update_fw_btn.Enable(enable)
self.update_fs_btn.Enable(enable)
self.identify_btn.Enable(enable)
self.open_webui_btn.Enable(enable)

295
app/gui/main_window.py Normal file
View File

@ -0,0 +1,295 @@
import wx
import wx.richtext as rt
from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin
import logging
import sys
import os
import webbrowser
from datetime import datetime
from app.gui.action_button_panel import ActionButtonPanel
from app.gui.devices_panel import DevicesPanel
from app.gui.serial_flash_dialog import SerialFlashDialog
from app.utils import get_app_data_folder
# Import wx constants
from wx import (
Frame, Panel, BoxSizer, VERTICAL, HORIZONTAL,
EXPAND, ALL, StaticText, Gauge, Menu, MenuBar,
ID_OPEN, ID_ANY, ID_ABOUT, ID_EXIT, NOT_FOUND,
LIST_AUTOSIZE_USEHEADER, OK, MessageDialog,
LaunchDefaultBrowser, ListCtrl, LC_REPORT,
LC_VIRTUAL, BORDER_NONE
)
class LogRedirector:
def __init__(self, logger, level):
self.logger = logger
self.level = level
self.buffer = ""
def write(self, text):
self.buffer += text
while '\n' in self.buffer:
line, self.buffer = self.buffer.split('\n', 1)
if line.strip(): # Only log non-empty lines
self.logger.log(self.level, line.rstrip())
def flush(self):
if self.buffer:
self.logger.log(self.level, self.buffer.rstrip())
self.buffer = ""
class LogListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT | wx.LC_VIRTUAL | wx.BORDER_NONE)
ListCtrlAutoWidthMixin.__init__(self)
self.log_entries = []
# Add columns
self.InsertColumn(0, "Time", width=80)
self.InsertColumn(1, "Level", width=70)
self.InsertColumn(2, "Message", width=600)
# Set up virtual list
self.SetItemCount(0)
# Bind events
self.Bind(wx.EVT_LIST_CACHE_HINT, self.OnCacheHint)
def OnGetItemText(self, item, col):
try:
if item < len(self.log_entries):
entry = self.log_entries[item]
if col == 0:
return str(entry['time'])
elif col == 1:
return str(entry['level'])
else:
return str(entry['message'])
except Exception:
pass
return ""
def OnCacheHint(self, evt):
evt.Skip()
def append_log(self, time, level, message):
try:
# Convert float timestamp to string if needed
if isinstance(time, (float, int)):
time = datetime.fromtimestamp(time).strftime('%H:%M:%S')
elif isinstance(time, datetime):
time = time.strftime('%H:%M:%S')
elif not isinstance(time, str):
time = str(time)
self.log_entries.append({
'time': time,
'level': level,
'message': message
})
self.SetItemCount(len(self.log_entries))
# Ensure the last item is visible
if len(self.log_entries) > 0:
self.EnsureVisible(len(self.log_entries) - 1)
except Exception:
pass
class LogListHandler(logging.Handler):
def __init__(self, ctrl):
super().__init__()
self.ctrl = ctrl
def emit(self, record):
try:
# Get time from the record
if hasattr(record, 'asctime'):
time = record.asctime.split()[3] # Get HH:MM:SS part
else:
from datetime import datetime
time = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
level = record.levelname
# Format the message with its arguments
try:
msg = record.getMessage()
except Exception:
msg = str(record.msg)
wx.CallAfter(self.ctrl.append_log, time, level, msg)
except Exception:
self.handleError(record)
class MainWindow(wx.Frame):
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800, 500))
self.app_controller = None
self.SetMinSize((800, 500))
panel = wx.Panel(self)
# Create log list control
self.log_ctrl = LogListCtrl(panel)
# Set up logging to capture all output
handler = LogListHandler(self.log_ctrl)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S')
handler.setFormatter(formatter)
# Get the root logger and remove any existing handlers
root_logger = logging.getLogger()
for h in root_logger.handlers[:]:
root_logger.removeHandler(h)
# Add our handler and set level to DEBUG to capture everything
root_logger.addHandler(handler)
root_logger.setLevel(logging.DEBUG)
# Also capture esptool output
esptool_logger = logging.getLogger('esptool')
esptool_logger.addHandler(handler)
esptool_logger.setLevel(logging.DEBUG)
# Redirect stdout and stderr to the log
sys.stdout = LogRedirector(root_logger, logging.INFO)
sys.stderr = LogRedirector(root_logger, logging.ERROR)
self.device_list = DevicesPanel(panel)
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_device_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_device_deselected)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.device_list, proportion=2,
flag=wx.EXPAND | wx.ALL, border=20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.fw_label = wx.StaticText(
panel, label=f"Fetching latest version from GitHub...")
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
# Create action buttons but don't bind them yet
self.actionButtons = ActionButtonPanel(panel, None)
hbox.AddStretchSpacer()
hbox.Add(self.actionButtons, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
vbox.Add(self.log_ctrl, 1, flag=wx.EXPAND | wx.ALL, border=20)
panel.SetSizer(vbox)
self.setup_ui()
def setup_ui(self):
self.setup_menubar()
self.status_bar = self.CreateStatusBar(2)
self.Show(True)
self.Centre()
def setup_menubar(self):
filemenu = wx.Menu()
menuOpenDownloadDir = filemenu.Append(
wx.ID_OPEN, "&Open Download Dir", " Open the directory with firmware files and cache")
menuFlashSerial = filemenu.Append(
wx.ID_ANY, "Flash via &Serial...", " Flash firmware using serial connection")
filemenu.AppendSeparator()
menuAbout = filemenu.Append(
wx.ID_ABOUT, "&About", " Information about this program")
menuExit = filemenu.Append(
wx.ID_EXIT, "E&xit", " Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu, "&File")
self.SetMenuBar(menuBar)
self.Bind(wx.EVT_MENU, self.OnOpenDownloadFolder, menuOpenDownloadDir)
self.Bind(wx.EVT_MENU, self.on_serial_flash, menuFlashSerial)
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
def update_device_list(self, type, name, state, info, settings):
index = self.device_list.FindItem(0, name)
if state == "Added":
version = info.properties.get(b"rev").decode()
fsHash = "Too old"
hwRev = "REV_A_EPD_2_13"
if 'gitTag' in settings:
version = settings["gitTag"]
if 'fsRev' in settings:
fsHash = settings['fsRev'][:7]
if (info.properties.get(b"hw_rev") is not None):
hwRev = info.properties.get(b"hw_rev").decode()
fwHash = info.properties.get(b"rev").decode()[:7]
address = info.parsed_addresses()[0]
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(
self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, version)
self.device_list.SetItem(index, 2, fwHash)
self.device_list.SetItem(index, 3, hwRev)
self.device_list.SetItem(index, 4, address)
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, version)
self.device_list.SetItem(index, 2, fwHash)
self.device_list.SetItem(index, 3, hwRev)
self.device_list.SetItem(index, 4, address)
self.device_list.SetItem(index, 5, fsHash)
self.device_list.SetItemData(index, index)
self.device_list.itemDataMap[index] = [
name, version, fwHash, hwRev, address, fsHash]
for col in range(0, len(self.device_list.column_headings)):
self.device_list.SetColumnWidth(
col, wx.LIST_AUTOSIZE_USEHEADER)
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def update_progress(self, progress):
progressPerc = int(progress*100)
self.progress_bar.SetValue(progressPerc)
wx.YieldIfNeeded()
def update_status(self, message):
self.SetStatusText(message)
def update_firmware_label(self, version, commit_hash):
self.fw_label.SetLabelText(f"Downloaded firmware version: {version}\nCommit: {commit_hash}")
def OnOpenDownloadFolder(self, e):
wx.LaunchDefaultBrowser(get_app_data_folder())
def OnAbout(self, e):
dlg = wx.MessageDialog(
self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self, e):
self.Close(False)
def on_serial_flash(self, event):
self.app_controller.handle_serial_flash()
def set_controller(self, controller):
self.app_controller = controller
self.actionButtons.app_controller = controller
def on_device_selected(self, event):
self.actionButtons.enable_device_buttons(True)
event.Skip()
def on_device_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.actionButtons.enable_device_buttons(False)
event.Skip()