feat: migrate in chunks (#2936)

This commit is contained in:
Vlad Stan 2025-02-07 14:16:53 +02:00 committed by GitHub
parent e134c5c7b9
commit a8809ed5a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -602,34 +602,46 @@ async def m026_update_payment_table(db: Connection):
async def m027_update_apipayments_data(db: Connection):
result = None
try:
result = await db.execute("SELECT * FROM apipayments")
result = await db.execute("SELECT * FROM apipayments LIMIT 100")
except Exception as exc:
logger.warning("Could not select, trying again after cache cleared.")
logger.debug(exc)
await db.execute("COMMIT")
result = await db.execute("SELECT * FROM apipayments")
offset = 0
limit = 1000
payments: list[dict[Any, Any]] = []
logger.info("Updating payments")
while len(payments) > 0 or offset == 0:
logger.info(f"Updating {offset} to {offset+limit}")
payments = result.mappings().all()
for payment in payments:
tag = None
created_at = payment.get("time")
if payment.get("extra"):
extra = json.loads(payment.get("extra"))
tag = extra.get("tag")
tsph = db.timestamp_placeholder("created_at")
await db.execute(
f"""
UPDATE apipayments
SET tag = :tag, created_at = {tsph}, updated_at = {tsph}
WHERE checking_id = :checking_id
""",
{
"tag": tag,
"created_at": created_at,
"checking_id": payment.get("checking_id"),
},
result = await db.execute(
f"SELECT * FROM apipayments ORDER BY time LIMIT {limit} OFFSET {offset}"
)
payments = result.mappings().all()
logger.info(f"Payments count: {len(payments)}")
for payment in payments:
tag = None
created_at = payment.get("time")
if payment.get("extra"):
extra = json.loads(str(payment.get("extra")))
tag = extra.get("tag")
tsph = db.timestamp_placeholder("created_at")
await db.execute(
f"""
UPDATE apipayments
SET tag = :tag, created_at = {tsph}, updated_at = {tsph}
WHERE checking_id = :checking_id
""",
{
"tag": tag,
"created_at": created_at,
"checking_id": payment.get("checking_id"),
},
)
offset += limit
logger.info("Payments updated")
async def m028_update_settings(db: Connection):