Files
live-radio/src/ozan_radio/server.py
T

364 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
import asyncio
import random
from pathlib import Path
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from ozan_radio.config import Config
from ozan_radio.dj import DeepSeekDJ
from ozan_radio.lyria import LyriaEngine
from ozan_radio.queue import RadioQueue
from ozan_radio.spotify import SpotifyTaste
from ozan_radio.chat_store import ChatStore
from ozan_radio.dj import TrackPlan
from ozan_radio.library import list_saved_songs
from ozan_radio.lyria import GeneratedTrack
from ozan_radio.radio_settings import load_radio_settings, save_radio_settings_patch
from ozan_radio.stats import cost_per_track, record_generation, today_stats
from ozan_radio.taste import load_taste_seeds
app = FastAPI(title="Live Ozan Radio", version="0.1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
_config: Config | None = None
_queue: RadioQueue | None = None
_generating = False
_chat = ChatStore()
class ChatRequest(BaseModel):
message: str = Field(min_length=1, max_length=500)
class SettingsPatch(BaseModel):
playback: dict | None = None
limits: dict | None = None
costs: dict | None = None
def _can_generate_today(cfg: Config) -> tuple[bool, dict]:
rs = load_radio_settings(lyria_model=cfg.lyria_model)
stats = today_stats(cfg.output_dir)
remaining = rs.limits.max_new_songs_per_day - stats["generated"]
return remaining > 0, {
**stats,
"max_per_day": rs.limits.max_new_songs_per_day,
"remaining": max(0, remaining),
}
def _dashboard_stats(cfg: Config) -> dict:
rs = load_radio_settings(lyria_model=cfg.lyria_model)
budget = _can_generate_today(cfg)[1]
per_track = rs.per_track_cost()
return {
"today": budget,
"costs": rs.costs.__dict__ | {"per_track_estimate_usd": per_track},
"playback": rs.playback.__dict__,
"projected_daily_max_usd": round(per_track * rs.limits.max_new_songs_per_day, 2),
}
def _play_library_entry(q: RadioQueue, cfg: Config, entry: dict) -> dict:
track = q.play_id(entry["id"])
if not track:
plan = TrackPlan(
id=entry["id"],
title=entry.get("title", entry["id"]),
mood=entry.get("mood", ""),
dj_line=entry.get("dj_line", ""),
lyria_prompt=entry.get("lyria_prompt", ""),
)
path = cfg.output_dir / entry["file"]
restored = GeneratedTrack(plan=plan, audio_path=path, lyrics=entry.get("lyrics", ""))
q.add(restored)
q.play_id(entry["id"])
np = q.now_playing()
return {"status": "ok", "source": "library", "track": np.__dict__ if np else None}
async def _compose_track(request: str | None = None, *, check_limit: bool = True) -> dict:
global _generating
if _generating:
return {"status": "busy", "message": "Already generating a track"}
_generating = True
try:
cfg = _get_config()
if check_limit:
ok, budget = _can_generate_today(cfg)
if not ok:
return {
"status": "limit",
"message": f"Daily limit reached ({budget['max_per_day']} new songs)",
"budget": budget,
}
taste = await SpotifyTaste(cfg).fetch_taste()
seeds = None if taste else load_taste_seeds()
q = _get_queue()
vibe = request or _chat.take_vibe_hint()
plan = await DeepSeekDJ(cfg).plan_next(
taste, q.recent_titles, seeds, request=vibe or None
)
track = LyriaEngine(cfg).generate(plan)
q.add(track)
rs = load_radio_settings(lyria_model=cfg.lyria_model)
cost = cost_per_track(cfg.lyria_model, rs.costs.__dict__)
record_generation(cfg.output_dir, cost, track.plan.id, track.plan.title)
np = q.now_playing()
_, budget = _can_generate_today(cfg)
return {
"status": "ok",
"source": "generated",
"track": np.__dict__ if np else None,
"taste_used": taste.summary if taste else (seeds.summary if seeds else None),
"cost_usd": cost,
"budget": budget,
}
finally:
_generating = False
def _get_config() -> Config:
global _config
if _config is None:
_config = Config.from_env()
return _config
def _get_queue() -> RadioQueue:
global _queue
if _queue is None:
_queue = RadioQueue(_get_config().output_dir)
return _queue
@app.get("/")
async def root() -> dict:
return {
"station": "Live Ozan Radio",
"tagline": "No catalog. No repeats. DeepSeek DJ + Google Lyria 3.",
"endpoints": {
"now": "/api/now",
"queue": "/api/queue",
"generate": "POST /api/generate",
"chat": "POST /api/chat",
"chat_log": "/api/chat",
"songs": "/api/songs",
"stats": "/api/stats",
"settings": "/api/settings",
"shuffle": "POST /api/shuffle/next",
"player": "/player",
},
}
@app.get("/api/now")
async def now_playing() -> dict:
q = _get_queue()
np = q.now_playing()
if not np:
return {"status": "idle", "message": "Queue empty — POST /api/generate to start"}
return {"status": "playing", "track": np.__dict__}
@app.get("/api/queue")
async def queue_status() -> dict:
return _get_queue().to_dict()
@app.get("/stream/{filename}")
async def stream_track(filename: str) -> FileResponse:
cfg = _get_config()
path = (cfg.output_dir / filename).resolve()
if not str(path).startswith(str(cfg.output_dir.resolve())):
raise HTTPException(403, "Invalid path")
if not path.exists():
raise HTTPException(404, "Track not found")
return FileResponse(path, media_type="audio/mpeg")
@app.get("/api/stats")
async def dashboard_stats() -> dict:
cfg = _get_config()
return _dashboard_stats(cfg)
@app.get("/api/settings")
async def get_settings() -> dict:
cfg = _get_config()
rs = load_radio_settings(lyria_model=cfg.lyria_model)
return rs.to_public_dict() | {"budget": _can_generate_today(cfg)[1]}
@app.patch("/api/settings")
async def patch_settings(body: SettingsPatch) -> dict:
patch = body.model_dump(exclude_none=True)
updated = save_radio_settings_patch(patch)
cfg = _get_config()
return updated | {"budget": _can_generate_today(cfg)[1]}
@app.post("/api/generate")
async def generate_track() -> dict:
return await _compose_track()
@app.post("/api/shuffle/next")
async def shuffle_next() -> dict:
cfg = _get_config()
rs = load_radio_settings(lyria_model=cfg.lyria_model)
q = _get_queue()
songs = list_saved_songs(cfg.output_dir)
can_new, budget = _can_generate_today(cfg)
want_new = False
if rs.playback.shuffle and rs.playback.mix_existing_and_new and can_new and songs:
want_new = random.random() < rs.playback.new_song_chance
elif rs.playback.shuffle and not songs and can_new:
want_new = True
if want_new:
result = await _compose_track()
if result.get("status") == "ok":
q.shuffle_to_id(result["track"]["track_id"]) if result.get("track") else None
return result
if songs:
current_id = None
np = q.now_playing()
if np:
current_id = np.track_id
pool = [s for s in songs if s["id"] != current_id] or songs
pick = random.choice(pool)
result = _play_library_entry(q, cfg, pick)
result["budget"] = budget
return result
if can_new:
return await _compose_track()
return {
"status": "limit",
"message": "No saved songs and daily generation limit reached",
"budget": budget,
}
@app.get("/api/chat")
async def chat_log() -> dict:
return {"messages": _chat.public_log()}
@app.post("/api/chat")
async def chat_with_dj(body: ChatRequest, background: BackgroundTasks) -> dict:
cfg = _get_config()
q = _get_queue()
np = q.now_playing()
now = f"{np.title}{np.mood}" if np else None
reply = await DeepSeekDJ(cfg).chat(body.message, _chat.history(), now_playing=now)
_chat.add("user", body.message)
_chat.add("dj", reply.reply)
result = {
"status": "ok",
"reply": reply.reply,
"action": reply.action,
"generating": False,
}
if reply.action == "generate":
if reply.vibe_hint:
_chat.pending_vibe = reply.vibe_hint
if not _generating:
ok, _ = _can_generate_today(cfg)
if ok:
result["generating"] = True
background.add_task(_compose_track, reply.vibe_hint or None)
else:
result["reply"] += " (Daily new-song limit reached — playing saved tracks only.)"
return result
@app.get("/api/songs")
async def saved_songs() -> dict:
cfg = _get_config()
songs = list_saved_songs(cfg.output_dir)
return {"count": len(songs), "songs": songs, "folder": str(cfg.output_dir)}
@app.post("/api/songs/{track_id}/play")
async def play_saved(track_id: str) -> dict:
q = _get_queue()
track = q.play_id(track_id)
if not track:
cfg = _get_config()
for entry in list_saved_songs(cfg.output_dir):
if entry["id"] == track_id:
plan = TrackPlan(
id=entry["id"],
title=entry.get("title", track_id),
mood=entry.get("mood", ""),
dj_line=entry.get("dj_line", ""),
lyria_prompt=entry.get("lyria_prompt", ""),
)
path = cfg.output_dir / entry["file"]
restored = GeneratedTrack(plan=plan, audio_path=path, lyrics=entry.get("lyrics", ""))
q.add(restored)
q.play_id(track_id)
np = q.now_playing()
return {"status": "ok", "track": np.__dict__ if np else None}
raise HTTPException(404, "Song not found")
np = q.now_playing()
return {"status": "ok", "track": np.__dict__ if np else None}
@app.post("/api/skip")
async def skip_track() -> dict:
cfg = _get_config()
rs = load_radio_settings(lyria_model=cfg.lyria_model)
if rs.playback.shuffle:
return await shuffle_next()
q = _get_queue()
track = q.advance()
if not track:
return {"status": "idle"}
np = q.now_playing()
return {"status": "ok", "track": np.__dict__ if np else None, "source": "queue"}
@app.get("/player", response_class=HTMLResponse)
async def player_page() -> HTMLResponse:
gateway = Path(__file__).resolve().parents[2] / "gateway" / "player.html"
if gateway.exists():
return HTMLResponse(gateway.read_text(encoding="utf-8"))
return HTMLResponse("<h1>Live Ozan Radio</h1><p>gateway/player.html missing</p>")
async def _autofill_queue(target: int = 2) -> None:
"""Background: keep a small buffer of generated tracks."""
while True:
q = _get_queue()
if q.length < target and not _generating:
try:
await generate_track()
except Exception:
pass
await asyncio.sleep(30)
def create_app() -> FastAPI:
return app