btclock-ota-flasher/app/main.py
2025-01-05 16:26:28 +01:00

336 lines
12 KiB
Python

import concurrent.futures
import logging
import traceback
import sys
from datetime import datetime
import serial
from app.gui.action_button_panel import ActionButtonPanel
from app.gui.serial_flash_dialog import SerialFlashDialog
from app.release_checker import ReleaseChecker
import wx
import wx.richtext as rt
from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin
from zeroconf import ServiceBrowser, Zeroconf
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.fw_updater import FwUpdater
from app.gui.devices_panel import DevicesPanel
from app.utils import get_app_data_folder
from app.zeroconf_listener import ZeroconfListener
from app.espota import FLASH, SPIFFS
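# Type: ignore[attr-defined]
# NOTE(review): the comment above is meant to silence attr-defined errors for
# this file. Type checkers normally only honour a lowercase "# type: ignore"
# placed before any statement, so confirm that it has any effect here.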
class LogRedirector:
    """File-like object that forwards written text to a logger, line by line.

    Intended as a stand-in for ``sys.stdout`` / ``sys.stderr``: text is
    buffered until a newline arrives, then each complete, non-blank line is
    logged at the configured level.

    Args:
        logger: Logger (anything with a ``log(level, msg)`` method) that
            receives the lines.
        level: Logging level used for every forwarded line.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Holds the trailing partial line that has not seen its newline yet.
        self.buffer = ""

    def write(self, text):
        """Buffer *text* and log every complete, non-blank line.

        Returns:
            The number of characters accepted, as the text-stream protocol
            expects from ``write``.
        """
        # Everything before the last newline is complete; the remainder
        # (possibly empty) stays buffered for the next call.
        *complete_lines, self.buffer = (self.buffer + text).split('\n')
        for line in complete_lines:
            if line.strip():  # Only log non-empty lines
                self.logger.log(self.level, line.rstrip())
        return len(text)

    def flush(self):
        """Log any buffered partial line and clear the buffer.

        Whitespace-only remainders are dropped, matching how ``write``
        treats blank lines.
        """
        if self.buffer.strip():
            self.logger.log(self.level, self.buffer.rstrip())
        self.buffer = ""

    def isatty(self):
        """Report that this stream is not an interactive terminal.

        Code that probes ``sys.stdout.isatty()`` would otherwise fail with
        ``AttributeError`` once the standard streams are replaced.
        """
        return False
class LogListCtrl(wx.ListCtrl, ListCtrlAutoWidthMixin):  # type: ignore[misc]
    """Virtual, report-style list control that displays log entries.

    Rows are not stored in the widget itself: entries are kept in
    ``self.log_entries`` (a list of dicts with ``time``, ``level`` and
    ``message`` keys) and the control asks for cell text on demand through
    ``OnGetItemText``.
    """

    def __init__(self, parent):
        list_style = wx.LC_REPORT | wx.LC_VIRTUAL | wx.BORDER_NONE
        wx.ListCtrl.__init__(self, parent, style=list_style)  # type: ignore[attr-defined]
        ListCtrlAutoWidthMixin.__init__(self)
        self.log_entries = []

        # Column layout: (heading, initial width in pixels).
        column_specs = (("Time", 80), ("Level", 70), ("Message", 600))
        for position, (heading, width) in enumerate(column_specs):
            self.InsertColumn(position, heading, width=width)

        # A virtual list starts out empty; the count grows in append_log.
        self.SetItemCount(0)

        self.Bind(wx.EVT_LIST_CACHE_HINT, self.OnCacheHint)

    def OnGetItemText(self, item, col):
        """Return the text for row *item*, column *col* ("" when unavailable)."""
        try:
            if item >= len(self.log_entries):
                return ""
            # Column 0 is the time, column 1 the level, anything else the message.
            field = {0: 'time', 1: 'level'}.get(col, 'message')
            return str(self.log_entries[item][field])
        except Exception:
            # Best effort: a failed lookup renders as an empty cell.
            return ""

    def OnCacheHint(self, evt):
        """Let the cache-hint event continue to default processing."""
        evt.Skip()

    def append_log(self, time, level, message):
        """Append one entry and scroll so that it is visible.

        *time* may be a numeric timestamp, a ``datetime``, a string, or any
        other object; it is normalised to a display string. Failures are
        deliberately ignored.
        """
        try:
            if isinstance(time, (float, int)):
                stamp = datetime.fromtimestamp(time).strftime('%H:%M:%S')
            elif isinstance(time, datetime):
                stamp = time.strftime('%H:%M:%S')
            elif isinstance(time, str):
                stamp = time
            else:
                stamp = str(time)

            self.log_entries.append(
                dict(time=stamp, level=level, message=message))
            total = len(self.log_entries)
            self.SetItemCount(total)
            # Keep the newest row in view.
            if total > 0:
                self.EnsureVisible(total - 1)
        except Exception:
            pass
class LogListHandler(logging.Handler):
    """Logging handler that shows records in a list control.

    Args:
        ctrl: Object exposing ``append_log(time, level, message)``; the call
            is scheduled with ``wx.CallAfter`` rather than made directly.
    """

    def __init__(self, ctrl):
        super().__init__()
        self.ctrl = ctrl

    def emit(self, record):
        """Queue *record* for display as (HH:MM:SS, level name, message)."""
        try:
            # Always derive the time from ``record.created``. ``asctime`` is
            # only present once some formatter has processed the record, and
            # its layout depends on that formatter's date format, so picking
            # a fixed whitespace-separated field from it is unreliable.
            time = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
            level = record.levelname
            # Format the message with its arguments; fall back to the raw
            # message object if the arguments do not match the format string.
            try:
                msg = record.getMessage()
            except Exception:
                msg = str(record.msg)
            wx.CallAfter(self.ctrl.append_log, time, level, msg)
        except Exception:
            self.handleError(record)
class BTClockOTAApp(wx.App):
    """Application object for the BTClock OTA updater."""

    def OnInit(self):
        """Perform no extra initialisation and report success."""
        return True
class BTClockOTAUpdater(wx.Frame):
    """Main window of the BTClock OTA updater.

    The frame shows the devices reported through the zeroconf listener, the
    latest firmware release fetched by ``ReleaseChecker``, a progress gauge
    and a log view that receives all logging output as well as everything
    written to ``sys.stdout`` / ``sys.stderr``.
    """

    def __init__(self, parent, title):
        """Build the window, start service discovery and fetch the release.

        Args:
            parent: Parent window passed on to ``wx.Frame``.
            title: Window title.
        """
        wx.Frame.__init__(self, parent, title=title, size=(800, 500))
        self.SetMinSize((800, 500))

        self.releaseChecker = ReleaseChecker()
        self.zeroconf = Zeroconf()
        self.listener = ZeroconfListener(self.on_zeroconf_state_change)
        self.browser = ServiceBrowser(
            self.zeroconf, "_http._tcp.local.", self.listener)
        self.api_handler = ApiHandler()
        self.fw_updater = FwUpdater(self.call_progress, self.SetStatusText)

        panel = wx.Panel(self)

        # Create the log view first so that everything logged from here on
        # is captured.
        self.log_ctrl = LogListCtrl(panel)
        self._setup_logging()

        self.device_list = DevicesPanel(panel)

        vbox = wx.BoxSizer(wx.VERTICAL)
        vbox.Add(self.device_list, proportion=2,
                 flag=wx.EXPAND | wx.ALL, border=20)

        hbox = wx.BoxSizer(wx.HORIZONTAL)
        self.fw_label = wx.StaticText(
            panel, label="Fetching latest version from GitHub...")
        hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
        self.actionButtons = ActionButtonPanel(
            panel, self)
        hbox.AddStretchSpacer()
        hbox.Add(self.actionButtons, 2, wx.EXPAND | wx.ALL, 5)
        vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)

        self.progress_bar = wx.Gauge(panel, range=100)
        vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
        vbox.Add(self.log_ctrl, 1, flag=wx.EXPAND | wx.ALL, border=20)
        panel.SetSizer(vbox)

        self.setup_ui()

        wx.CallAfter(self.fetch_latest_release_async)
        wx.YieldIfNeeded()

    def _setup_logging(self):
        """Route all logging output and stdout/stderr into the log view."""
        handler = LogListHandler(self.log_ctrl)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S')
        handler.setFormatter(formatter)

        # Get the root logger and remove any existing handlers.
        root_logger = logging.getLogger()
        for existing in root_logger.handlers[:]:
            root_logger.removeHandler(existing)

        # Add our handler and set level to DEBUG to capture everything.
        root_logger.addHandler(handler)
        root_logger.setLevel(logging.DEBUG)

        # Also capture esptool output. Its records propagate to the root
        # logger, so attaching the handler a second time here would show
        # every esptool line twice; only the level needs adjusting.
        esptool_logger = logging.getLogger('esptool')
        esptool_logger.setLevel(logging.DEBUG)

        # Redirect stdout and stderr to the log.
        sys.stdout = LogRedirector(root_logger, logging.INFO)
        sys.stderr = LogRedirector(root_logger, logging.ERROR)

    def setup_ui(self):
        """Create the menu bar and status bar, then show and centre the frame."""
        self.setup_menubar()
        self.status_bar = self.CreateStatusBar(2)
        self.Show(True)
        self.Centre()

    def setup_menubar(self):
        """Build the File menu and bind its entries to their handlers."""
        filemenu = wx.Menu()
        menuOpenDownloadDir = filemenu.Append(
            wx.ID_OPEN, "&Open Download Dir", " Open the directory with firmware files and cache")
        menuFlashSerial = filemenu.Append(
            wx.ID_ANY, "Flash via &Serial...", " Flash firmware using serial connection")
        filemenu.AppendSeparator()
        menuAbout = filemenu.Append(
            wx.ID_ABOUT, "&About", " Information about this program")
        menuExit = filemenu.Append(
            wx.ID_EXIT, "E&xit", " Terminate the program")

        menuBar = wx.MenuBar()
        menuBar.Append(filemenu, "&File")
        self.SetMenuBar(menuBar)

        self.Bind(wx.EVT_MENU, self.OnOpenDownloadFolder, menuOpenDownloadDir)
        self.Bind(wx.EVT_MENU, self.on_serial_flash, menuFlashSerial)
        self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
        self.Bind(wx.EVT_MENU, self.OnExit, menuExit)

    def on_zeroconf_state_change(self, type, name, state, info):
        """Add, refresh or remove a row in the device list.

        Args:
            type: Service type string; only used as the placeholder label
                when a new row is inserted.
            name: Service name; identifies the row (column 0).
            state: ``"Added"`` inserts or refreshes the row, ``"Removed"``
                deletes it; other values are ignored.
            info: Service info object providing ``parsed_addresses()`` and
                the TXT ``properties`` mapping (bytes keys).

        NOTE(review): which thread the listener calls this from is not
        visible here; confirm it is the GUI thread, since the method
        touches widgets directly.
        """
        index = self.device_list.FindItem(0, name)

        if state == "Added":
            rev = info.properties.get(b"rev")
            addresses = info.parsed_addresses()
            if rev is None or not addresses:
                # Without a firmware revision or an address the row cannot
                # be filled in; skip the service instead of raising inside
                # the discovery callback.
                logging.warning(
                    "Ignoring service %s: missing 'rev' property or address", name)
                return

            address = addresses[0]
            # Treat an empty/None answer like "no settings available".
            deviceSettings = self.api_handler.get_settings(address) or {}

            rev = rev.decode()
            version = rev
            fsHash = "Too old"
            hwRev = "REV_A_EPD_2_13"

            if 'gitTag' in deviceSettings:
                version = deviceSettings["gitTag"]
            if 'fsRev' in deviceSettings:
                fsHash = deviceSettings['fsRev'][:7]

            hw_rev_raw = info.properties.get(b"hw_rev")
            if hw_rev_raw is not None:
                hwRev = hw_rev_raw.decode()

            fwHash = rev[:7]

            if index == wx.NOT_FOUND:
                index = self.device_list.InsertItem(
                    self.device_list.GetItemCount(), type)

            # New and existing rows are populated identically.
            row = [name, version, fwHash, hwRev, address, fsHash]
            for col, value in enumerate(row):
                self.device_list.SetItem(index, col, value)

            self.device_list.SetItemData(index, index)
            self.device_list.itemDataMap[index] = row

            for col in range(len(self.device_list.column_headings)):
                self.device_list.SetColumnWidth(
                    col, wx.LIST_AUTOSIZE_USEHEADER)
        elif state == "Removed":
            if index != wx.NOT_FOUND:
                self.device_list.DeleteItem(index)

    def call_progress(self, progress):
        """Report *progress* (a fraction, 1.0 meaning 100 %) to the UI.

        Both the status text and the gauge are updated through
        ``wx.CallAfter`` so the widgets are only touched from the GUI
        thread, whichever thread reports the progress.
        """
        wx.CallAfter(self._apply_progress, progress)

    def _apply_progress(self, progress):
        """Show *progress* in the status bar and on the gauge."""
        progressPerc = int(progress*100)
        self.SetStatusText(f"Progress: {progressPerc}%")
        self.update_progress(progress)

    def update_progress(self, progress):
        """Set the gauge to *progress* (a fraction, 1.0 meaning 100 %)."""
        progressPerc = int(progress*100)
        # The gauge was created with range=100; keep the value inside it.
        self.progress_bar.SetValue(max(0, min(100, progressPerc)))
        wx.YieldIfNeeded()

    def fetch_latest_release_async(self):
        """Fetch the latest release on a worker thread.

        Ensures the application data folder exists, then runs
        ``ReleaseChecker.fetch_latest_release`` in the background;
        ``handle_latest_release`` receives the finished future.
        """
        app_folder = get_app_data_folder()
        os.makedirs(app_folder, exist_ok=True)

        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(self.releaseChecker.fetch_latest_release)
        future.add_done_callback(self.handle_latest_release)
        # Do not wait: the submitted task still runs to completion, and the
        # worker thread is released afterwards instead of idling forever.
        executor.shutdown(wait=False)

    def handle_latest_release(self, future):
        """Store the fetched release and update the firmware label.

        Runs as a done-callback of the fetch future, i.e. normally on the
        worker thread, so label updates are handed to the GUI thread via
        ``wx.CallAfter``.
        """
        try:
            self.latest_release = future.result()  # Store the result
            label = (f"Downloaded firmware version: {self.latest_release}\n"
                     f"Commit: {self.releaseChecker.commit_hash}")
            wx.CallAfter(self.fw_label.SetLabelText, label)
        except Exception as e:
            wx.CallAfter(self.fw_label.SetLabel, f"Error occurred: {str(e)}")
            traceback.print_tb(e.__traceback__)

    def OnOpenDownloadFolder(self, e):
        """Open the application data folder with the default handler."""
        wx.LaunchDefaultBrowser(get_app_data_folder())

    def OnAbout(self, e):
        """Show the About dialog."""
        dlg = wx.MessageDialog(
            self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
        try:
            dlg.ShowModal()
        finally:
            dlg.Destroy()

    def OnExit(self, e):
        """Close the frame."""
        self.Close(False)

    def on_serial_flash(self, event):
        """Ask for serial flashing options and start the update.

        Shows an error message instead when no release has been fetched
        yet. The dialog is destroyed even if starting the update fails.
        """
        if not hasattr(self, 'latest_release'):
            wx.MessageBox("Please wait for firmware to be downloaded", "Error", wx.OK | wx.ICON_ERROR)
            return

        dlg = SerialFlashDialog(self, self.fw_updater, self.latest_release)
        try:
            if dlg.ShowModal() == wx.ID_OK:
                port = dlg.get_selected_port()
                if port:
                    hw_rev = dlg.get_selected_revision()
                    preserve_nvs = dlg.get_preserve_nvs()
                    self.fw_updater.start_serial_firmware_update(
                        self.latest_release,
                        port,
                        hw_rev,
                        preserve_nvs
                    )
        finally:
            dlg.Destroy()