"""HTTP API for Live Ozan Radio.

Exposes the FastAPI ``app`` with endpoints for the play queue, on-demand track
generation (DeepSeek DJ planning + Lyria composing), DJ chat, saved songs,
settings, stats and a small HTML player page.

Module-level state (``_config``, ``_queue``, ``_generating``,
``_generation_state``, ``_chat``) is process-local.
"""
from __future__ import annotations

import asyncio
import json
import logging
import random
from datetime import datetime, timezone
from pathlib import Path

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel, Field

from ozan_radio.chat_store import ChatStore
from ozan_radio.config import Config
from ozan_radio.dj import DeepSeekDJ, TrackPlan
from ozan_radio.library import list_saved_songs
from ozan_radio.lyria import GeneratedTrack, LyriaEngine
from ozan_radio.lyria_capabilities import probe_lyria_api, static_capabilities
from ozan_radio.queue import RadioQueue
from ozan_radio.radio_settings import load_radio_settings, save_radio_settings_patch
from ozan_radio.settings import load_lyria_settings
from ozan_radio.stats import cost_per_track, record_generation, today_stats
from ozan_radio.taste import load_taste_seeds

logger = logging.getLogger(__name__)

app = FastAPI(title="Live Ozan Radio", version="0.1.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Lazily created singletons; see _get_config() / _get_queue().
_config: Config | None = None
_queue: RadioQueue | None = None

# True while _compose_track() is running; guards against concurrent generations.
_generating = False

# Snapshot of generation progress, surfaced to clients via /api/stats and
# several endpoint responses. Replaced wholesale by _set_generation().
_generation_state: dict = {
    "busy": False,
    "phase": None,
    "error": None,
    "track_title": None,
    "last_completed_at": None,
    "last_completed_title": None,
    "last_completed_id": None,
}

_chat = ChatStore()


class ChatRequest(BaseModel):
    """Request body for ``POST /api/chat``: one listener message (1-500 chars)."""

    message: str = Field(min_length=1, max_length=500)


class SettingsPatch(BaseModel):
    """Request body for ``PATCH /api/settings``; omitted sections are left untouched."""

    playback: dict | None = None
    limits: dict | None = None
    costs: dict | None = None
    lyria: dict | None = None


def _can_generate_today(cfg: Config) -> tuple[bool, dict]:
    """Check the daily new-song cap.

    Returns:
        ``(allowed, budget)`` where ``budget`` is today's stats extended with
        ``max_per_day``, ``remaining`` and ``unlimited``. A cap of zero or
        less means unlimited (``remaining`` is reported as ``-1``).
    """
    rs = load_radio_settings(lyria_model=cfg.lyria_model)
    stats = today_stats(cfg.output_dir)
    cap = rs.limits.max_new_songs_per_day
    if cap <= 0:
        return True, {
            **stats,
            "max_per_day": 0,
            "remaining": -1,
            "unlimited": True,
        }
    remaining = cap - stats["generated"]
    return remaining > 0, {
        **stats,
        "max_per_day": cap,
        "remaining": max(0, remaining),
        "unlimited": False,
    }


def _dashboard_stats(cfg: Config) -> dict:
    """Build the ``/api/stats`` payload: budget, costs, playback and generation state."""
    rs = load_radio_settings(lyria_model=cfg.lyria_model)
    budget = _can_generate_today(cfg)[1]
    per_track = rs.per_track_cost()
    return {
        "today": budget,
        "costs": rs.costs.__dict__ | {"per_track_estimate_usd": per_track},
        "playback": rs.playback.__dict__,
        "projected_daily_max_usd": round(per_track * rs.limits.max_new_songs_per_day, 2),
        # Copy so the response is not affected by later state replacement.
        "generation": dict(_generation_state),
    }


def _now_playing_payload(q: RadioQueue) -> dict | None:
    """Return the currently playing track's attribute dict, or ``None`` if idle."""
    current = q.now_playing()
    return current.__dict__ if current else None


def _restore_library_entry(q: RadioQueue, cfg: Config, entry: dict) -> None:
    """Rebuild a saved-song ``entry`` as a queue track and start playing it.

    ``entry`` must carry ``id`` and ``file``; the other fields fall back to
    defaults (the title falls back to the id).
    """
    plan = TrackPlan(
        id=entry["id"],
        title=entry.get("title", entry["id"]),
        mood=entry.get("mood", ""),
        dj_line=entry.get("dj_line", ""),
        lyria_prompt=entry.get("lyria_prompt", ""),
    )
    path = cfg.output_dir / entry["file"]
    q.add(GeneratedTrack(plan=plan, audio_path=path, lyrics=entry.get("lyrics", "")))
    q.play_id(entry["id"])


def _play_library_entry(q: RadioQueue, cfg: Config, entry: dict) -> dict:
    """Play a saved song, restoring it into the queue first if it is not there."""
    if not q.play_id(entry["id"]):
        _restore_library_entry(q, cfg, entry)
    return {"status": "ok", "source": "library", "track": _now_playing_payload(q)}


def _set_generation(
    *,
    busy: bool,
    phase: str | None = None,
    error: str | None = None,
    title: str | None = None,
    completed_id: str | None = None,
) -> None:
    """Replace ``_generation_state`` with a fresh snapshot.

    The ``last_completed_*`` fields are carried over from the previous state
    and only overwritten when both ``completed_id`` and ``title`` are given.
    """
    global _generation_state
    state = {
        "busy": busy,
        "phase": phase,
        "error": error,
        "track_title": title,
        "last_completed_at": _generation_state.get("last_completed_at"),
        "last_completed_title": _generation_state.get("last_completed_title"),
        "last_completed_id": _generation_state.get("last_completed_id"),
    }
    if completed_id and title:
        state["last_completed_at"] = datetime.now(timezone.utc).isoformat()
        state["last_completed_title"] = title
        state["last_completed_id"] = completed_id
    _generation_state = state


async def _compose_track(request: str | None = None, *, check_limit: bool = True) -> dict:
    """Plan, compose, enqueue and start playing one new track.

    Args:
        request: Optional vibe/request text for the DJ; when empty, a pending
            vibe hint from the chat store is used instead.
        check_limit: When true, refuse to generate once the daily cap is hit.

    Returns:
        A dict whose ``status`` is ``"busy"``, ``"limit"``, ``"ok"`` or
        ``"error"``. Exceptions are caught and reported via ``"error"``; they
        are also recorded in ``_generation_state``.
    """
    global _generating
    if _generating:
        return {
            "status": "busy",
            "message": "Already generating a track",
            "generation": _generation_state,
        }
    _generating = True
    _set_generation(busy=True, phase="planning", error=None)
    try:
        cfg = _get_config()
        if check_limit:
            ok, budget = _can_generate_today(cfg)
            if not ok:
                _set_generation(busy=False)
                return {
                    "status": "limit",
                    "message": f"Daily limit reached ({budget['max_per_day']} new songs)",
                    "budget": budget,
                }
        seeds = load_taste_seeds()
        q = _get_queue()
        vibe = request or _chat.take_vibe_hint()
        plan = await DeepSeekDJ(cfg).plan_next(
            q.recent_titles, seeds, request=vibe or None
        )
        _set_generation(busy=True, phase="composing", title=plan.title)
        # generate() is synchronous; run it in a worker thread so the event
        # loop keeps serving requests (e.g. /api/stats polling) meanwhile.
        track = await asyncio.to_thread(LyriaEngine(cfg).generate, plan)
        q.add(track)
        q.play_id(track.plan.id)
        rs = load_radio_settings(lyria_model=cfg.lyria_model)
        cost = cost_per_track(cfg.lyria_model, rs.costs.__dict__)
        record_generation(cfg.output_dir, cost, track.plan.id, track.plan.title)
        payload = _now_playing_payload(q)
        # Re-read the budget so it reflects the generation just recorded.
        _, budget = _can_generate_today(cfg)
        _set_generation(
            busy=False,
            title=track.plan.title,
            completed_id=track.plan.id,
        )
        return {
            "status": "ok",
            "source": "generated",
            "track": payload,
            "taste_used": seeds.summary if seeds else None,
            "cost_usd": cost,
            "budget": budget,
        }
    except Exception as exc:  # boundary: report failures to the client instead of raising
        message = str(exc)
        logger.exception("Track generation failed")
        _set_generation(busy=False, error=message)
        return {"status": "error", "message": message, "generation": _generation_state}
    finally:
        _generating = False


def _get_config() -> Config:
    """Return the process-wide ``Config``, loading it from the environment on first use."""
    global _config
    if _config is None:
        _config = Config.from_env()
    return _config


def _get_queue() -> RadioQueue:
    """Return the process-wide ``RadioQueue``, creating it on first use."""
    global _queue
    if _queue is None:
        _queue = RadioQueue(_get_config().output_dir)
    return _queue


@app.get("/")
async def root() -> dict:
    """Describe the station and list its endpoints."""
    return {
        "station": "Live Ozan Radio",
        "tagline": "No catalog. No repeats. DeepSeek DJ + Google Lyria 3.",
        "endpoints": {
            "now": "/api/now",
            "queue": "/api/queue",
            "generate": "POST /api/generate",
            "chat": "POST /api/chat",
            "chat_log": "/api/chat",
            "songs": "/api/songs",
            "stats": "/api/stats",
            "settings": "/api/settings",
            "lyria": "/api/lyria",
            "shuffle": "POST /api/shuffle/next",
            "player": "/player",
        },
    }


@app.get("/api/now")
async def now_playing() -> dict:
    """Return the currently playing track, or an idle status when there is none."""
    current = _get_queue().now_playing()
    if not current:
        return {"status": "idle", "message": "Queue empty — POST /api/generate to start"}
    return {"status": "playing", "track": current.__dict__}


@app.get("/api/queue")
async def queue_status() -> dict:
    """Return the queue's dict representation."""
    return _get_queue().to_dict()


@app.get("/stream/{filename}")
async def stream_track(filename: str) -> FileResponse:
    """Serve an audio file from the output directory.

    Raises:
        HTTPException: 403 when the resolved path escapes the output
            directory, 404 when it is not an existing regular file.
    """
    cfg = _get_config()
    root_dir = cfg.output_dir.resolve()
    path = (cfg.output_dir / filename).resolve()
    # Component-wise containment check: a plain string-prefix test would also
    # accept sibling directories such as "<output_dir>_other/...".
    if not path.is_relative_to(root_dir):
        raise HTTPException(403, "Invalid path")
    if not path.is_file():
        raise HTTPException(404, "Track not found")
    return FileResponse(path, media_type="audio/mpeg")


@app.get("/api/stats")
async def dashboard_stats() -> dict:
    """Return dashboard statistics (see ``_dashboard_stats``)."""
    return _dashboard_stats(_get_config())


@app.get("/api/lyria")
async def lyria_capabilities() -> dict:
    """Return static Lyria capabilities, the active settings and the API probe result.

    Each model entry gets an ``available`` flag; when the probe reports no
    model list at all, every model is marked available.
    """
    cfg = _get_config()
    caps = static_capabilities()
    api_status = probe_lyria_api(cfg.gemini_api_key)
    active = load_lyria_settings().to_public_dict()
    available = set(api_status.get("models_available") or [])
    caps["models"] = [
        {**model, "available": not available or model["id"] in available}
        for model in caps["models"]
    ]
    return {
        "capabilities": caps,
        "active": active,
        "api": api_status,
    }


@app.get("/api/settings")
async def get_settings() -> dict:
    """Return the public radio settings together with today's budget."""
    cfg = _get_config()
    rs = load_radio_settings(lyria_model=cfg.lyria_model)
    return rs.to_public_dict() | {"budget": _can_generate_today(cfg)[1]}


@app.patch("/api/settings")
async def patch_settings(body: SettingsPatch) -> dict:
    """Apply a partial settings update and return the result with today's budget."""
    patch = body.model_dump(exclude_none=True)
    updated = save_radio_settings_patch(patch)
    cfg = _get_config()
    return updated | {"budget": _can_generate_today(cfg)[1]}


@app.post("/api/generate")
async def generate_track(background: BackgroundTasks) -> dict:
    """Start generation in background — poll /api/stats for progress and ready state."""
    cfg = _get_config()
    if _generating:
        return {
            "status": "busy",
            "message": "Already generating a track",
            "generation": _generation_state,
        }
    ok, budget = _can_generate_today(cfg)
    if not ok:
        return {
            "status": "limit",
            "message": f"Daily limit reached ({budget['max_per_day']} new songs)",
            "budget": budget,
        }
    background.add_task(_background_compose, None)
    return {
        "status": "accepted",
        "generating": True,
        "message": "Generating — planning then Lyria compose (~30–90s)",
        "budget": budget,
        "generation": _generation_state,
    }


@app.post("/api/shuffle/next")
async def shuffle_next() -> dict:
    """Advance playback in shuffle mode.

    Depending on playback settings, the remaining budget and the saved
    library, this either composes a new track or plays a random saved song
    (avoiding the current one when another choice exists).
    """
    cfg = _get_config()
    rs = load_radio_settings(lyria_model=cfg.lyria_model)
    q = _get_queue()
    songs = list_saved_songs(cfg.output_dir)
    can_new, budget = _can_generate_today(cfg)

    want_new = False
    if rs.playback.shuffle and rs.playback.mix_existing_and_new and can_new and songs:
        want_new = random.random() < rs.playback.new_song_chance
    elif rs.playback.shuffle and not songs and can_new:
        # Nothing saved yet: generating is the only way to get music.
        want_new = True

    if want_new:
        result = await _compose_track()
        if result.get("status") == "ok" and result.get("track"):
            q.shuffle_to_id(result["track"]["track_id"])
        return result

    if songs:
        current = q.now_playing()
        current_id = current.track_id if current else None
        # Prefer a different song; fall back to the full list if that leaves none.
        pool = [s for s in songs if s["id"] != current_id] or songs
        result = _play_library_entry(q, cfg, random.choice(pool))
        result["budget"] = budget
        return result

    if can_new:
        return await _compose_track()

    return {
        "status": "limit",
        "message": "No saved songs and daily generation limit reached",
        "budget": budget,
    }


@app.get("/api/chat")
async def chat_log() -> dict:
    """Return the public chat log."""
    return {"messages": _chat.public_log()}


@app.post("/api/chat")
async def chat_with_dj(body: ChatRequest, background: BackgroundTasks) -> dict:
    """Send a message to the DJ and optionally kick off a generation.

    When the DJ's reply requests generation, the vibe hint (if any) is stored
    as pending and, unless a generation is already running or the daily limit
    is reached, a background compose is scheduled.
    """
    cfg = _get_config()
    current = _get_queue().now_playing()
    now = f"{current.title} — {current.mood}" if current else None
    reply = await DeepSeekDJ(cfg).chat(body.message, _chat.history(), now_playing=now)
    _chat.add("user", body.message)
    _chat.add("dj", reply.reply)

    result = {
        "status": "ok",
        "reply": reply.reply,
        "action": reply.action,
        "generating": False,
    }
    if reply.action == "generate":
        if reply.vibe_hint:
            _chat.pending_vibe = reply.vibe_hint
        if not _generating:
            ok, _ = _can_generate_today(cfg)
            if ok:
                result["generating"] = True
                background.add_task(_background_compose, reply.vibe_hint or None)
            else:
                result["reply"] += " (Daily new-song limit reached — playing saved tracks only.)"
    return result


def _vocal_cues_path() -> Path:
    """Return the path of ``vocal_cues.json``, two directories above this file's folder."""
    return Path(__file__).resolve().parents[2] / "vocal_cues.json"


def _load_vocal_cues() -> dict:
    """Load the vocal-cues JSON file.

    Returns ``{"tracks": {}}`` when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object at the top level.
    """
    path = _vocal_cues_path()
    if not path.exists():
        return {"tracks": {}}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return {"tracks": {}}
    # Callers use .get(); anything other than an object is treated as empty.
    return data if isinstance(data, dict) else {"tracks": {}}


@app.get("/api/vocal-cues")
async def vocal_cues_all() -> dict:
    """List the track ids that have vocal cues, with their count."""
    tracks = _load_vocal_cues().get("tracks", {})
    return {"tracks": list(tracks), "count": len(tracks)}


@app.get("/api/vocal-cues/{track_id}")
async def vocal_cues_for_track(track_id: str) -> dict:
    """Return the vocal cues for one track.

    Raises:
        HTTPException: 404 when the track has no (or empty) cues.
    """
    cues = _load_vocal_cues().get("tracks", {}).get(track_id)
    if not cues:
        raise HTTPException(404, "No vocal cues for this track")
    return {"track_id": track_id, **cues}


@app.get("/api/songs")
async def saved_songs() -> dict:
    """List saved songs in the output directory."""
    cfg = _get_config()
    songs = list_saved_songs(cfg.output_dir)
    return {"count": len(songs), "songs": songs, "folder": str(cfg.output_dir)}


@app.post("/api/songs/{track_id}/play")
async def play_saved(track_id: str) -> dict:
    """Play a track by id, restoring it from the saved library if it is not queued.

    Raises:
        HTTPException: 404 when the id is neither in the queue nor in the library.
    """
    q = _get_queue()
    if not q.play_id(track_id):
        cfg = _get_config()
        entry = next(
            (e for e in list_saved_songs(cfg.output_dir) if e["id"] == track_id),
            None,
        )
        if entry is None:
            raise HTTPException(404, "Song not found")
        _restore_library_entry(q, cfg, entry)
    return {"status": "ok", "track": _now_playing_payload(q)}


@app.post("/api/skip")
async def skip_track() -> dict:
    """Skip the current track; delegates to ``shuffle_next`` when shuffle is on."""
    cfg = _get_config()
    rs = load_radio_settings(lyria_model=cfg.lyria_model)
    if rs.playback.shuffle:
        return await shuffle_next()
    q = _get_queue()
    if not q.advance():
        return {"status": "idle"}
    return {"status": "ok", "track": _now_playing_payload(q), "source": "queue"}


@app.get("/player", response_class=HTMLResponse)
async def player_page() -> HTMLResponse:
    """Serve ``gateway/player.html``, or a short fallback page when it is missing."""
    gateway = Path(__file__).resolve().parents[2] / "gateway" / "player.html"
    if gateway.exists():
        return HTMLResponse(gateway.read_text(encoding="utf-8"))
    # NOTE(review): fallback markup reconstructed (heading + paragraph); the
    # visible text is unchanged — confirm the tags against the intended page.
    return HTMLResponse(
        "<h1>Live Ozan Radio</h1>"
        "<p>gateway/player.html missing</p>"
    )


async def _background_compose(request: str | None = None) -> None:
    """Run one generation as a background task and announce the outcome in chat.

    Used by both ``/api/generate`` and ``/api/chat``. Relies on
    ``_compose_track`` reporting failures via its result dict.
    """
    result = await _compose_track(request)
    status = result.get("status")
    if status == "error":
        _chat.add("dj", f"Couldn't finish that track: {result.get('message', 'unknown error')}")
    elif status == "ok" and result.get("track"):
        t = result["track"]
        _chat.add("dj", f"Fresh cut ready: {t.get('title', 'new track')} — hitting the stream now.")


async def _autofill_queue(target: int = 2) -> None:
    """Background: keep a small buffer of generated tracks.

    Every 30 seconds, composes a new track when the queue holds fewer than
    ``target`` tracks and no generation is running. Runs until cancelled.
    """
    while True:
        q = _get_queue()
        if q.length < target and not _generating:
            try:
                # Call the compose routine directly: the HTTP handler
                # generate_track() requires a BackgroundTasks argument.
                result = await _compose_track()
                if result.get("status") == "error":
                    logger.warning("Autofill generation failed: %s", result.get("message"))
            except Exception:
                # Best effort: keep the loop alive, but leave a trace.
                logger.exception("Autofill generation raised unexpectedly")
        await asyncio.sleep(30)


def create_app() -> FastAPI:
    """Return the module-level FastAPI application."""
    return app