Add Live Ozan Radio — DeepSeek DJ, Lyria 3, and player chat.
Personal AI station that generates tracks from taste seeds or Spotify; safe to share — secrets and cache are gitignored. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
"""Live Ozan Radio — AI DJ powered by DeepSeek + Google Lyria 3."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,61 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
import uvicorn
|
||||
|
||||
from ozan_radio.config import Config
|
||||
from ozan_radio.dj import DeepSeekDJ
|
||||
from ozan_radio.lyria import LyriaEngine
|
||||
from ozan_radio.queue import RadioQueue
|
||||
from ozan_radio.server import app
|
||||
from ozan_radio.spotify import SpotifyTaste
|
||||
from ozan_radio.taste import load_taste_seeds
|
||||
|
||||
|
||||
async def generate_one() -> None:
|
||||
"""CLI: generate a single track and print the result."""
|
||||
cfg = Config.from_env()
|
||||
taste = await SpotifyTaste(cfg).fetch_taste()
|
||||
seeds = None if taste else load_taste_seeds()
|
||||
if taste:
|
||||
print(f"Taste: {taste.summary}\n")
|
||||
elif seeds:
|
||||
print(f"Taste seeds: {seeds.summary}\n")
|
||||
else:
|
||||
print("No Spotify or seeds — DJ will freestyle.\n")
|
||||
|
||||
q = RadioQueue(cfg.output_dir)
|
||||
plan = await DeepSeekDJ(cfg).plan_next(taste, q.recent_titles, seeds)
|
||||
print(f"DJ: {plan.dj_line}")
|
||||
print(f"Title: {plan.title}")
|
||||
print(f"Prompt: {plan.lyria_prompt}\n")
|
||||
print("Generating with Lyria 3…")
|
||||
|
||||
track = LyriaEngine(cfg).generate(plan)
|
||||
q.add(track)
|
||||
print(f"Saved: {track.audio_path}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Live Ozan Radio")
|
||||
parser.add_argument(
|
||||
"command",
|
||||
nargs="?",
|
||||
default="serve",
|
||||
choices=["serve", "generate"],
|
||||
help="serve = start radio server, generate = one-shot track",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command == "generate":
|
||||
asyncio.run(generate_one())
|
||||
return
|
||||
|
||||
cfg = Config.from_env()
|
||||
uvicorn.run(app, host=cfg.radio_host, port=cfg.radio_port, log_level="info")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from time import time
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChatTurn:
|
||||
role: str
|
||||
content: str
|
||||
ts: float = field(default_factory=time)
|
||||
|
||||
|
||||
class ChatStore:
|
||||
"""In-memory chat history for the DJ panel."""
|
||||
|
||||
def __init__(self, max_turns: int = 40) -> None:
|
||||
self._turns: list[ChatTurn] = []
|
||||
self._max = max_turns
|
||||
self.pending_vibe: str = ""
|
||||
|
||||
def add(self, role: str, content: str) -> None:
|
||||
self._turns.append(ChatTurn(role=role, content=content))
|
||||
if len(self._turns) > self._max:
|
||||
self._turns = self._turns[-self._max :]
|
||||
|
||||
def history(self) -> list[dict]:
|
||||
return [{"role": t.role, "content": t.content} for t in self._turns]
|
||||
|
||||
def public_log(self) -> list[dict]:
|
||||
return [
|
||||
{"role": t.role, "content": t.content, "ts": t.ts}
|
||||
for t in self._turns
|
||||
if t.role in ("user", "dj")
|
||||
]
|
||||
|
||||
def take_vibe_hint(self) -> str:
|
||||
hint = self.pending_vibe.strip()
|
||||
self.pending_vibe = ""
|
||||
return hint
|
||||
@@ -0,0 +1,60 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Config:
|
||||
gemini_api_key: str
|
||||
deepseek_base_url: str
|
||||
deepseek_api_key: str
|
||||
deepseek_model: str
|
||||
spotify_client_id: str
|
||||
spotify_client_secret: str
|
||||
spotify_refresh_token: str
|
||||
radio_host: str
|
||||
radio_port: int
|
||||
output_dir: Path
|
||||
lyria_model: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> Config:
|
||||
output = Path(os.getenv("RADIO_OUTPUT_DIR", "./radio_cache"))
|
||||
output.mkdir(parents=True, exist_ok=True)
|
||||
return cls(
|
||||
gemini_api_key=os.getenv("GEMINI_API_KEY", ""),
|
||||
deepseek_base_url=os.getenv(
|
||||
"DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"
|
||||
),
|
||||
deepseek_api_key=os.getenv("DEEPSEEK_API_KEY", ""),
|
||||
deepseek_model=os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
|
||||
spotify_client_id=os.getenv("SPOTIFY_CLIENT_ID", ""),
|
||||
spotify_client_secret=os.getenv("SPOTIFY_CLIENT_SECRET", ""),
|
||||
spotify_refresh_token=os.getenv("SPOTIFY_REFRESH_TOKEN", ""),
|
||||
radio_host=os.getenv("RADIO_HOST", "127.0.0.1"),
|
||||
radio_port=int(os.getenv("RADIO_PORT", "8787")),
|
||||
output_dir=output,
|
||||
lyria_model=os.getenv("LYRIA_MODEL", "lyria-3-pro-preview"),
|
||||
)
|
||||
|
||||
def require_gemini(self) -> None:
|
||||
if not self.gemini_api_key:
|
||||
raise RuntimeError("GEMINI_API_KEY is required for Lyria music generation")
|
||||
|
||||
def require_deepseek(self) -> None:
|
||||
if not self.deepseek_api_key:
|
||||
raise RuntimeError("DEEPSEEK_API_KEY is required for the DJ brain")
|
||||
|
||||
@property
|
||||
def spotify_configured(self) -> bool:
|
||||
return bool(
|
||||
self.spotify_client_id
|
||||
and self.spotify_client_secret
|
||||
and self.spotify_refresh_token
|
||||
)
|
||||
@@ -0,0 +1,146 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
|
||||
from ozan_radio.config import Config
|
||||
from ozan_radio.spotify import TasteProfile
|
||||
from ozan_radio.taste import TasteSeeds
|
||||
|
||||
CHAT_SYSTEM = """You are the on-air DJ for Live Ozan Radio. Chat with the listener in a warm,
|
||||
concise radio-host voice. You never play catalog music — only AI-generated tracks via Lyria 3.
|
||||
|
||||
When the listener asks for a vibe, mood, genre, or "play something X", set action to "generate"
|
||||
and include a vibe_hint Lyria can use. Otherwise action is "none".
|
||||
|
||||
Respond with JSON only:
|
||||
{
|
||||
"reply": "your on-air response (1-3 sentences)",
|
||||
"action": "none" or "generate",
|
||||
"vibe_hint": "optional — detailed direction for the next composed track"
|
||||
}
|
||||
"""
|
||||
|
||||
DJ_SYSTEM = """You are the DJ for Live Ozan Radio — a personal AI station that never plays
|
||||
catalog music. Every track is generated fresh by Google Lyria 3 based on your prompts.
|
||||
|
||||
Your job:
|
||||
1. Read the listener's Spotify taste (if provided).
|
||||
2. Pick a mood, tempo, and genre blend that feels like a natural next track.
|
||||
3. Write a Lyria prompt that produces a 1–2 minute instrumental or vocal track.
|
||||
4. Keep variety — don't repeat the same vibe twice in a row.
|
||||
5. Favor warm, groove-forward, slightly eclectic picks (Ozan's lane).
|
||||
6. West African / Sahel / desert blues / griot energy is on-brand (think Baaba Maal warmth).
|
||||
|
||||
Respond with JSON only:
|
||||
{
|
||||
"title": "short track title",
|
||||
"mood": "one-line mood",
|
||||
"dj_line": "what you'd say on air (1 sentence)",
|
||||
"lyria_prompt": "detailed prompt for Lyria 3 Pro — structure, instruments, tempo, energy"
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class TrackPlan:
|
||||
id: str
|
||||
title: str
|
||||
mood: str
|
||||
dj_line: str
|
||||
lyria_prompt: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChatReply:
|
||||
reply: str
|
||||
action: str
|
||||
vibe_hint: str
|
||||
|
||||
|
||||
class DeepSeekDJ:
|
||||
"""DeepSeek orchestrates what plays next — taste in, Lyria prompt out."""
|
||||
|
||||
def __init__(self, config: Config) -> None:
|
||||
self._config = config
|
||||
|
||||
async def _completion(self, messages: list[dict], *, json_mode: bool = False) -> str:
|
||||
self._config.require_deepseek()
|
||||
payload: dict = {
|
||||
"model": self._config.deepseek_model,
|
||||
"messages": messages,
|
||||
"temperature": 0.85,
|
||||
}
|
||||
if json_mode:
|
||||
payload["response_format"] = {"type": "json_object"}
|
||||
|
||||
url = f"{self._config.deepseek_base_url.rstrip('/')}/chat/completions"
|
||||
async with httpx.AsyncClient(timeout=120) as client:
|
||||
resp = await client.post(
|
||||
url,
|
||||
headers={"Authorization": f"Bearer {self._config.deepseek_api_key}"},
|
||||
json=payload,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["choices"][0]["message"]["content"]
|
||||
|
||||
async def chat(
|
||||
self,
|
||||
message: str,
|
||||
history: list[dict],
|
||||
*,
|
||||
now_playing: str | None = None,
|
||||
) -> ChatReply:
|
||||
context = [f"Listener says: {message}"]
|
||||
if now_playing:
|
||||
context.append(f"Currently playing: {now_playing}")
|
||||
|
||||
messages = [{"role": "system", "content": CHAT_SYSTEM}]
|
||||
for turn in history[-8:]:
|
||||
role = "assistant" if turn["role"] == "dj" else turn["role"]
|
||||
messages.append({"role": role, "content": turn["content"]})
|
||||
messages.append({"role": "user", "content": "\n".join(context)})
|
||||
|
||||
data = json.loads(await self._completion(messages, json_mode=True))
|
||||
return ChatReply(
|
||||
reply=data.get("reply", "Copy that — stay tuned."),
|
||||
action=data.get("action", "none"),
|
||||
vibe_hint=data.get("vibe_hint", ""),
|
||||
)
|
||||
|
||||
async def plan_next(
|
||||
self,
|
||||
taste: TasteProfile | None,
|
||||
recent_titles: list[str],
|
||||
seeds: TasteSeeds | None = None,
|
||||
request: str | None = None,
|
||||
) -> TrackPlan:
|
||||
self._config.require_deepseek()
|
||||
|
||||
user_parts = ["Plan the next generated track for Live Ozan Radio."]
|
||||
if taste:
|
||||
user_parts.append(f"Spotify taste: {taste.summary}")
|
||||
if taste.top_genres:
|
||||
user_parts.append(f"Genres: {', '.join(taste.top_genres[:8])}")
|
||||
elif seeds:
|
||||
user_parts.append(f"Taste seeds (no Spotify): {seeds.summary}")
|
||||
if recent_titles:
|
||||
user_parts.append(f"Already played (avoid repeating): {', '.join(recent_titles[-5:])}")
|
||||
if request:
|
||||
user_parts.append(f"Listener request: {request}")
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": DJ_SYSTEM},
|
||||
{"role": "user", "content": "\n".join(user_parts)},
|
||||
]
|
||||
data = json.loads(await self._completion(messages, json_mode=True))
|
||||
return TrackPlan(
|
||||
id=str(uuid.uuid4())[:8],
|
||||
title=data.get("title", "Untitled Transmission"),
|
||||
mood=data.get("mood", ""),
|
||||
dj_line=data.get("dj_line", "Coming up — something new, just for you."),
|
||||
lyria_prompt=data.get("lyria_prompt", "An upbeat instrumental groove."),
|
||||
)
|
||||
@@ -0,0 +1,60 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from google import genai
|
||||
from google.genai import types
|
||||
|
||||
from ozan_radio.config import Config
|
||||
from ozan_radio.dj import TrackPlan
|
||||
|
||||
|
||||
@dataclass
|
||||
class GeneratedTrack:
|
||||
plan: TrackPlan
|
||||
audio_path: Path
|
||||
lyrics: str
|
||||
|
||||
|
||||
class LyriaEngine:
|
||||
"""Google Lyria 3 — generates tracks from DJ prompts."""
|
||||
|
||||
def __init__(self, config: Config) -> None:
|
||||
self._config = config
|
||||
config.require_gemini()
|
||||
self._client = genai.Client(api_key=config.gemini_api_key)
|
||||
|
||||
def generate(self, plan: TrackPlan) -> GeneratedTrack:
|
||||
prompt = (
|
||||
f"{plan.lyria_prompt}\n\n"
|
||||
"Instrumental preferred unless the prompt explicitly asks for vocals. "
|
||||
"High fidelity, stereo, radio-ready mix."
|
||||
)
|
||||
|
||||
response = self._client.models.generate_content(
|
||||
model=self._config.lyria_model,
|
||||
contents=prompt,
|
||||
config=types.GenerateContentConfig(
|
||||
response_modalities=["AUDIO", "TEXT"],
|
||||
),
|
||||
)
|
||||
|
||||
lyrics = ""
|
||||
audio_bytes: bytes | None = None
|
||||
|
||||
for part in response.parts:
|
||||
if part.text:
|
||||
lyrics += part.text + "\n"
|
||||
elif part.inline_data and part.inline_data.data:
|
||||
audio_bytes = part.inline_data.data
|
||||
|
||||
if not audio_bytes:
|
||||
raise RuntimeError(f"Lyria returned no audio for track {plan.id}")
|
||||
|
||||
ext = "mp3"
|
||||
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in plan.title)[:40]
|
||||
out_path = self._config.output_dir / f"{plan.id}_{safe_title}.{ext}"
|
||||
out_path.write_bytes(audio_bytes)
|
||||
|
||||
return GeneratedTrack(plan=plan, audio_path=out_path, lyrics=lyrics.strip())
|
||||
@@ -0,0 +1,149 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ozan_radio.dj import TrackPlan
|
||||
from ozan_radio.lyria import GeneratedTrack
|
||||
|
||||
|
||||
@dataclass
|
||||
class NowPlaying:
|
||||
track_id: str
|
||||
title: str
|
||||
mood: str
|
||||
dj_line: str
|
||||
audio_url: str
|
||||
lyrics: str
|
||||
|
||||
|
||||
class RadioQueue:
|
||||
"""Simple in-memory + disk queue for generated tracks."""
|
||||
|
||||
def __init__(self, output_dir: Path) -> None:
|
||||
self._tracks: list[GeneratedTrack] = []
|
||||
self._index = 0
|
||||
self._manifest = output_dir / "manifest.json"
|
||||
self._output_dir = output_dir
|
||||
self._load_manifest()
|
||||
|
||||
def _load_manifest(self) -> None:
|
||||
if self._manifest.exists():
|
||||
try:
|
||||
data = json.loads(self._manifest.read_text(encoding="utf-8"))
|
||||
self._index = data.get("index", 0)
|
||||
for entry in data.get("tracks", []):
|
||||
path = self._output_dir / entry["file"]
|
||||
if not path.exists():
|
||||
continue
|
||||
plan = TrackPlan(
|
||||
id=entry.get("id", "????"),
|
||||
title=entry.get("title", path.stem),
|
||||
mood=entry.get("mood", ""),
|
||||
dj_line=entry.get("dj_line", ""),
|
||||
lyria_prompt=entry.get("lyria_prompt", ""),
|
||||
)
|
||||
self._tracks.append(
|
||||
GeneratedTrack(plan=plan, audio_path=path, lyrics=entry.get("lyrics", ""))
|
||||
)
|
||||
if self._tracks:
|
||||
return
|
||||
except (json.JSONDecodeError, OSError, KeyError):
|
||||
self._tracks = []
|
||||
self._index = 0
|
||||
|
||||
self._restore_from_filenames()
|
||||
|
||||
def _restore_from_filenames(self) -> None:
|
||||
for path in sorted(self._output_dir.glob("*.mp3")):
|
||||
stem = path.stem
|
||||
if "_" in stem:
|
||||
track_id, title = stem.split("_", 1)
|
||||
title = title.replace("_", " ")
|
||||
else:
|
||||
track_id, title = stem[:8], stem
|
||||
plan = TrackPlan(
|
||||
id=track_id,
|
||||
title=title,
|
||||
mood="",
|
||||
dj_line="Restored from cache.",
|
||||
lyria_prompt="",
|
||||
)
|
||||
self._tracks.append(GeneratedTrack(plan=plan, audio_path=path, lyrics=""))
|
||||
|
||||
def _save_manifest(self) -> None:
|
||||
payload = {
|
||||
"index": self._index,
|
||||
"count": len(self._tracks),
|
||||
"tracks": [
|
||||
{
|
||||
"id": t.plan.id,
|
||||
"title": t.plan.title,
|
||||
"mood": t.plan.mood,
|
||||
"dj_line": t.plan.dj_line,
|
||||
"lyria_prompt": t.plan.lyria_prompt,
|
||||
"lyrics": t.lyrics,
|
||||
"file": t.audio_path.name,
|
||||
}
|
||||
for t in self._tracks
|
||||
],
|
||||
}
|
||||
self._manifest.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
||||
|
||||
@property
|
||||
def length(self) -> int:
|
||||
return len(self._tracks)
|
||||
|
||||
def add(self, track: GeneratedTrack) -> None:
|
||||
self._tracks.append(track)
|
||||
self._save_manifest()
|
||||
|
||||
@property
|
||||
def recent_titles(self) -> list[str]:
|
||||
return [t.plan.title for t in self._tracks]
|
||||
|
||||
def current(self) -> GeneratedTrack | None:
|
||||
if not self._tracks:
|
||||
return None
|
||||
if self._index >= len(self._tracks):
|
||||
self._index = 0
|
||||
return self._tracks[self._index]
|
||||
|
||||
def advance(self) -> GeneratedTrack | None:
|
||||
if not self._tracks:
|
||||
return None
|
||||
self._index = (self._index + 1) % len(self._tracks)
|
||||
self._save_manifest()
|
||||
return self.current()
|
||||
|
||||
def now_playing(self, base_url: str = "") -> NowPlaying | None:
|
||||
track = self.current()
|
||||
if not track:
|
||||
return None
|
||||
rel = track.audio_path.name
|
||||
return NowPlaying(
|
||||
track_id=track.plan.id,
|
||||
title=track.plan.title,
|
||||
mood=track.plan.mood,
|
||||
dj_line=track.plan.dj_line,
|
||||
audio_url=f"{base_url}/stream/{rel}",
|
||||
lyrics=track.lyrics,
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
current = self.current()
|
||||
return {
|
||||
"index": self._index,
|
||||
"queue_length": len(self._tracks),
|
||||
"tracks": [
|
||||
{
|
||||
"id": t.plan.id,
|
||||
"title": t.plan.title,
|
||||
"mood": t.plan.mood,
|
||||
"file": t.audio_path.name,
|
||||
}
|
||||
for t in self._tracks
|
||||
],
|
||||
"current": asdict(current.plan) if current else None,
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import BackgroundTasks, FastAPI, HTTPException
|
||||
from pydantic import BaseModel, Field
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse, HTMLResponse
|
||||
|
||||
from ozan_radio.config import Config
|
||||
from ozan_radio.dj import DeepSeekDJ
|
||||
from ozan_radio.lyria import LyriaEngine
|
||||
from ozan_radio.queue import RadioQueue
|
||||
from ozan_radio.spotify import SpotifyTaste
|
||||
from ozan_radio.chat_store import ChatStore
|
||||
from ozan_radio.taste import load_taste_seeds
|
||||
|
||||
app = FastAPI(title="Live Ozan Radio", version="0.1.0")
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
_config: Config | None = None
|
||||
_queue: RadioQueue | None = None
|
||||
_generating = False
|
||||
_chat = ChatStore()
|
||||
|
||||
|
||||
class ChatRequest(BaseModel):
|
||||
message: str = Field(min_length=1, max_length=500)
|
||||
|
||||
|
||||
async def _compose_track(request: str | None = None) -> dict:
|
||||
global _generating
|
||||
if _generating:
|
||||
return {"status": "busy", "message": "Already generating a track"}
|
||||
|
||||
_generating = True
|
||||
try:
|
||||
cfg = _get_config()
|
||||
taste = await SpotifyTaste(cfg).fetch_taste()
|
||||
seeds = None if taste else load_taste_seeds()
|
||||
q = _get_queue()
|
||||
vibe = request or _chat.take_vibe_hint()
|
||||
plan = await DeepSeekDJ(cfg).plan_next(
|
||||
taste, q.recent_titles, seeds, request=vibe or None
|
||||
)
|
||||
track = LyriaEngine(cfg).generate(plan)
|
||||
q.add(track)
|
||||
np = q.now_playing()
|
||||
return {
|
||||
"status": "ok",
|
||||
"track": np.__dict__ if np else None,
|
||||
"taste_used": taste.summary if taste else (seeds.summary if seeds else None),
|
||||
}
|
||||
finally:
|
||||
_generating = False
|
||||
|
||||
|
||||
def _get_config() -> Config:
|
||||
global _config
|
||||
if _config is None:
|
||||
_config = Config.from_env()
|
||||
return _config
|
||||
|
||||
|
||||
def _get_queue() -> RadioQueue:
|
||||
global _queue
|
||||
if _queue is None:
|
||||
_queue = RadioQueue(_get_config().output_dir)
|
||||
return _queue
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root() -> dict:
|
||||
return {
|
||||
"station": "Live Ozan Radio",
|
||||
"tagline": "No catalog. No repeats. DeepSeek DJ + Google Lyria 3.",
|
||||
"endpoints": {
|
||||
"now": "/api/now",
|
||||
"queue": "/api/queue",
|
||||
"generate": "POST /api/generate",
|
||||
"chat": "POST /api/chat",
|
||||
"chat_log": "/api/chat",
|
||||
"player": "/player",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@app.get("/api/now")
|
||||
async def now_playing() -> dict:
|
||||
q = _get_queue()
|
||||
np = q.now_playing()
|
||||
if not np:
|
||||
return {"status": "idle", "message": "Queue empty — POST /api/generate to start"}
|
||||
return {"status": "playing", "track": np.__dict__}
|
||||
|
||||
|
||||
@app.get("/api/queue")
|
||||
async def queue_status() -> dict:
|
||||
return _get_queue().to_dict()
|
||||
|
||||
|
||||
@app.get("/stream/{filename}")
|
||||
async def stream_track(filename: str) -> FileResponse:
|
||||
cfg = _get_config()
|
||||
path = (cfg.output_dir / filename).resolve()
|
||||
if not str(path).startswith(str(cfg.output_dir.resolve())):
|
||||
raise HTTPException(403, "Invalid path")
|
||||
if not path.exists():
|
||||
raise HTTPException(404, "Track not found")
|
||||
return FileResponse(path, media_type="audio/mpeg")
|
||||
|
||||
|
||||
@app.post("/api/generate")
|
||||
async def generate_track() -> dict:
|
||||
return await _compose_track()
|
||||
|
||||
|
||||
@app.get("/api/chat")
|
||||
async def chat_log() -> dict:
|
||||
return {"messages": _chat.public_log()}
|
||||
|
||||
|
||||
@app.post("/api/chat")
|
||||
async def chat_with_dj(body: ChatRequest, background: BackgroundTasks) -> dict:
|
||||
cfg = _get_config()
|
||||
q = _get_queue()
|
||||
np = q.now_playing()
|
||||
now = f"{np.title} — {np.mood}" if np else None
|
||||
|
||||
reply = await DeepSeekDJ(cfg).chat(body.message, _chat.history(), now_playing=now)
|
||||
_chat.add("user", body.message)
|
||||
_chat.add("dj", reply.reply)
|
||||
|
||||
result = {
|
||||
"status": "ok",
|
||||
"reply": reply.reply,
|
||||
"action": reply.action,
|
||||
"generating": False,
|
||||
}
|
||||
|
||||
if reply.action == "generate":
|
||||
if reply.vibe_hint:
|
||||
_chat.pending_vibe = reply.vibe_hint
|
||||
if not _generating:
|
||||
result["generating"] = True
|
||||
background.add_task(_compose_track, reply.vibe_hint or None)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@app.post("/api/skip")
|
||||
async def skip_track() -> dict:
|
||||
q = _get_queue()
|
||||
track = q.advance()
|
||||
if not track:
|
||||
return {"status": "idle"}
|
||||
np = q.now_playing()
|
||||
return {"status": "ok", "track": np.__dict__ if np else None}
|
||||
|
||||
|
||||
@app.get("/player", response_class=HTMLResponse)
|
||||
async def player_page() -> HTMLResponse:
|
||||
gateway = Path(__file__).resolve().parents[2] / "gateway" / "player.html"
|
||||
if gateway.exists():
|
||||
return HTMLResponse(gateway.read_text(encoding="utf-8"))
|
||||
return HTMLResponse("<h1>Live Ozan Radio</h1><p>gateway/player.html missing</p>")
|
||||
|
||||
|
||||
async def _autofill_queue(target: int = 2) -> None:
|
||||
"""Background: keep a small buffer of generated tracks."""
|
||||
while True:
|
||||
q = _get_queue()
|
||||
if q.length < target and not _generating:
|
||||
try:
|
||||
await generate_track()
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep(30)
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
return app
|
||||
@@ -0,0 +1,102 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
|
||||
from ozan_radio.config import Config
|
||||
|
||||
|
||||
@dataclass
|
||||
class TasteProfile:
|
||||
top_artists: list[str]
|
||||
top_genres: list[str]
|
||||
recent_tracks: list[str]
|
||||
summary: str
|
||||
|
||||
|
||||
class SpotifyTaste:
|
||||
"""Read Ozan's listening taste — no playback, profile only."""
|
||||
|
||||
def __init__(self, config: Config) -> None:
|
||||
self._config = config
|
||||
self._token: str | None = None
|
||||
|
||||
async def _access_token(self) -> str:
|
||||
if self._token:
|
||||
return self._token
|
||||
creds = f"{self._config.spotify_client_id}:{self._config.spotify_client_secret}"
|
||||
auth = base64.b64encode(creds.encode()).decode()
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(
|
||||
"https://accounts.spotify.com/api/token",
|
||||
headers={
|
||||
"Authorization": f"Basic {auth}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
},
|
||||
data={
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": self._config.spotify_refresh_token,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
self._token = resp.json()["access_token"]
|
||||
return self._token
|
||||
|
||||
async def fetch_taste(self) -> TasteProfile | None:
|
||||
if not self._config.spotify_configured:
|
||||
return None
|
||||
|
||||
token = await self._access_token()
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
artists_resp = await client.get(
|
||||
"https://api.spotify.com/v1/me/top/artists",
|
||||
headers=headers,
|
||||
params={"limit": 10, "time_range": "medium_term"},
|
||||
)
|
||||
tracks_resp = await client.get(
|
||||
"https://api.spotify.com/v1/me/top/tracks",
|
||||
headers=headers,
|
||||
params={"limit": 10, "time_range": "medium_term"},
|
||||
)
|
||||
artists_resp.raise_for_status()
|
||||
tracks_resp.raise_for_status()
|
||||
|
||||
artists = artists_resp.json().get("items", [])
|
||||
tracks = tracks_resp.json().get("items", [])
|
||||
|
||||
top_artists = [a["name"] for a in artists]
|
||||
genres: list[str] = []
|
||||
for artist in artists:
|
||||
genres.extend(artist.get("genres", []))
|
||||
# dedupe while preserving order
|
||||
seen: set[str] = set()
|
||||
top_genres = []
|
||||
for g in genres:
|
||||
if g not in seen:
|
||||
seen.add(g)
|
||||
top_genres.append(g)
|
||||
|
||||
recent_tracks = [
|
||||
f"{t['name']} — {t['artists'][0]['name']}" for t in tracks if t.get("artists")
|
||||
]
|
||||
|
||||
genre_hint = ", ".join(top_genres[:6]) if top_genres else "eclectic"
|
||||
artist_hint = ", ".join(top_artists[:5]) if top_artists else "varied"
|
||||
summary = (
|
||||
f"Listener leans toward {genre_hint}. "
|
||||
f"Frequent artists: {artist_hint}. "
|
||||
f"Recent favorites include {', '.join(recent_tracks[:3])}."
|
||||
if recent_tracks
|
||||
else f"Listener leans toward {genre_hint}. Frequent artists: {artist_hint}."
|
||||
)
|
||||
|
||||
return TasteProfile(
|
||||
top_artists=top_artists,
|
||||
top_genres=top_genres[:12],
|
||||
recent_tracks=recent_tracks,
|
||||
summary=summary,
|
||||
)
|
||||
@@ -0,0 +1,44 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class TasteSeeds:
|
||||
genres: list[str]
|
||||
tracks: list[dict]
|
||||
summary: str
|
||||
|
||||
|
||||
def load_taste_seeds(repo_root: Path | None = None) -> TasteSeeds | None:
|
||||
"""Load taste_seeds.json from repo root — fallback when Spotify is offline."""
|
||||
root = repo_root or Path(__file__).resolve().parents[2]
|
||||
path = root / "taste_seeds.json"
|
||||
if not path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return None
|
||||
|
||||
genres = data.get("genres", [])
|
||||
tracks = data.get("tracks", [])
|
||||
if not genres and not tracks:
|
||||
return None
|
||||
|
||||
track_hints = []
|
||||
for t in tracks[:5]:
|
||||
artists = ", ".join(t.get("artists", []))
|
||||
vibe = t.get("vibe", "")
|
||||
track_hints.append(f"{t.get('title', '?')} ({artists}) — {vibe}")
|
||||
|
||||
summary = "Seed taste profile. "
|
||||
if genres:
|
||||
summary += f"Genres: {', '.join(genres[:8])}. "
|
||||
if track_hints:
|
||||
summary += "Reference vibes: " + "; ".join(track_hints)
|
||||
|
||||
return TasteSeeds(genres=genres, tracks=tracks, summary=summary.strip())
|
||||
Reference in New Issue
Block a user