btclock-ota-flasher/app/release_checker.py

137 lines
4.9 KiB
Python
Raw Normal View History

2024-06-09 23:06:53 +02:00
import json
2024-06-09 19:38:32 +02:00
import os
import requests
import wx
from typing import Callable
2024-06-09 23:06:53 +02:00
from datetime import datetime, timedelta
2024-06-09 19:38:32 +02:00
from app.utils import keep_latest_versions
2024-06-09 23:06:53 +02:00
# How long a cached GitHub API response is considered fresh.
CACHE_DURATION = timedelta(minutes=30)
# JSON file (relative to the working directory) holding cached API responses.
CACHE_FILE = 'firmware/cache.json'
2024-06-09 19:38:32 +02:00
class ReleaseChecker:
    '''Look up the latest BTClock firmware release on GitHub and download it.

    After a successful fetch_latest_release() call, release_name holds the
    release tag and commit_hash holds the commit that tag resolves to.
    Firmware files and the API response cache live in the "firmware"
    directory, relative to the current working directory.
    '''

    # Class-level defaults so both attributes are readable before any fetch.
    release_name = ""
    commit_hash = ""

    # Seconds to wait on the server per request; without a timeout a stalled
    # connection would block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # Optional hook invoked with an integer percentage while downloading.
        self.progress_callback: Callable[[int], None] | None = None

    def load_cache(self):
        '''Load cached data from file.

        Returns an empty dict when the cache file is missing, unreadable,
        not valid JSON, or does not contain a JSON object, so a damaged
        cache only costs a refetch instead of breaking every later run.
        '''
        if not os.path.exists(CACHE_FILE):
            return {}
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                cache = json.load(f)
        except (OSError, ValueError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            return {}
        return cache if isinstance(cache, dict) else {}

    def save_cache(self, cache_data):
        '''Save cache data to file, creating its directory if needed.'''
        cache_dir = os.path.dirname(CACHE_FILE)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f)

    @staticmethod
    def _fresh_cache_data(cache, key, now):
        '''Return the cached payload for key, or None if absent, stale or malformed.'''
        entry = cache.get(key)
        try:
            age = now - datetime.fromisoformat(entry['timestamp'])
            if age < CACHE_DURATION:
                return entry['data']
        except (KeyError, TypeError, ValueError):
            # A missing or malformed entry is treated as a cache miss.
            pass
        return None

    def _resolve_tag_commit(self, repo, ref_url):
        '''Return the commit SHA that the tag reference at ref_url points to.'''
        response = requests.get(ref_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        ref_object = response.json()["object"]
        if ref_object["type"] == "commit":
            return ref_object["sha"]

        # The ref points at a tag object instead of a commit; the tag object
        # carries the SHA we are after.
        tag_url = f"https://api.github.com/repos/{repo}/git/tags/{ref_object['sha']}"
        response = requests.get(tag_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()["object"]["sha"]

    def _report_progress(self, percent):
        '''Forward percent to progress_callback when one is installed.'''
        if callable(self.progress_callback):
            self.progress_callback(percent)

    def fetch_latest_release(self):
        '''Fetch latest firmware release from GitHub.

        Uses the on-disk cache for API responses younger than
        CACHE_DURATION, downloads the known firmware assets that are not
        already present, and records release_name and commit_hash.

        Returns:
            The tag name of the latest release.

        Raises:
            ReleaseCheckerException: the release lookup failed, or none of
                the expected firmware files is attached to the release.
            requests.RequestException: a tag lookup or a file download
                failed (these are propagated unchanged).
        '''
        repo = "btclock/btclock_v3"
        cache = self.load_cache()
        now = datetime.now()

        os.makedirs("firmware", exist_ok=True)

        latest_release = self._fresh_cache_data(cache, 'latest_release', now)
        if latest_release is None:
            url = f"https://api.github.com/repos/{repo}/releases/latest"
            try:
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                latest_release = response.json()
            except requests.RequestException as e:
                raise ReleaseCheckerException(
                    f"Error fetching release: {e}") from e
            cache['latest_release'] = {
                'data': latest_release,
                'timestamp': now.isoformat(),
            }
            self.save_cache(cache)

        release_name = latest_release['tag_name']
        self.release_name = release_name

        filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin",
                                 "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
        asset_urls = [asset['browser_download_url']
                      for asset in latest_release['assets']
                      if asset['name'] in filenames_to_download]
        if not asset_urls:
            raise ReleaseCheckerException(
                f"File {filenames_to_download} not found in latest release")

        for asset_url in asset_urls:
            self.download_file(asset_url, release_name)

        # Kept on a single line: a replacement field spanning several lines
        # only parses on Python 3.12 and newer.
        ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}"
        commit_hash = self._fresh_cache_data(cache, ref_url, now)
        if commit_hash is None:
            commit_hash = self._resolve_tag_commit(repo, ref_url)
            cache[ref_url] = {
                'data': commit_hash,
                'timestamp': now.isoformat(),
            }
            self.save_cache(cache)

        self.commit_hash = commit_hash
        return self.release_name

    def download_file(self, url, release_name):
        '''Download one firmware file into the "firmware" directory.

        The file is stored as "<release_name>_<last URL segment>". Nothing
        is requested when that file already exists. Data is first written
        to a ".part" file and renamed on success, so an interrupted
        download is never mistaken for a complete firmware image.

        Args:
            url: Download URL of the asset.
            release_name: Release tag used as the local filename prefix.

        Raises:
            ReleaseCheckerException: the response has no content-length.
            requests.RequestException: the request failed or returned an
                HTTP error status.
        '''
        local_filename = f"{release_name}_{url.split('/')[-1]}"
        target_path = f"firmware/{local_filename}"

        os.makedirs("firmware", exist_ok=True)
        # Check before opening a connection: no wasted request and no
        # unclosed streaming response when the file is already there.
        if os.path.exists(target_path):
            return

        with requests.get(url, stream=True,
                          timeout=self.REQUEST_TIMEOUT) as response:
            # Never store an HTTP error page as a firmware image.
            response.raise_for_status()

            total_length = response.headers.get('content-length')
            if total_length is None:
                raise ReleaseCheckerException("No content length header")
            total_length = int(total_length)

            keep_latest_versions('firmware', 2)

            partial_path = f"{target_path}.part"
            bytes_written = 0
            last_percent = -1
            completed = False
            try:
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if not chunk:
                            continue
                        f.write(chunk)
                        bytes_written += len(chunk)
                        # Byte-based progress: safe for bodies smaller than
                        # one chunk and never above 100.
                        if total_length > 0:
                            percent = min(
                                100, bytes_written * 100 // total_length)
                            if percent != last_percent:
                                last_percent = percent
                                self._report_progress(percent)
                os.replace(partial_path, target_path)
                completed = True
            finally:
                if not completed and os.path.exists(partial_path):
                    os.remove(partial_path)

        self._report_progress(100)
class ReleaseCheckerException(Exception):
    '''Raised when the latest firmware release cannot be fetched or used.'''