mirror of
https://github.com/lnbits/lnbits-legend.git
synced 2025-02-24 22:58:46 +01:00
179 lines
6.3 KiB
Python
179 lines
6.3 KiB
Python
from quart import g, jsonify, request
|
|
from http import HTTPStatus
|
|
from lnurl.exceptions import InvalidUrl as LnurlInvalidUrl # type: ignore
|
|
from base64 import urlsafe_b64encode
|
|
import base64
|
|
|
|
from lnbits.decorators import api_check_wallet_key, api_validate_post_request
|
|
import httpx
|
|
from . import jukebox_ext
|
|
from .crud import (
|
|
create_jukebox,
|
|
update_jukebox,
|
|
get_jukebox,
|
|
get_jukebox_by_user,
|
|
get_jukeboxs,
|
|
delete_jukebox,
|
|
)
|
|
from .models import Jukebox
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox", methods=["GET"])
|
|
@api_check_wallet_key("invoice")
|
|
async def api_get_jukeboxs():
|
|
try:
|
|
return (
|
|
jsonify(
|
|
[{**jukebox._asdict()} for jukebox in await get_jukeboxs(g.wallet.user)]
|
|
),
|
|
HTTPStatus.OK,
|
|
)
|
|
except:
|
|
return "", HTTPStatus.NO_CONTENT
|
|
|
|
|
|
##################SPOTIFY AUTH#####################
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/spotify/cb/<juke_id>", methods=["GET"])
|
|
async def api_check_credentials_callbac(juke_id):
|
|
print(request.args)
|
|
sp_code = ""
|
|
sp_access_token = ""
|
|
sp_refresh_token = ""
|
|
jukebox = await get_jukebox(juke_id)
|
|
if request.args.get("code"):
|
|
sp_code = request.args.get("code")
|
|
jukebox = await update_jukebox(
|
|
juke_id=juke_id, sp_secret=jukebox.sp_secret, sp_access_token=sp_code
|
|
)
|
|
if request.args.get("access_token"):
|
|
sp_access_token = request.args.get("access_token")
|
|
sp_refresh_token = request.args.get("refresh_token")
|
|
jukebox = await update_jukebox(
|
|
juke_id=juke_id,
|
|
sp_secret=jukebox.sp_secret,
|
|
sp_access_token=sp_access_token,
|
|
sp_refresh_token=sp_refresh_token,
|
|
)
|
|
return "<h1>Success!</h1><h2>You can close this window</h2>"
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/spotify/<sp_id>", methods=["GET"])
|
|
@api_check_wallet_key("invoice")
|
|
async def api_check_credentials_check(sp_id):
|
|
jukebox = await get_jukebox(sp_id)
|
|
return jsonify(jukebox._asdict()), HTTPStatus.CREATED
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/", methods=["POST"])
|
|
@jukebox_ext.route("/api/v1/jukebox/<juke_id>", methods=["PUT"])
|
|
@api_check_wallet_key("admin")
|
|
@api_validate_post_request(
|
|
schema={
|
|
"user": {"type": "string", "empty": False, "required": True},
|
|
"title": {"type": "string", "empty": False, "required": True},
|
|
"wallet": {"type": "string", "empty": False, "required": True},
|
|
"sp_user": {"type": "string", "empty": False, "required": True},
|
|
"sp_secret": {"type": "string", "required": True},
|
|
"sp_access_token": {"type": "string", "required": False},
|
|
"sp_refresh_token": {"type": "string", "required": False},
|
|
"sp_device": {"type": "string", "required": False},
|
|
"sp_playlists": {"type": "string", "required": False},
|
|
"price": {"type": "string", "required": True},
|
|
}
|
|
)
|
|
async def api_create_update_jukebox(juke_id=None):
|
|
if juke_id:
|
|
jukebox = await update_jukebox(juke_id=juke_id, **g.data)
|
|
else:
|
|
jukebox = await create_jukebox(**g.data)
|
|
return jsonify(jukebox._asdict()), HTTPStatus.CREATED
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/<juke_id>", methods=["DELETE"])
|
|
@api_check_wallet_key("admin")
|
|
async def api_delete_item(juke_id):
|
|
await delete_jukebox(juke_id)
|
|
try:
|
|
return (
|
|
jsonify(
|
|
[{**jukebox._asdict()} for jukebox in await get_jukeboxs(g.wallet.user)]
|
|
),
|
|
HTTPStatus.OK,
|
|
)
|
|
except:
|
|
return "", HTTPStatus.NO_CONTENT
|
|
|
|
|
|
################JUKEBOX ENDPOINTS##################
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/jb/<sp_id>", methods=["GET"])
|
|
async def api_get_jukebox_songs(sp_id):
|
|
jukebox = await get_jukebox(sp_id)
|
|
print(jukebox.sp_playlists.split(",")[0].split("-")[1])
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
r = await client.get(
|
|
"https://api.spotify.com/v1/playlists/"
|
|
+ jukebox.sp_playlists.split(",")[0].split("-")[1]
|
|
+ "/tracks",
|
|
timeout=40,
|
|
headers={"Authorization": "Bearer " + jukebox.sp_access_token},
|
|
)
|
|
if r.json()["error"]["status"] == 401:
|
|
token = await api_get_token(sp_id)
|
|
if token['error'] == 'invalid_client':
|
|
print("invalid")
|
|
return ""
|
|
else:
|
|
return await api_get_jukebox_songs(sp_id)
|
|
print(r.json()["items"])
|
|
resp = r.json()["items"][0]
|
|
print("id: " + resp["track"]["id"])
|
|
print("name: " + resp["track"]["name"])
|
|
print("album: " + resp["track"]["album"]["name"])
|
|
print("artist: " + resp["track"]["artists"][0]["name"])
|
|
print("image: " + resp["track"]["album"]["images"][0])
|
|
except AssertionError:
|
|
something = None
|
|
return jsonify(jukebox._asdict()), HTTPStatus.CREATED
|
|
|
|
|
|
@jukebox_ext.route("/api/v1/jukebox/jb/<sp_id>", methods=["GET"])
|
|
async def api_get_token(sp_id):
|
|
jukebox = await get_jukebox(sp_id)
|
|
print(jukebox.sp_playlists.split(",")[0].split("-")[1])
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
r = await client.post(
|
|
"https://accounts.spotify.com/api/token",
|
|
timeout=40,
|
|
params={
|
|
"grant_type": "refresh_token",
|
|
"refresh_token": jukebox.sp_refresh_token,
|
|
"client_id": jukebox.sp_user,
|
|
},
|
|
headers={
|
|
"Authorization": "Bearer "
|
|
+ base64.b64encode(
|
|
(jukebox.sp_user + ":" + jukebox.sp_refresh_token).encode(
|
|
"utf-8"
|
|
)
|
|
).decode("ascii"),
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
},
|
|
)
|
|
print(r)
|
|
print(r.json())
|
|
if r.json()['error'] == 'invalid_client':
|
|
return r.json()
|
|
#await update_jukebox(
|
|
# juke_id=sp_id,
|
|
# sp_access_token=r.json()["access_token"],
|
|
# sp_refresh_token=r.json()["refresh_token"],
|
|
#)
|
|
except AssertionError:
|
|
something = None
|
|
return jsonify(jukebox._asdict()), HTTPStatus.CREATED
|