"""Small Flask service that starts Spotify playback of an album.

Endpoints:
    GET  /rick     -- toggle the module-level ``rickroll`` flag.
    POST /play     -- start playback of the album URI given in the JSON body
                      (or of a fixed album while ``rickroll`` is enabled).
    GET  /healthz  -- liveness probe.
"""
import base64
import json
import os
import time

import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, request

app = Flask(__name__)
# Load variables from a .env file; the code below reads CLIENT, SECRET and
# REFRESH_TOKEN from the environment.
load_dotenv()

SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_PLAY_URL = "https://api.spotify.com/v1/me/player/play"
# Upper bound (seconds) for outbound HTTP calls, so that a stalled Spotify
# connection cannot block a request handler indefinitely.
REQUEST_TIMEOUT = 10

_token = {"access_token": None, "expires_at": 0}  # in-memory cache
rickroll = False


def refresh_access_token():
    """Exchange the configured refresh token for a new Spotify access token.

    Reads ``CLIENT``, ``SECRET`` and ``REFRESH_TOKEN`` from the environment
    and posts a ``refresh_token`` grant to ``SPOTIFY_TOKEN_URL``.

    Returns:
        A ``(access_token, expires_in)`` tuple, where ``expires_in`` is the
        token lifetime in seconds (3600 if the response omits it).

    Raises:
        RuntimeError: if a credential is missing, the response is not JSON,
            or the response carries no ``access_token``.
        requests.RequestException: if the HTTP request itself fails.
    """
    client_id = os.getenv("CLIENT")
    client_secret = os.getenv("SECRET")
    refresh_token = os.getenv("REFRESH_TOKEN")
    if not client_id or not client_secret or not refresh_token:
        # The caller unpacks a 2-tuple, so a missing credential must be an
        # exception rather than an error-shaped return value.
        raise RuntimeError("Missing Spotify API credentials")

    # HTTP Basic credentials: base64("client_id:client_secret").
    credentials = f"{client_id}:{client_secret}".encode("ascii")
    encoded_credentials = base64.b64encode(credentials).decode("ascii")
    headers = {
        "Authorization": f"Basic {encoded_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    # Passing a dict lets requests URL-encode the values.
    data = {"grant_type": "refresh_token", "refresh_token": refresh_token}

    response = requests.post(
        SPOTIFY_TOKEN_URL,
        headers=headers,
        data=data,
        timeout=REQUEST_TIMEOUT,
    )
    try:
        token_data = response.json()
    except ValueError as err:
        raise RuntimeError(
            "Failed to refresh access token: non-JSON response "
            f"(HTTP {response.status_code})"
        ) from err

    if not isinstance(token_data, dict) or "access_token" not in token_data:
        raise RuntimeError(
            "Failed to refresh access token: " + json.dumps(token_data)
        )

    expires_in = int(token_data.get("expires_in", 3600))
    # Deliberately do not log the token itself.
    print("New access token obtained; expires in", expires_in, "seconds")
    return token_data["access_token"], expires_in


def get_access_token():
    """Return a cached access token, refreshing it when needed.

    The token is refreshed when none is cached or when the cached one
    expires in less than 60 seconds. Exceptions raised by
    ``refresh_access_token`` propagate to the caller.
    """
    if not _token["access_token"] or _token["expires_at"] - time.time() < 60:
        access_token, expires_in = refresh_access_token()
        # Update the cache only after the refresh fully succeeded.
        _token["access_token"] = access_token
        _token["expires_at"] = time.time() + expires_in
    return _token["access_token"]


@app.get("/rick")
def rick():
    """Toggle the ``rickroll`` flag and report its new value."""
    global rickroll
    rickroll = not rickroll
    print("Rickroll set to", rickroll)
    return jsonify(message=f"Rickroll set: {rickroll}")


@app.post("/play")
def play():
    """Start playback of an album through the Spotify player endpoint.

    Expects a JSON object body with an ``album`` key holding the context
    URI. While ``rickroll`` is enabled the body is ignored and a fixed album
    URI is used instead.

    Returns:
        200 with a confirmation when Spotify answers 204; 400 when
        ``album`` is missing; 502 when the token cannot be obtained or
        Spotify cannot be reached; otherwise Spotify's own status code with
        its response text.
    """
    if rickroll:
        # Rick Astley - Never gonna give you up
        album_uri = "spotify:album:6eUW0wxWtzkFdaEFsTJto6"
    else:
        # silent=True yields None instead of aborting on a missing or
        # malformed JSON body; non-object JSON is treated the same way.
        payload = request.get_json(silent=True)
        album_uri = payload.get("album") if isinstance(payload, dict) else None

    if not album_uri:
        return jsonify(error="Missing 'album'"), 400

    try:
        access_token = get_access_token()
    except Exception:
        # Request boundary: log the cause and answer with JSON instead of
        # letting the exception surface as a bare 500.
        app.logger.exception("Could not obtain a Spotify access token")
        return jsonify(error="Failed to obtain Spotify access token"), 502

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    body = {
        "context_uri": album_uri,
    }
    try:
        r = requests.put(
            SPOTIFY_PLAY_URL,
            headers=headers,
            json=body,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as err:
        app.logger.exception("Spotify play request failed")
        return jsonify(error="Spotify API unreachable", detail=str(err)), 502

    if r.status_code == 204:
        return jsonify(message="Playback started", album=album_uri)
    return (
        jsonify(error="Spotify API error", status=r.status_code, detail=r.text),
        r.status_code,
    )


@app.get("/healthz")
def health():
    """Liveness probe; always answers ``{"ok": true}``."""
    return jsonify(ok=True)


if __name__ == '__main__':
    app.run()