from quart import g, jsonify, request from http import HTTPStatus from lnurl.exceptions import InvalidUrl as LnurlInvalidUrl # type: ignore from lnbits.decorators import api_check_wallet_key, api_validate_post_request import httpx from . import jukebox_ext from .crud import ( create_jukebox, update_jukebox, get_jukebox, get_jukebox_by_user, get_jukeboxs, delete_jukebox, ) from .models import Jukebox @jukebox_ext.route("/api/v1/jukebox", methods=["GET"]) @api_check_wallet_key("invoice") async def api_get_jukeboxs(): try: return ( jsonify( [{**jukebox._asdict()} for jukebox in await get_jukeboxs(g.wallet.user)] ), HTTPStatus.OK, ) except: return "", HTTPStatus.NO_CONTENT ##################SPOTIFY AUTH##################### @jukebox_ext.route("/api/v1/jukebox/spotify/cb/", methods=["GET"]) async def api_check_credentials_callbac(juke_id): print(request.args) sp_code = "" sp_access_token = "" sp_refresh_token = "" jukebox = await get_jukebox(juke_id) if request.args.get("code"): sp_code = request.args.get("code") jukebox = await update_jukebox( juke_id=juke_id, sp_secret=jukebox.sp_secret, sp_access_token=sp_code ) if request.args.get("access_token"): sp_access_token = request.args.get("access_token") sp_refresh_token = request.args.get("refresh_token") jukebox = await update_jukebox( juke_id=juke_id, sp_secret=jukebox.sp_secret, sp_access_token=sp_access_token, sp_refresh_token=sp_refresh_token, ) return "

Success!

You can close this window

" @jukebox_ext.route("/api/v1/jukebox/spotify/", methods=["GET"]) @api_check_wallet_key("invoice") async def api_check_credentials_check(sp_id): jukebox = await get_jukebox(sp_id) return jsonify(jukebox._asdict()), HTTPStatus.CREATED @jukebox_ext.route("/api/v1/jukebox/", methods=["POST"]) @jukebox_ext.route("/api/v1/jukebox/", methods=["PUT"]) @api_check_wallet_key("admin") @api_validate_post_request( schema={ "user": {"type": "string", "empty": False, "required": True}, "title": {"type": "string", "empty": False, "required": True}, "wallet": {"type": "string", "empty": False, "required": True}, "sp_user": {"type": "string", "empty": False, "required": True}, "sp_secret": {"type": "string", "required": True}, "sp_access_token": {"type": "string", "required": False}, "sp_refresh_token": {"type": "string", "required": False}, "sp_device": {"type": "string", "required": False}, "sp_playlists": {"type": "string", "required": False}, "price": {"type": "string", "required": True}, } ) async def api_create_update_jukebox(juke_id=None): if juke_id: jukebox = await update_jukebox(juke_id=juke_id, **g.data) else: jukebox = await create_jukebox(**g.data) return jsonify(jukebox._asdict()), HTTPStatus.CREATED @jukebox_ext.route("/api/v1/jukebox/", methods=["DELETE"]) @api_check_wallet_key("admin") async def api_delete_item(juke_id): await delete_jukebox(juke_id) try: return ( jsonify( [{**jukebox._asdict()} for jukebox in await get_jukeboxs(g.wallet.user)] ), HTTPStatus.OK, ) except: return "", HTTPStatus.NO_CONTENT ################JUKEBOX ENDPOINTS################## @jukebox_ext.route("/api/v1/jukebox/jb/", methods=["GET"]) async def api_get_jukebox_songs(sp_id): jukebox = await get_jukebox(sp_id) print(jukebox.playlists.split(",")[0].split("-")[1]) return jsonify(jukebox._asdict()), HTTPStatus.CREATED