import importlib import re from typing import Any from loguru import logger from lnbits.db import Connection from lnbits.extension_manager import Extension from . import db as core_db from .crud import update_migration_version async def migrate_extension_database(ext: Extension, current_version): try: ext_migrations = importlib.import_module(f"{ext.module_name}.migrations") ext_db = importlib.import_module(ext.module_name).db except ImportError as e: logger.error(e) raise ImportError( f"Please make sure that the extension `{ext.code}` has a migrations file." ) async with ext_db.connect() as ext_conn: await run_migration(ext_conn, ext_migrations, current_version) async def run_migration(db: Connection, migrations_module: Any, current_version: int): matcher = re.compile(r"^m(\d\d\d)_") db_name = migrations_module.__name__.split(".")[-2] for key, migrate in migrations_module.__dict__.items(): match = matcher.match(key) if match: version = int(match.group(1)) if version > current_version: logger.debug(f"running migration {db_name}.{version}") print(f"running migration {db_name}.{version}") await migrate(db) if db.schema is None: await update_migration_version(db, db_name, version) else: async with core_db.connect() as conn: await update_migration_version(conn, db_name, version)