From 81e8173fb98d66b306f1115ebfed098e68e14364 Mon Sep 17 00:00:00 2001 From: ameer Date: Sat, 2 May 2026 02:59:34 +0800 Subject: [PATCH] Add API routes and websocket room manager --- app/room.py | 623 ++++++++++++++++++++++++++++++++++++++++++ app/routes_admin.py | 176 ++++++++++++ app/routes_student.py | 74 +++++ 3 files changed, 873 insertions(+) diff --git a/app/room.py b/app/room.py index 278a10e..49a07f2 100644 --- a/app/room.py +++ b/app/room.py @@ -1 +1,624 @@ """In-process WebSocket room manager.""" + +from __future__ import annotations + +import asyncio +import json +from collections import defaultdict +from datetime import UTC, datetime +from typing import Any + +from fastapi import WebSocket, WebSocketDisconnect + +from app.config import Settings +from app.db import connect +from app.pool import get_question, parse_pool_json, public_question_payload, question_count, question_time_limit +from app.scoring import SCORE_FNS + + +def now_utc() -> datetime: + return datetime.now(UTC) + + +def now_ms() -> int: + return int(now_utc().timestamp() * 1000) + + +def iso_now() -> str: + return now_utc().isoformat() + + +def parse_ts(value: str) -> datetime: + if value.endswith("Z"): + value = value[:-1] + "+00:00" + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + return parsed + + +class RoomManager: + def __init__(self, settings: Settings): + self.settings = settings + self.student_clients: dict[str, dict[WebSocket, dict[str, Any]]] = defaultdict(dict) + self.instructor_clients: dict[str, set[WebSocket]] = defaultdict(set) + self.autoclose_tasks: dict[tuple[str, int], asyncio.Task] = {} + self.locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + + async def sessions_active(self) -> int: + async with connect(self.settings.db_path) as db: + cursor = await db.execute("SELECT COUNT(*) AS count FROM quiz_sessions WHERE state != 'finished'") + row = await cursor.fetchone() + return int(row["count"]) + + def ws_client_count(self) -> int: + return sum(len(clients) for clients in self.student_clients.values()) + sum( + len(clients) for clients in self.instructor_clients.values() + ) + + async def add_participant(self, sid: str, student_id: str, name: str, cookie_id: str) -> None: + async with connect(self.settings.db_path) as db: + await db.execute( + """ + INSERT INTO participants (sid, student_id, name, cookie_id) + VALUES (?, ?, ?, ?) + ON CONFLICT(sid, student_id) DO UPDATE SET name = excluded.name, cookie_id = excluded.cookie_id + """, + (sid, student_id, name, cookie_id), + ) + await db.execute( + """ + INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score) + SELECT sid, ?, question_idx, 'missed', 0 + FROM question_events + WHERE sid = ? AND closed_at IS NOT NULL + """, + (student_id, sid), + ) + await db.commit() + await self.broadcast_lobby(sid) + + async def student_ws(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None: + await websocket.accept() + self.student_clients[sid][websocket] = identity + try: + await self.send_student_snapshot(websocket, sid, identity) + while True: + data = await websocket.receive_json() + msg_type = data.get("type") + if msg_type == "ping": + await websocket.send_json({"type": "pong"}) + elif msg_type == "submit": + ack = await self.submit_answer(sid, identity["student_id"], data.get("question_idx"), data.get("answer")) + await websocket.send_json(ack) + else: + await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"}) + except WebSocketDisconnect: + pass + finally: + self.student_clients[sid].pop(websocket, None) + + async def instructor_ws(self, websocket: WebSocket, sid: str) -> None: + await websocket.accept() + self.instructor_clients[sid].add(websocket) + try: + await self.send_instructor_snapshot(websocket, sid) + while True: + data = await websocket.receive_json() + msg_type = data.get("type") + if msg_type == "ping": + await websocket.send_json({"type": "pong"}) + elif msg_type == "open_question": + await self.open_question(sid, int(data.get("question_idx", 0)), data.get("time_limit")) + elif msg_type == "close_question": + await self.close_question(sid) + elif msg_type == "next": + await self.next_question(sid) + elif msg_type == "end_session": + await self.end_session(sid) + else: + await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"}) + except WebSocketDisconnect: + pass + finally: + self.instructor_clients[sid].discard(websocket) + + async def send_student_snapshot(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None: + session = await self.get_session(sid) + await websocket.send_json( + { + "type": "state", + "state": session["state"], + "current_question_idx": session["current_question_idx"], + "title": session["title"], + } + ) + if session["state"] == "question_open": + await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"])) + ack = await self.existing_submit_ack(sid, identity["student_id"], session["current_question_idx"]) + if ack: + await websocket.send_json(ack) + + async def send_instructor_snapshot(self, websocket: WebSocket, sid: str) -> None: + session = await self.get_session(sid) + await websocket.send_json( + { + "type": "state", + "state": session["state"], + "current_question_idx": session["current_question_idx"], + "title": session["title"], + } + ) + await websocket.send_json(await self.lobby_message(sid)) + if session["state"] == "question_open": + await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"])) + await websocket.send_json(await self.live_histogram_message(sid, session["current_question_idx"])) + if session["state"] in {"question_closed", "between_questions", "finished"}: + await websocket.send_json(await self.full_leaderboard_message(sid)) + + async def open_question(self, sid: str, question_idx: int, time_limit: int | None = None) -> None: + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] == "question_open" and session["current_question_idx"] == question_idx: + return + if session["state"] == "question_open": + await self._close_question_locked(sid, session["current_question_idx"]) + pool = await self.get_pool_for_session(sid) + if question_idx < 0 or question_idx >= question_count(pool): + await self.broadcast_instructors(sid, {"type": "error", "code": "bad_question", "message": "Question index out of range"}) + return + limit = int(time_limit or question_time_limit(pool, question_idx)) + opened = iso_now() + async with connect(self.settings.db_path) as db: + await db.execute( + """ + INSERT INTO question_events (sid, question_idx, opened_at, time_limit) + VALUES (?, ?, ?, ?) + ON CONFLICT(sid, question_idx) DO UPDATE SET opened_at = excluded.opened_at, + closed_at = NULL, time_limit = excluded.time_limit + """, + (sid, question_idx, opened, limit), + ) + await db.execute( + "UPDATE quiz_sessions SET state = 'question_open', current_question_idx = ? WHERE sid = ?", + (question_idx, sid), + ) + await db.commit() + self._schedule_autoclose(sid, question_idx, limit) + msg = await self.question_open_message(sid, question_idx) + await self.broadcast_students(sid, msg) + await self.broadcast_instructors(sid, msg) + await self.broadcast_instructors(sid, await self.live_histogram_message(sid, question_idx)) + + async def close_question(self, sid: str) -> None: + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] != "question_open": + return + question_idx = session["current_question_idx"] + await self._close_question_locked(sid, question_idx) + await self.broadcast_question_closed(sid, question_idx) + + async def _close_question_locked(self, sid: str, question_idx: int) -> None: + task = self.autoclose_tasks.pop((sid, question_idx), None) + current = asyncio.current_task() + if task and task is not current: + task.cancel() + async with connect(self.settings.db_path) as db: + await db.execute( + "UPDATE question_events SET closed_at = COALESCE(closed_at, ?) WHERE sid = ? AND question_idx = ?", + (iso_now(), sid, question_idx), + ) + await db.execute( + """ + INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score) + SELECT p.sid, p.student_id, ?, 'missed', 0 + FROM participants p + WHERE p.sid = ? + """, + (question_idx, sid), + ) + await db.execute( + "UPDATE quiz_sessions SET state = 'question_closed', current_question_idx = ? WHERE sid = ?", + (question_idx, sid), + ) + await db.commit() + + async def next_question(self, sid: str) -> None: + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] != "question_closed": + return + next_idx = int(session["current_question_idx"]) + 1 + async with connect(self.settings.db_path) as db: + await db.execute("UPDATE quiz_sessions SET state = 'between_questions' WHERE sid = ?", (sid,)) + await db.commit() + await self.broadcast_between_questions(sid, next_idx) + + async def end_session(self, sid: str) -> None: + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] == "question_open": + await self._close_question_locked(sid, session["current_question_idx"]) + async with connect(self.settings.db_path) as db: + await db.execute( + "UPDATE quiz_sessions SET state = 'finished', current_question_idx = NULL, finished_at = ? WHERE sid = ?", + (iso_now(), sid), + ) + await db.commit() + await self.broadcast_session_ended(sid) + await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid)) + + async def submit_answer(self, sid: str, student_id: str, question_idx: Any, answer: Any) -> dict[str, Any]: + try: + qidx = int(question_idx) + except (TypeError, ValueError): + return {"type": "error", "code": "bad_question", "message": "Invalid question index"} + if answer not in {"A", "B", "C", "D"}: + return {"type": "error", "code": "bad_answer", "message": "Answer must be A, B, C, or D"} + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] != "question_open" or session["current_question_idx"] != qidx: + return {"type": "error", "code": "not_open", "message": "Question is not open"} + existing = await self.existing_submit_ack(sid, student_id, qidx) + if existing: + return existing + event = await self.get_question_event(sid, qidx) + opened_at = parse_ts(event["opened_at"]) + elapsed_ms = max(0, int((now_utc() - opened_at).total_seconds() * 1000)) + time_limit_ms = int(event["time_limit"]) * 1000 + if elapsed_ms > time_limit_ms: + return {"type": "error", "code": "time_expired", "message": "Question time has expired"} + pool = await self.get_pool_for_session(sid) + question = get_question(pool, qidx) + correct = answer == question["correct"] + score_fn = SCORE_FNS[pool["score_fn"]] + score = score_fn(correct, elapsed_ms, time_limit_ms) + submitted_at = iso_now() + async with connect(self.settings.db_path) as db: + await db.execute( + """ + INSERT INTO submissions (sid, student_id, question_idx, answer, submitted_at, elapsed_ms, score, status) + VALUES (?, ?, ?, ?, ?, ?, ?, 'submitted') + ON CONFLICT(sid, student_id, question_idx) DO NOTHING + """, + (sid, student_id, qidx, answer, submitted_at, elapsed_ms, score), + ) + await db.commit() + await self.broadcast_instructors(sid, await self.live_histogram_message(sid, qidx)) + return {"type": "submit_ack", "question_idx": qidx, "answer": answer, "score": score, "elapsed_ms": elapsed_ms} + + def _schedule_autoclose(self, sid: str, question_idx: int, time_limit: int) -> None: + previous = self.autoclose_tasks.pop((sid, question_idx), None) + if previous: + previous.cancel() + self.autoclose_tasks[(sid, question_idx)] = asyncio.create_task(self._autoclose_after(sid, question_idx, time_limit)) + + async def _autoclose_after(self, sid: str, question_idx: int, time_limit: int) -> None: + try: + await asyncio.sleep(time_limit) + async with self.locks[sid]: + session = await self.get_session(sid) + if session["state"] == "question_open" and session["current_question_idx"] == question_idx: + await self._close_question_locked(sid, question_idx) + else: + return + await self.broadcast_question_closed(sid, question_idx) + except asyncio.CancelledError: + return + + async def get_session(self, sid: str) -> dict[str, Any]: + async with connect(self.settings.db_path) as db: + cursor = await db.execute("SELECT * FROM quiz_sessions WHERE sid = ?", (sid,)) + row = await cursor.fetchone() + if row is None: + raise KeyError(f"Unknown session {sid}") + return dict(row) + + async def session_exists(self, sid: str) -> bool: + async with connect(self.settings.db_path) as db: + cursor = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,)) + row = await cursor.fetchone() + return row is not None + + async def get_pool_for_session(self, sid: str) -> dict[str, Any]: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + """ + SELECT q.pool_json + FROM quizzes q + JOIN quiz_sessions s ON s.quiz_id = q.id + WHERE s.sid = ? + """, + (sid,), + ) + row = await cursor.fetchone() + if row is None: + raise KeyError(f"Unknown session {sid}") + return parse_pool_json(row["pool_json"]) + + async def get_question_event(self, sid: str, question_idx: int) -> dict[str, Any]: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT * FROM question_events WHERE sid = ? AND question_idx = ?", + (sid, question_idx), + ) + row = await cursor.fetchone() + if row is None: + raise KeyError("Question has not been opened") + return dict(row) + + async def existing_submit_ack(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + """ + SELECT answer, elapsed_ms, score, status + FROM submissions + WHERE sid = ? AND student_id = ? AND question_idx = ? AND status = 'submitted' + """, + (sid, student_id, question_idx), + ) + row = await cursor.fetchone() + if row is None: + return None + return { + "type": "submit_ack", + "question_idx": question_idx, + "answer": row["answer"], + "score": row["score"], + "elapsed_ms": row["elapsed_ms"], + } + + async def question_open_message(self, sid: str, question_idx: int) -> dict[str, Any]: + pool = await self.get_pool_for_session(sid) + event = await self.get_question_event(sid, question_idx) + opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000) + remaining_ms = max(0, opened_ms + int(event["time_limit"]) * 1000 - now_ms()) + msg = {"type": "question_open", **public_question_payload(pool, question_idx)} + msg["opened_at_server_ts"] = opened_ms + msg["remaining_ms"] = remaining_ms + return msg + + async def question_closed_message(self, sid: str, question_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]: + pool = await self.get_pool_for_session(sid) + question = get_question(pool, question_idx) + msg = { + "type": "question_closed", + "question_idx": question_idx, + "correct": question["correct"], + "explanation": question.get("explanation", ""), + "histogram": await self.histogram(sid, question_idx), + "top5": await self.leaderboard(sid, limit=5), + } + if identity: + student = identity["student_id"] + submission = await self.submission_for(sid, student, question_idx) + rank = await self.rank_for(sid, student) + total = await self.total_for(sid, student) + msg.update( + { + "your_answer": submission.get("answer") if submission else None, + "your_score": submission.get("score", 0) if submission else 0, + "your_rank": rank, + "your_total": total, + } + ) + return msg + + async def between_message(self, sid: str, next_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]: + msg = {"type": "between_questions", "next_idx": next_idx, "top5": await self.leaderboard(sid, limit=5)} + if identity: + msg["your_rank"] = await self.rank_for(sid, identity["student_id"]) + msg["your_total"] = await self.total_for(sid, identity["student_id"]) + return msg + + async def ended_message(self, sid: str, identity: dict[str, Any] | None = None) -> dict[str, Any]: + msg = {"type": "session_ended", "final_top5": await self.leaderboard(sid, limit=5)} + if identity: + student = identity["student_id"] + msg.update(await self.student_summary(sid, student)) + msg["your_rank"] = await self.rank_for(sid, student) + msg["your_total"] = await self.total_for(sid, student) + return msg + + async def histogram(self, sid: str, question_idx: int, pending: bool = False) -> dict[str, int]: + result = {"A": 0, "B": 0, "C": 0, "D": 0, "missed": 0} + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT answer, status, COUNT(*) AS count FROM submissions WHERE sid = ? AND question_idx = ? GROUP BY answer, status", + (sid, question_idx), + ) + rows = await cursor.fetchall() + total_cursor = await db.execute("SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,)) + total_row = await total_cursor.fetchone() + submitted = 0 + for row in rows: + if row["status"] == "missed": + result["missed"] += row["count"] + elif row["answer"] in result: + result[row["answer"]] += row["count"] + submitted += row["count"] + if pending: + result["pending"] = max(0, int(total_row["count"]) - submitted - result["missed"]) + return result + + async def leaderboard(self, sid: str, limit: int | None = None, include_ids: bool = False) -> list[dict[str, Any]]: + query_limit = "" if limit is None else f"LIMIT {int(limit)}" + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + f""" + SELECT p.student_id, p.name, COALESCE(SUM(s.score), 0) AS score + FROM participants p + LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id + WHERE p.sid = ? + GROUP BY p.student_id, p.name + ORDER BY score DESC, p.name ASC, p.student_id ASC + {query_limit} + """, + (sid,), + ) + rows = await cursor.fetchall() + board = [] + for rank, row in enumerate(rows, start=1): + item = {"rank": rank, "name": row["name"], "score": int(row["score"])} + if include_ids: + item["student_id"] = row["student_id"] + board.append(item) + return board + + async def rank_for(self, sid: str, student_id: str) -> int | None: + board = await self.leaderboard(sid, include_ids=True) + for item in board: + if item["student_id"] == student_id: + return item["rank"] + return None + + async def total_for(self, sid: str, student_id: str) -> int: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT COALESCE(SUM(score), 0) AS total FROM submissions WHERE sid = ? AND student_id = ?", + (sid, student_id), + ) + row = await cursor.fetchone() + return int(row["total"]) + + async def submission_for(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT * FROM submissions WHERE sid = ? AND student_id = ? AND question_idx = ?", + (sid, student_id, question_idx), + ) + row = await cursor.fetchone() + return dict(row) if row else None + + async def student_summary(self, sid: str, student_id: str) -> dict[str, int]: + pool = await self.get_pool_for_session(sid) + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT question_idx, answer, status FROM submissions WHERE sid = ? AND student_id = ?", + (sid, student_id), + ) + rows = await cursor.fetchall() + answered = sum(1 for row in rows if row["status"] == "submitted") + correct = 0 + for row in rows: + if row["status"] == "submitted" and row["answer"] == get_question(pool, row["question_idx"])["correct"]: + correct += 1 + return {"questions_answered": answered, "questions_correct": correct} + + async def lobby_message(self, sid: str) -> dict[str, Any]: + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + "SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name", + (sid,), + ) + rows = await cursor.fetchall() + participants = [dict(row) for row in rows] + return {"type": "lobby_update", "participants": participants, "count": len(participants)} + + async def live_histogram_message(self, sid: str, question_idx: int) -> dict[str, Any]: + histogram = await self.histogram(sid, question_idx, pending=True) + submitted_count = histogram["A"] + histogram["B"] + histogram["C"] + histogram["D"] + return { + "type": "live_histogram", + "question_idx": question_idx, + "histogram": histogram, + "submitted_count": submitted_count, + "total_count": submitted_count + histogram["missed"] + histogram.get("pending", 0), + } + + async def full_leaderboard_message(self, sid: str) -> dict[str, Any]: + return {"type": "full_leaderboard", "leaderboard": await self.leaderboard(sid, include_ids=True)} + + async def me(self, sid: str, student_id: str) -> dict[str, Any]: + async with connect(self.settings.db_path) as db: + part_cursor = await db.execute("SELECT * FROM participants WHERE sid = ? AND student_id = ?", (sid, student_id)) + participant = await part_cursor.fetchone() + sub_cursor = await db.execute( + "SELECT question_idx, answer, elapsed_ms, score, status FROM submissions WHERE sid = ? AND student_id = ? ORDER BY question_idx", + (sid, student_id), + ) + submissions = await sub_cursor.fetchall() + return { + "student_id": participant["student_id"], + "name": participant["name"], + "total_score": await self.total_for(sid, student_id), + "submissions": [dict(row) for row in submissions], + } + + async def stats(self, sid: str, question_idx: int | None, student_id: str | None = None) -> dict[str, Any]: + session = await self.get_session(sid) + qidx = question_idx if question_idx is not None else session["current_question_idx"] + if qidx is None: + return { + "question_idx": None, + "response_time_avg_ms": None, + "response_time_distribution": {}, + "average_score": 0, + "top5": await self.leaderboard(sid, limit=5), + "your_rank": None, + } + async with connect(self.settings.db_path) as db: + cursor = await db.execute( + """ + SELECT elapsed_ms, score + FROM submissions + WHERE sid = ? AND question_idx = ? AND status = 'submitted' + """, + (sid, qidx), + ) + rows = await cursor.fetchall() + times = [row["elapsed_ms"] for row in rows if row["elapsed_ms"] is not None] + scores = [row["score"] for row in rows] + distribution = {"0_10s": 0, "10_30s": 0, "30s_plus": 0} + for elapsed in times: + if elapsed < 10_000: + distribution["0_10s"] += 1 + elif elapsed < 30_000: + distribution["10_30s"] += 1 + else: + distribution["30s_plus"] += 1 + payload = { + "question_idx": qidx, + "response_time_avg_ms": round(sum(times) / len(times)) if times else None, + "response_time_distribution": distribution, + "average_score": round(sum(scores) / len(scores), 2) if scores else 0, + "top5": await self.leaderboard(sid, limit=5), + } + if student_id: + payload["your_rank"] = await self.rank_for(sid, student_id) + return payload + + async def broadcast_lobby(self, sid: str) -> None: + await self.broadcast_instructors(sid, await self.lobby_message(sid)) + + async def broadcast_question_closed(self, sid: str, question_idx: int) -> None: + for websocket, identity in list(self.student_clients[sid].items()): + await self._safe_send(websocket, await self.question_closed_message(sid, question_idx, identity)) + await self.broadcast_instructors(sid, await self.question_closed_message(sid, question_idx)) + await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid)) + + async def broadcast_between_questions(self, sid: str, next_idx: int) -> None: + for websocket, identity in list(self.student_clients[sid].items()): + await self._safe_send(websocket, await self.between_message(sid, next_idx, identity)) + await self.broadcast_instructors(sid, await self.between_message(sid, next_idx)) + await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid)) + + async def broadcast_session_ended(self, sid: str) -> None: + for websocket, identity in list(self.student_clients[sid].items()): + await self._safe_send(websocket, await self.ended_message(sid, identity)) + await self.broadcast_instructors(sid, await self.ended_message(sid)) + + async def broadcast_students(self, sid: str, message: dict[str, Any]) -> None: + for websocket in list(self.student_clients[sid]): + await self._safe_send(websocket, message) + + async def broadcast_instructors(self, sid: str, message: dict[str, Any]) -> None: + for websocket in list(self.instructor_clients[sid]): + await self._safe_send(websocket, message) + + async def _safe_send(self, websocket: WebSocket, message: dict[str, Any]) -> None: + try: + await websocket.send_text(json.dumps(message)) + except Exception: + pass diff --git a/app/routes_admin.py b/app/routes_admin.py index 345c6b7..81d1f80 100644 --- a/app/routes_admin.py +++ b/app/routes_admin.py @@ -1 +1,177 @@ """Instructor routes.""" + +from __future__ import annotations + +import base64 +import json +import secrets +from io import BytesIO +from typing import Any + +import qrcode +import qrcode.image.svg +from fastapi import APIRouter, File, HTTPException, Request, Response, UploadFile, WebSocket +from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse + +from app import auth +from app.config import Settings +from app.csv_export import export_session_csv +from app.db import connect +from app.models import QuizCreateRequest, SessionCreateRequest +from app.pool import PoolValidationError, parse_pool_json +from app.room import RoomManager + +CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ" + + +def router(settings: Settings, rooms: RoomManager) -> APIRouter: + api = APIRouter() + + def require_admin(request: Request) -> None: + auth.require_admin_request(settings, request) + + @api.get("/admin/login") + async def login_form(): + return HTMLResponse( + "Admin Login
" + "" + "
" + ) + + @api.post("/admin/login") + async def login(request: Request, response: Response): + password = "" + content_type = request.headers.get("content-type", "") + if "application/json" in content_type: + data = await request.json() + password = str(data.get("password", "")) + else: + form = await request.form() + password = str(form.get("password", "")) + if not auth.verify_admin_password(settings, password): + raise HTTPException(status_code=401, detail="Invalid admin password") + auth.set_admin_cookie(settings, response, auth.sign_admin(settings)) + return {"ok": True} + + @api.get("/admin/") + async def admin_page(request: Request): + require_admin(request) + return FileResponse("static/admin.html") + + @api.get("/admin/api/quizzes") + async def list_quizzes(request: Request): + require_admin(request) + async with connect(settings.db_path) as db: + cursor = await db.execute( + "SELECT id, title, time_limit_default, score_fn_name, created_at FROM quizzes ORDER BY created_at DESC, id DESC" + ) + rows = await cursor.fetchall() + return {"quizzes": [dict(row) for row in rows]} + + @api.post("/admin/api/quizzes") + async def create_quiz(request: Request, body: QuizCreateRequest): + require_admin(request) + try: + pool = parse_pool_json(body.pool_json) + except PoolValidationError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + if body.time_limit_default is not None: + pool["time_limit_default"] = body.time_limit_default + title = (body.title or pool["title"]).strip() + async with connect(settings.db_path) as db: + cursor = await db.execute( + "INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)", + (title, json.dumps(pool), pool["time_limit_default"], pool["score_fn"]), + ) + await db.commit() + quiz_id = cursor.lastrowid + return {"ok": True, "quiz_id": quiz_id} + + @api.post("/admin/api/quizzes/upload") + async def upload_quiz(request: Request, file: UploadFile = File(...)): + require_admin(request) + raw = (await file.read()).decode("utf-8") + try: + pool = parse_pool_json(raw) + except PoolValidationError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + async with connect(settings.db_path) as db: + cursor = await db.execute( + "INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)", + (pool["title"], json.dumps(pool), pool["time_limit_default"], pool["score_fn"]), + ) + await db.commit() + quiz_id = cursor.lastrowid + return {"ok": True, "quiz_id": quiz_id} + + @api.get("/admin/api/sessions") + async def list_sessions(request: Request): + require_admin(request) + async with connect(settings.db_path) as db: + cursor = await db.execute( + """ + SELECT s.sid, s.quiz_id, s.title, s.state, s.current_question_idx, s.started_at, s.finished_at, + COUNT(p.student_id) AS participant_count + FROM quiz_sessions s + LEFT JOIN participants p ON p.sid = s.sid + GROUP BY s.sid + ORDER BY s.started_at DESC + """ + ) + rows = await cursor.fetchall() + return {"sessions": [dict(row) for row in rows]} + + @api.post("/admin/api/sessions") + async def create_session(request: Request, body: SessionCreateRequest): + require_admin(request) + async with connect(settings.db_path) as db: + quiz_cursor = await db.execute("SELECT id, title FROM quizzes WHERE id = ?", (body.quiz_id,)) + quiz = await quiz_cursor.fetchone() + if quiz is None: + raise HTTPException(status_code=404, detail="Quiz not found") + sid = await _generate_sid(db) + await db.execute( + "INSERT INTO quiz_sessions (sid, quiz_id, title) VALUES (?, ?, ?)", + (sid, body.quiz_id, quiz["title"]), + ) + await db.commit() + join_url = f"{settings.public_url}/?sid={sid}" + return {"sid": sid, "join_url": join_url, "qr_url": _qr_data_url(join_url)} + + @api.get("/admin/api/sessions/{sid}/csv") + async def csv_download(sid: str, request: Request): + require_admin(request) + if not await rooms.session_exists(sid): + raise HTTPException(status_code=404, detail="Session not found") + csv_text = await export_session_csv(settings.db_path, sid) + return PlainTextResponse( + csv_text, + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="{sid}-results.csv"'}, + ) + + @api.websocket("/ws/instructor/{sid}") + async def instructor_socket(websocket: WebSocket, sid: str): + if not auth.is_admin_ws(settings, websocket) or not await rooms.session_exists(sid): + await websocket.close(code=4001) + return + await rooms.instructor_ws(websocket, sid) + + return api + + +async def _generate_sid(db: Any) -> str: + for _ in range(5): + sid = "".join(secrets.choice(CROCKFORD) for _ in range(6)) + cursor = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,)) + if await cursor.fetchone() is None: + return sid + raise HTTPException(status_code=500, detail="Could not allocate session ID") + + +def _qr_data_url(value: str) -> str: + image = qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage) + buf = BytesIO() + image.save(buf) + encoded = base64.b64encode(buf.getvalue()).decode("ascii") + return f"data:image/svg+xml;base64,{encoded}" diff --git a/app/routes_student.py b/app/routes_student.py index 620704c..4fbdbcd 100644 --- a/app/routes_student.py +++ b/app/routes_student.py @@ -1 +1,75 @@ """Student routes.""" + +from __future__ import annotations + +from pathlib import Path +from uuid import uuid4 + +from fastapi import APIRouter, HTTPException, Request, Response, WebSocket +from fastapi.responses import FileResponse, HTMLResponse + +from app import auth +from app.config import Settings +from app.models import JoinRequest +from app.room import RoomManager + + +def router(settings: Settings, rooms: RoomManager) -> APIRouter: + api = APIRouter() + + @api.get("/") + async def student_entry(sid: str | None = None): + if not sid or not await rooms.session_exists(sid): + return HTMLResponse( + "Quiz

Ask your instructor for the link

" + "

This quiz link is missing or no longer valid.

" + ) + return FileResponse(Path("static/student.html")) + + @api.get("/api/session/{sid}") + async def session_metadata(sid: str): + if not await rooms.session_exists(sid): + raise HTTPException(status_code=404, detail="Session not found") + session = await rooms.get_session(sid) + return { + "title": session["title"], + "state": session["state"], + "current_question_idx": session["current_question_idx"], + "time_limit_default": (await rooms.get_pool_for_session(sid))["time_limit_default"], + } + + @api.post("/api/session/{sid}/join") + async def join_session(sid: str, body: JoinRequest, response: Response): + if not await rooms.session_exists(sid): + raise HTTPException(status_code=404, detail="Session not found") + student_id = body.student_id.strip() + name = body.name.strip() + cookie_id = str(uuid4()) + cookie_value = auth.sign_student(settings, sid, student_id, name, cookie_id) + await rooms.add_participant(sid, student_id, name, cookie_id) + auth.set_student_cookie(settings, response, cookie_value) + return {"ok": True, "cookie_id": cookie_id} + + @api.get("/api/session/{sid}/me") + async def me(sid: str, request: Request): + identity = auth.get_student_identity(settings, request, sid) + if not identity: + raise HTTPException(status_code=401, detail="Student cookie required") + return await rooms.me(sid, identity["student_id"]) + + @api.get("/api/session/{sid}/stats") + async def stats(sid: str, request: Request, question_idx: int | None = None): + if not await rooms.session_exists(sid): + raise HTTPException(status_code=404, detail="Session not found") + identity = auth.get_student_identity(settings, request, sid) + return await rooms.stats(sid, question_idx, identity["student_id"] if identity else None) + + @api.websocket("/ws/student/{sid}") + async def student_socket(websocket: WebSocket, sid: str): + identity = auth.get_student_identity_ws(settings, websocket, sid) + if not identity or not await rooms.session_exists(sid): + await websocket.close(code=4001) + return + await rooms.student_ws(websocket, sid, identity) + + return api