"""HTTP API for the Gerty extension: CRUD for Gerty records and screen JSON."""

import json
import math
import os
import random
import time
from http import HTTPStatus

import httpx
from fastapi import Query
from fastapi.params import Depends
from fastapi.templating import Jinja2Templates
from lnurl import decode as decode_lnurl
from loguru import logger
from starlette.exceptions import HTTPException

from lnbits.core.crud import get_wallet_for_key
from lnbits.core.crud import get_user
from lnbits.core.services import create_invoice
from lnbits.core.views.api import api_payment, api_wallet
from lnbits.decorators import WalletTypeInfo, get_key_type, require_admin_key
from lnbits.utils.exchange_rates import satoshis_amount_as_fiat

from . import gerty_ext
from .crud import create_gerty, update_gerty, delete_gerty, get_gerty, get_gertys
from .models import Gerty
from ...settings import LNBITS_PATH


@gerty_ext.get("/api/v1/gerty", status_code=HTTPStatus.OK)
async def api_gertys(
    all_wallets: bool = Query(False), wallet: WalletTypeInfo = Depends(get_key_type)
):
    """List the Gertys of the calling wallet.

    When ``all_wallets`` is true, the Gertys of every wallet belonging to the
    wallet's user are returned instead.
    """
    wallet_ids = [wallet.wallet.id]
    if all_wallets:
        user = await get_user(wallet.wallet.user)
        # get_user() may yield nothing; fall back to an empty list instead of
        # failing on the attribute access.
        wallet_ids = user.wallet_ids if user else []

    return [gerty.dict() for gerty in await get_gertys(wallet_ids)]


@gerty_ext.post("/api/v1/gerty", status_code=HTTPStatus.CREATED)
@gerty_ext.put("/api/v1/gerty/{gerty_id}", status_code=HTTPStatus.OK)
async def api_link_create_or_update(
    data: Gerty,
    wallet: WalletTypeInfo = Depends(get_key_type),
    gerty_id: str = Query(None),
):
    """Create a Gerty (POST) or update an existing one (PUT with ``gerty_id``).

    Raises:
        HTTPException: 404 if ``gerty_id`` is given but no such Gerty exists,
            403 if the Gerty belongs to a different wallet.
    """
    if gerty_id:
        gerty = await get_gerty(gerty_id)
        if not gerty:
            raise HTTPException(
                status_code=HTTPStatus.NOT_FOUND, detail="Gerty does not exist"
            )

        if gerty.wallet != wallet.wallet.id:
            raise HTTPException(
                status_code=HTTPStatus.FORBIDDEN,
                detail="Come on, seriously, this isn't your Gerty!",
            )

        # Pin the record to the caller's wallet regardless of the payload.
        data.wallet = wallet.wallet.id
        gerty = await update_gerty(gerty_id, **data.dict())
    else:
        gerty = await create_gerty(wallet_id=wallet.wallet.id, data=data)

    return gerty.dict()


@gerty_ext.delete("/api/v1/gerty/{gerty_id}")
async def api_gerty_delete(
    gerty_id: str, wallet: WalletTypeInfo = Depends(require_admin_key)
):
    """Delete a Gerty owned by the calling wallet (admin key required).

    Raises:
        HTTPException: 404 if the Gerty does not exist, 403 if it belongs to
            another wallet, and 204 on success (the success response is
            delivered by raising, as the route declares no status code).
    """
    gerty = await get_gerty(gerty_id)

    if not gerty:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Gerty does not exist."
        )

    if gerty.wallet != wallet.wallet.id:
        raise HTTPException(status_code=HTTPStatus.FORBIDDEN, detail="Not your Gerty.")

    await delete_gerty(gerty_id)
    raise HTTPException(status_code=HTTPStatus.NO_CONTENT)


#######################

# Quotes are loaded once at import time and served from memory.
with open(
    os.path.join(LNBITS_PATH, 'extensions/gerty/static/satoshi.json'),
    encoding="utf-8",
) as fd:
    satoshiQuotes = json.load(fd)


# NOTE: this route must stay registered before "/api/v1/gerty/{gerty_id}",
# otherwise "satoshiquote" would be captured as a gerty_id.
@gerty_ext.get("/api/v1/gerty/satoshiquote", status_code=HTTPStatus.OK)
async def api_gerty_satoshi():
    """Return one randomly chosen entry from the loaded quotes."""
    # random.choice adapts to the actual number of quotes instead of assuming
    # that exactly 101 entries exist.
    return random.choice(satoshiQuotes)


@gerty_ext.get("/api/v1/gerty/{gerty_id}")
async def api_gerty_json(
    gerty_id: str,
    p: int = None  # page number
):
    """Return the settings and the content of one screen of a Gerty.

    Args:
        gerty_id: Id of the Gerty to render.
        p: Zero-based index into the Gerty's enabled screens. A missing value
            is treated as the first screen.

    Raises:
        HTTPException: 404 if the Gerty does not exist or if ``p`` does not
            address an enabled screen.
    """
    gerty = await get_gerty(gerty_id)

    if not gerty:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Gerty does not exist."
        )

    # display_preferences is stored as a JSON object mapping slug -> enabled.
    display_preferences = json.loads(gerty.display_preferences)
    enabled_screens = [
        screen_slug
        for screen_slug, is_screen_enabled in display_preferences.items()
        if is_screen_enabled
    ]
    enabled_screen_count = len(enabled_screens)

    if p is None:
        p = 0
    if not 0 <= p < enabled_screen_count:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Screen does not exist."
        )

    text = await get_screen_text(p, enabled_screens, gerty)

    # Wrap around to the first screen after the last one.
    next_screen_number = 0 if (p + 1) >= enabled_screen_count else p + 1

    # TODO: exchange-rate, on-chain and lightning statistics are not part of
    # the response yet. The planned sources are satoshis_amount_as_fiat() for
    # gerty.exchange and these endpoints below gerty.mempool_endpoint:
    #   /api/v1/difficulty-adjustment
    #   /api/v1/fees/mempool-blocks
    #   /api/v1/mining/hashrate/3d
    #   /api/v1/lightning/statistics/latest

    screen_slug = get_screen_slug_by_index(p, enabled_screens)
    return {
        "settings": {
            "refreshTime": gerty.refresh_time,
            "requestTimestamp": math.ceil(time.time()),
            "nextScreenNumber": next_screen_number,
            "name": gerty.name
        },
        "screen": {
            "slug": screen_slug,
            "group": screen_slug,
            "text": text
        }
    }


# Get a screen slug by its position in the screens_list
def get_screen_slug_by_index(index: int, screens_list):
    """Return the element of ``screens_list`` at position ``index``."""
    return list(screens_list)[index]


# Slugs whose screens currently only render placeholder text.
_PLACEHOLDER_SCREEN_SLUGS = frozenset({
    "fun_pieter_wuille_facts",
    "fun_exchange_market_rate",
    "onchain_difficulty_epoch_progress",
    "onchain_difficulty_retarget_date",
    "onchain_difficulty_blocks_remaining",
    "onchain_difficulty_epoch_time_remaining",
    "mempool_recommended_fees",
    "mempool_tx_count",
    "mining_current_hash_rate",
    "mining_current_difficulty",
    "lightning_channel_count",
    "lightning_node_count",
    "lightning_tor_node_count",
    "lightning_clearnet_nodes",
    "lightning_unannounced_nodes",
    "lightning_average_channel_capacity",
})


# Get a list of text items for the screen number
async def get_screen_text(screen_num: int, screens_list, gerty):
    """Build the content list for the screen at position ``screen_num``.

    Unknown slugs produce an empty list rather than an error.
    """
    # first get the relevant slug from the display_preferences
    screen_slug = get_screen_slug_by_index(screen_num, screens_list)
    logger.debug('screen_slug')
    logger.debug(screen_slug)

    if screen_slug == "lnbits_wallets_balance":
        return await get_lnbits_wallet_balances(gerty)
    if screen_slug == "fun_satoshi_quotes":
        return await get_satoshi_quotes()
    if screen_slug in _PLACEHOLDER_SCREEN_SLUGS:
        return await get_placeholder_text()
    return []


async def get_lnbits_wallet_balances(gerty):
    """Return name, balance (msat) and inkey for each wallet key on the Gerty.

    ``gerty.lnbits_wallets`` is parsed as a JSON list of keys; keys that do
    not resolve to a wallet are skipped.
    """
    # Get Wallet info
    wallets = []
    # Truthiness also covers an unset (None) value, not just "".
    if gerty.lnbits_wallets:
        for lnbits_wallet in json.loads(gerty.lnbits_wallets):
            wallet = await get_wallet_for_key(key=lnbits_wallet)
            logger.debug(wallet)
            if wallet:
                wallets.append({
                    "name": wallet.name,
                    "balance": wallet.balance_msat,
                    "inkey": wallet.inkey,
                })
    return wallets


async def get_placeholder_text():
    """Return two identical placeholder text items at a fixed position."""
    return [
        get_text_item_dict("Some placeholder text", 16, 10, 50),
        get_text_item_dict("Some placeholder text", 16, 10, 50)
    ]


async def get_satoshi_quotes():
    """Return text items for a random quote: its text, then its date."""
    # Get Satoshi quotes
    text = []
    quote = await api_gerty_satoshi()
    if quote:
        # .get() tolerates quotes that lack one of the two fields.
        if quote.get('text'):
            text.append(get_text_item_dict(quote['text'], 16))
        if quote.get('date'):
            text.append(get_text_item_dict(quote['date'], 12))
    return text


# A helper function get a nicely formated dict for the text
def get_text_item_dict(text: str, font_size: int, x_pos: int = None, y_pos: int = None):
    """Return a dict describing one text item.

    The item is marked as centred when neither coordinate is given;
    otherwise both ``x`` and ``y`` are set from the arguments.
    """
    item = {
        "value": text,
        "size": font_size
    }
    if x_pos is None and y_pos is None:
        item['position'] = 'center'
    else:
        item['x'] = x_pos
        item['y'] = y_pos
    return item