Add API routes and websocket room manager
This commit is contained in:
623
app/room.py
623
app/room.py
@@ -1 +1,624 @@
|
||||
"""In-process WebSocket room manager."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections import defaultdict
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from fastapi import WebSocket, WebSocketDisconnect
|
||||
|
||||
from app.config import Settings
|
||||
from app.db import connect
|
||||
from app.pool import get_question, parse_pool_json, public_question_payload, question_count, question_time_limit
|
||||
from app.scoring import SCORE_FNS
|
||||
|
||||
|
||||
def now_utc() -> datetime:
|
||||
return datetime.now(UTC)
|
||||
|
||||
|
||||
def now_ms() -> int:
|
||||
return int(now_utc().timestamp() * 1000)
|
||||
|
||||
|
||||
def iso_now() -> str:
|
||||
return now_utc().isoformat()
|
||||
|
||||
|
||||
def parse_ts(value: str) -> datetime:
|
||||
if value.endswith("Z"):
|
||||
value = value[:-1] + "+00:00"
|
||||
parsed = datetime.fromisoformat(value)
|
||||
if parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=UTC)
|
||||
return parsed
|
||||
|
||||
|
||||
class RoomManager:
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self.student_clients: dict[str, dict[WebSocket, dict[str, Any]]] = defaultdict(dict)
|
||||
self.instructor_clients: dict[str, set[WebSocket]] = defaultdict(set)
|
||||
self.autoclose_tasks: dict[tuple[str, int], asyncio.Task] = {}
|
||||
self.locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
|
||||
|
||||
async def sessions_active(self) -> int:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute("SELECT COUNT(*) AS count FROM quiz_sessions WHERE state != 'finished'")
|
||||
row = await cursor.fetchone()
|
||||
return int(row["count"])
|
||||
|
||||
def ws_client_count(self) -> int:
|
||||
return sum(len(clients) for clients in self.student_clients.values()) + sum(
|
||||
len(clients) for clients in self.instructor_clients.values()
|
||||
)
|
||||
|
||||
async def add_participant(self, sid: str, student_id: str, name: str, cookie_id: str) -> None:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO participants (sid, student_id, name, cookie_id)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(sid, student_id) DO UPDATE SET name = excluded.name, cookie_id = excluded.cookie_id
|
||||
""",
|
||||
(sid, student_id, name, cookie_id),
|
||||
)
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
|
||||
SELECT sid, ?, question_idx, 'missed', 0
|
||||
FROM question_events
|
||||
WHERE sid = ? AND closed_at IS NOT NULL
|
||||
""",
|
||||
(student_id, sid),
|
||||
)
|
||||
await db.commit()
|
||||
await self.broadcast_lobby(sid)
|
||||
|
||||
async def student_ws(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
|
||||
await websocket.accept()
|
||||
self.student_clients[sid][websocket] = identity
|
||||
try:
|
||||
await self.send_student_snapshot(websocket, sid, identity)
|
||||
while True:
|
||||
data = await websocket.receive_json()
|
||||
msg_type = data.get("type")
|
||||
if msg_type == "ping":
|
||||
await websocket.send_json({"type": "pong"})
|
||||
elif msg_type == "submit":
|
||||
ack = await self.submit_answer(sid, identity["student_id"], data.get("question_idx"), data.get("answer"))
|
||||
await websocket.send_json(ack)
|
||||
else:
|
||||
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
finally:
|
||||
self.student_clients[sid].pop(websocket, None)
|
||||
|
||||
async def instructor_ws(self, websocket: WebSocket, sid: str) -> None:
|
||||
await websocket.accept()
|
||||
self.instructor_clients[sid].add(websocket)
|
||||
try:
|
||||
await self.send_instructor_snapshot(websocket, sid)
|
||||
while True:
|
||||
data = await websocket.receive_json()
|
||||
msg_type = data.get("type")
|
||||
if msg_type == "ping":
|
||||
await websocket.send_json({"type": "pong"})
|
||||
elif msg_type == "open_question":
|
||||
await self.open_question(sid, int(data.get("question_idx", 0)), data.get("time_limit"))
|
||||
elif msg_type == "close_question":
|
||||
await self.close_question(sid)
|
||||
elif msg_type == "next":
|
||||
await self.next_question(sid)
|
||||
elif msg_type == "end_session":
|
||||
await self.end_session(sid)
|
||||
else:
|
||||
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
finally:
|
||||
self.instructor_clients[sid].discard(websocket)
|
||||
|
||||
async def send_student_snapshot(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
|
||||
session = await self.get_session(sid)
|
||||
await websocket.send_json(
|
||||
{
|
||||
"type": "state",
|
||||
"state": session["state"],
|
||||
"current_question_idx": session["current_question_idx"],
|
||||
"title": session["title"],
|
||||
}
|
||||
)
|
||||
if session["state"] == "question_open":
|
||||
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
||||
ack = await self.existing_submit_ack(sid, identity["student_id"], session["current_question_idx"])
|
||||
if ack:
|
||||
await websocket.send_json(ack)
|
||||
|
||||
async def send_instructor_snapshot(self, websocket: WebSocket, sid: str) -> None:
|
||||
session = await self.get_session(sid)
|
||||
await websocket.send_json(
|
||||
{
|
||||
"type": "state",
|
||||
"state": session["state"],
|
||||
"current_question_idx": session["current_question_idx"],
|
||||
"title": session["title"],
|
||||
}
|
||||
)
|
||||
await websocket.send_json(await self.lobby_message(sid))
|
||||
if session["state"] == "question_open":
|
||||
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
||||
await websocket.send_json(await self.live_histogram_message(sid, session["current_question_idx"]))
|
||||
if session["state"] in {"question_closed", "between_questions", "finished"}:
|
||||
await websocket.send_json(await self.full_leaderboard_message(sid))
|
||||
|
||||
async def open_question(self, sid: str, question_idx: int, time_limit: int | None = None) -> None:
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] == "question_open" and session["current_question_idx"] == question_idx:
|
||||
return
|
||||
if session["state"] == "question_open":
|
||||
await self._close_question_locked(sid, session["current_question_idx"])
|
||||
pool = await self.get_pool_for_session(sid)
|
||||
if question_idx < 0 or question_idx >= question_count(pool):
|
||||
await self.broadcast_instructors(sid, {"type": "error", "code": "bad_question", "message": "Question index out of range"})
|
||||
return
|
||||
limit = int(time_limit or question_time_limit(pool, question_idx))
|
||||
opened = iso_now()
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO question_events (sid, question_idx, opened_at, time_limit)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(sid, question_idx) DO UPDATE SET opened_at = excluded.opened_at,
|
||||
closed_at = NULL, time_limit = excluded.time_limit
|
||||
""",
|
||||
(sid, question_idx, opened, limit),
|
||||
)
|
||||
await db.execute(
|
||||
"UPDATE quiz_sessions SET state = 'question_open', current_question_idx = ? WHERE sid = ?",
|
||||
(question_idx, sid),
|
||||
)
|
||||
await db.commit()
|
||||
self._schedule_autoclose(sid, question_idx, limit)
|
||||
msg = await self.question_open_message(sid, question_idx)
|
||||
await self.broadcast_students(sid, msg)
|
||||
await self.broadcast_instructors(sid, msg)
|
||||
await self.broadcast_instructors(sid, await self.live_histogram_message(sid, question_idx))
|
||||
|
||||
async def close_question(self, sid: str) -> None:
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] != "question_open":
|
||||
return
|
||||
question_idx = session["current_question_idx"]
|
||||
await self._close_question_locked(sid, question_idx)
|
||||
await self.broadcast_question_closed(sid, question_idx)
|
||||
|
||||
async def _close_question_locked(self, sid: str, question_idx: int) -> None:
|
||||
task = self.autoclose_tasks.pop((sid, question_idx), None)
|
||||
current = asyncio.current_task()
|
||||
if task and task is not current:
|
||||
task.cancel()
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute(
|
||||
"UPDATE question_events SET closed_at = COALESCE(closed_at, ?) WHERE sid = ? AND question_idx = ?",
|
||||
(iso_now(), sid, question_idx),
|
||||
)
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
|
||||
SELECT p.sid, p.student_id, ?, 'missed', 0
|
||||
FROM participants p
|
||||
WHERE p.sid = ?
|
||||
""",
|
||||
(question_idx, sid),
|
||||
)
|
||||
await db.execute(
|
||||
"UPDATE quiz_sessions SET state = 'question_closed', current_question_idx = ? WHERE sid = ?",
|
||||
(question_idx, sid),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
async def next_question(self, sid: str) -> None:
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] != "question_closed":
|
||||
return
|
||||
next_idx = int(session["current_question_idx"]) + 1
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute("UPDATE quiz_sessions SET state = 'between_questions' WHERE sid = ?", (sid,))
|
||||
await db.commit()
|
||||
await self.broadcast_between_questions(sid, next_idx)
|
||||
|
||||
async def end_session(self, sid: str) -> None:
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] == "question_open":
|
||||
await self._close_question_locked(sid, session["current_question_idx"])
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute(
|
||||
"UPDATE quiz_sessions SET state = 'finished', current_question_idx = NULL, finished_at = ? WHERE sid = ?",
|
||||
(iso_now(), sid),
|
||||
)
|
||||
await db.commit()
|
||||
await self.broadcast_session_ended(sid)
|
||||
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
|
||||
|
||||
async def submit_answer(self, sid: str, student_id: str, question_idx: Any, answer: Any) -> dict[str, Any]:
|
||||
try:
|
||||
qidx = int(question_idx)
|
||||
except (TypeError, ValueError):
|
||||
return {"type": "error", "code": "bad_question", "message": "Invalid question index"}
|
||||
if answer not in {"A", "B", "C", "D"}:
|
||||
return {"type": "error", "code": "bad_answer", "message": "Answer must be A, B, C, or D"}
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] != "question_open" or session["current_question_idx"] != qidx:
|
||||
return {"type": "error", "code": "not_open", "message": "Question is not open"}
|
||||
existing = await self.existing_submit_ack(sid, student_id, qidx)
|
||||
if existing:
|
||||
return existing
|
||||
event = await self.get_question_event(sid, qidx)
|
||||
opened_at = parse_ts(event["opened_at"])
|
||||
elapsed_ms = max(0, int((now_utc() - opened_at).total_seconds() * 1000))
|
||||
time_limit_ms = int(event["time_limit"]) * 1000
|
||||
if elapsed_ms > time_limit_ms:
|
||||
return {"type": "error", "code": "time_expired", "message": "Question time has expired"}
|
||||
pool = await self.get_pool_for_session(sid)
|
||||
question = get_question(pool, qidx)
|
||||
correct = answer == question["correct"]
|
||||
score_fn = SCORE_FNS[pool["score_fn"]]
|
||||
score = score_fn(correct, elapsed_ms, time_limit_ms)
|
||||
submitted_at = iso_now()
|
||||
async with connect(self.settings.db_path) as db:
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO submissions (sid, student_id, question_idx, answer, submitted_at, elapsed_ms, score, status)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, 'submitted')
|
||||
ON CONFLICT(sid, student_id, question_idx) DO NOTHING
|
||||
""",
|
||||
(sid, student_id, qidx, answer, submitted_at, elapsed_ms, score),
|
||||
)
|
||||
await db.commit()
|
||||
await self.broadcast_instructors(sid, await self.live_histogram_message(sid, qidx))
|
||||
return {"type": "submit_ack", "question_idx": qidx, "answer": answer, "score": score, "elapsed_ms": elapsed_ms}
|
||||
|
||||
def _schedule_autoclose(self, sid: str, question_idx: int, time_limit: int) -> None:
|
||||
previous = self.autoclose_tasks.pop((sid, question_idx), None)
|
||||
if previous:
|
||||
previous.cancel()
|
||||
self.autoclose_tasks[(sid, question_idx)] = asyncio.create_task(self._autoclose_after(sid, question_idx, time_limit))
|
||||
|
||||
async def _autoclose_after(self, sid: str, question_idx: int, time_limit: int) -> None:
|
||||
try:
|
||||
await asyncio.sleep(time_limit)
|
||||
async with self.locks[sid]:
|
||||
session = await self.get_session(sid)
|
||||
if session["state"] == "question_open" and session["current_question_idx"] == question_idx:
|
||||
await self._close_question_locked(sid, question_idx)
|
||||
else:
|
||||
return
|
||||
await self.broadcast_question_closed(sid, question_idx)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
async def get_session(self, sid: str) -> dict[str, Any]:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute("SELECT * FROM quiz_sessions WHERE sid = ?", (sid,))
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
raise KeyError(f"Unknown session {sid}")
|
||||
return dict(row)
|
||||
|
||||
async def session_exists(self, sid: str) -> bool:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,))
|
||||
row = await cursor.fetchone()
|
||||
return row is not None
|
||||
|
||||
async def get_pool_for_session(self, sid: str) -> dict[str, Any]:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"""
|
||||
SELECT q.pool_json
|
||||
FROM quizzes q
|
||||
JOIN quiz_sessions s ON s.quiz_id = q.id
|
||||
WHERE s.sid = ?
|
||||
""",
|
||||
(sid,),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
raise KeyError(f"Unknown session {sid}")
|
||||
return parse_pool_json(row["pool_json"])
|
||||
|
||||
async def get_question_event(self, sid: str, question_idx: int) -> dict[str, Any]:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT * FROM question_events WHERE sid = ? AND question_idx = ?",
|
||||
(sid, question_idx),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
raise KeyError("Question has not been opened")
|
||||
return dict(row)
|
||||
|
||||
async def existing_submit_ack(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"""
|
||||
SELECT answer, elapsed_ms, score, status
|
||||
FROM submissions
|
||||
WHERE sid = ? AND student_id = ? AND question_idx = ? AND status = 'submitted'
|
||||
""",
|
||||
(sid, student_id, question_idx),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"type": "submit_ack",
|
||||
"question_idx": question_idx,
|
||||
"answer": row["answer"],
|
||||
"score": row["score"],
|
||||
"elapsed_ms": row["elapsed_ms"],
|
||||
}
|
||||
|
||||
async def question_open_message(self, sid: str, question_idx: int) -> dict[str, Any]:
|
||||
pool = await self.get_pool_for_session(sid)
|
||||
event = await self.get_question_event(sid, question_idx)
|
||||
opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000)
|
||||
remaining_ms = max(0, opened_ms + int(event["time_limit"]) * 1000 - now_ms())
|
||||
msg = {"type": "question_open", **public_question_payload(pool, question_idx)}
|
||||
msg["opened_at_server_ts"] = opened_ms
|
||||
msg["remaining_ms"] = remaining_ms
|
||||
return msg
|
||||
|
||||
async def question_closed_message(self, sid: str, question_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
pool = await self.get_pool_for_session(sid)
|
||||
question = get_question(pool, question_idx)
|
||||
msg = {
|
||||
"type": "question_closed",
|
||||
"question_idx": question_idx,
|
||||
"correct": question["correct"],
|
||||
"explanation": question.get("explanation", ""),
|
||||
"histogram": await self.histogram(sid, question_idx),
|
||||
"top5": await self.leaderboard(sid, limit=5),
|
||||
}
|
||||
if identity:
|
||||
student = identity["student_id"]
|
||||
submission = await self.submission_for(sid, student, question_idx)
|
||||
rank = await self.rank_for(sid, student)
|
||||
total = await self.total_for(sid, student)
|
||||
msg.update(
|
||||
{
|
||||
"your_answer": submission.get("answer") if submission else None,
|
||||
"your_score": submission.get("score", 0) if submission else 0,
|
||||
"your_rank": rank,
|
||||
"your_total": total,
|
||||
}
|
||||
)
|
||||
return msg
|
||||
|
||||
async def between_message(self, sid: str, next_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
msg = {"type": "between_questions", "next_idx": next_idx, "top5": await self.leaderboard(sid, limit=5)}
|
||||
if identity:
|
||||
msg["your_rank"] = await self.rank_for(sid, identity["student_id"])
|
||||
msg["your_total"] = await self.total_for(sid, identity["student_id"])
|
||||
return msg
|
||||
|
||||
async def ended_message(self, sid: str, identity: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
msg = {"type": "session_ended", "final_top5": await self.leaderboard(sid, limit=5)}
|
||||
if identity:
|
||||
student = identity["student_id"]
|
||||
msg.update(await self.student_summary(sid, student))
|
||||
msg["your_rank"] = await self.rank_for(sid, student)
|
||||
msg["your_total"] = await self.total_for(sid, student)
|
||||
return msg
|
||||
|
||||
async def histogram(self, sid: str, question_idx: int, pending: bool = False) -> dict[str, int]:
|
||||
result = {"A": 0, "B": 0, "C": 0, "D": 0, "missed": 0}
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT answer, status, COUNT(*) AS count FROM submissions WHERE sid = ? AND question_idx = ? GROUP BY answer, status",
|
||||
(sid, question_idx),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
total_cursor = await db.execute("SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,))
|
||||
total_row = await total_cursor.fetchone()
|
||||
submitted = 0
|
||||
for row in rows:
|
||||
if row["status"] == "missed":
|
||||
result["missed"] += row["count"]
|
||||
elif row["answer"] in result:
|
||||
result[row["answer"]] += row["count"]
|
||||
submitted += row["count"]
|
||||
if pending:
|
||||
result["pending"] = max(0, int(total_row["count"]) - submitted - result["missed"])
|
||||
return result
|
||||
|
||||
async def leaderboard(self, sid: str, limit: int | None = None, include_ids: bool = False) -> list[dict[str, Any]]:
|
||||
query_limit = "" if limit is None else f"LIMIT {int(limit)}"
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
f"""
|
||||
SELECT p.student_id, p.name, COALESCE(SUM(s.score), 0) AS score
|
||||
FROM participants p
|
||||
LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id
|
||||
WHERE p.sid = ?
|
||||
GROUP BY p.student_id, p.name
|
||||
ORDER BY score DESC, p.name ASC, p.student_id ASC
|
||||
{query_limit}
|
||||
""",
|
||||
(sid,),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
board = []
|
||||
for rank, row in enumerate(rows, start=1):
|
||||
item = {"rank": rank, "name": row["name"], "score": int(row["score"])}
|
||||
if include_ids:
|
||||
item["student_id"] = row["student_id"]
|
||||
board.append(item)
|
||||
return board
|
||||
|
||||
async def rank_for(self, sid: str, student_id: str) -> int | None:
|
||||
board = await self.leaderboard(sid, include_ids=True)
|
||||
for item in board:
|
||||
if item["student_id"] == student_id:
|
||||
return item["rank"]
|
||||
return None
|
||||
|
||||
async def total_for(self, sid: str, student_id: str) -> int:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT COALESCE(SUM(score), 0) AS total FROM submissions WHERE sid = ? AND student_id = ?",
|
||||
(sid, student_id),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return int(row["total"])
|
||||
|
||||
async def submission_for(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT * FROM submissions WHERE sid = ? AND student_id = ? AND question_idx = ?",
|
||||
(sid, student_id, question_idx),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
async def student_summary(self, sid: str, student_id: str) -> dict[str, int]:
|
||||
pool = await self.get_pool_for_session(sid)
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT question_idx, answer, status FROM submissions WHERE sid = ? AND student_id = ?",
|
||||
(sid, student_id),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
answered = sum(1 for row in rows if row["status"] == "submitted")
|
||||
correct = 0
|
||||
for row in rows:
|
||||
if row["status"] == "submitted" and row["answer"] == get_question(pool, row["question_idx"])["correct"]:
|
||||
correct += 1
|
||||
return {"questions_answered": answered, "questions_correct": correct}
|
||||
|
||||
async def lobby_message(self, sid: str) -> dict[str, Any]:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name",
|
||||
(sid,),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
participants = [dict(row) for row in rows]
|
||||
return {"type": "lobby_update", "participants": participants, "count": len(participants)}
|
||||
|
||||
async def live_histogram_message(self, sid: str, question_idx: int) -> dict[str, Any]:
|
||||
histogram = await self.histogram(sid, question_idx, pending=True)
|
||||
submitted_count = histogram["A"] + histogram["B"] + histogram["C"] + histogram["D"]
|
||||
return {
|
||||
"type": "live_histogram",
|
||||
"question_idx": question_idx,
|
||||
"histogram": histogram,
|
||||
"submitted_count": submitted_count,
|
||||
"total_count": submitted_count + histogram["missed"] + histogram.get("pending", 0),
|
||||
}
|
||||
|
||||
async def full_leaderboard_message(self, sid: str) -> dict[str, Any]:
|
||||
return {"type": "full_leaderboard", "leaderboard": await self.leaderboard(sid, include_ids=True)}
|
||||
|
||||
async def me(self, sid: str, student_id: str) -> dict[str, Any]:
|
||||
async with connect(self.settings.db_path) as db:
|
||||
part_cursor = await db.execute("SELECT * FROM participants WHERE sid = ? AND student_id = ?", (sid, student_id))
|
||||
participant = await part_cursor.fetchone()
|
||||
sub_cursor = await db.execute(
|
||||
"SELECT question_idx, answer, elapsed_ms, score, status FROM submissions WHERE sid = ? AND student_id = ? ORDER BY question_idx",
|
||||
(sid, student_id),
|
||||
)
|
||||
submissions = await sub_cursor.fetchall()
|
||||
return {
|
||||
"student_id": participant["student_id"],
|
||||
"name": participant["name"],
|
||||
"total_score": await self.total_for(sid, student_id),
|
||||
"submissions": [dict(row) for row in submissions],
|
||||
}
|
||||
|
||||
async def stats(self, sid: str, question_idx: int | None, student_id: str | None = None) -> dict[str, Any]:
|
||||
session = await self.get_session(sid)
|
||||
qidx = question_idx if question_idx is not None else session["current_question_idx"]
|
||||
if qidx is None:
|
||||
return {
|
||||
"question_idx": None,
|
||||
"response_time_avg_ms": None,
|
||||
"response_time_distribution": {},
|
||||
"average_score": 0,
|
||||
"top5": await self.leaderboard(sid, limit=5),
|
||||
"your_rank": None,
|
||||
}
|
||||
async with connect(self.settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"""
|
||||
SELECT elapsed_ms, score
|
||||
FROM submissions
|
||||
WHERE sid = ? AND question_idx = ? AND status = 'submitted'
|
||||
""",
|
||||
(sid, qidx),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
times = [row["elapsed_ms"] for row in rows if row["elapsed_ms"] is not None]
|
||||
scores = [row["score"] for row in rows]
|
||||
distribution = {"0_10s": 0, "10_30s": 0, "30s_plus": 0}
|
||||
for elapsed in times:
|
||||
if elapsed < 10_000:
|
||||
distribution["0_10s"] += 1
|
||||
elif elapsed < 30_000:
|
||||
distribution["10_30s"] += 1
|
||||
else:
|
||||
distribution["30s_plus"] += 1
|
||||
payload = {
|
||||
"question_idx": qidx,
|
||||
"response_time_avg_ms": round(sum(times) / len(times)) if times else None,
|
||||
"response_time_distribution": distribution,
|
||||
"average_score": round(sum(scores) / len(scores), 2) if scores else 0,
|
||||
"top5": await self.leaderboard(sid, limit=5),
|
||||
}
|
||||
if student_id:
|
||||
payload["your_rank"] = await self.rank_for(sid, student_id)
|
||||
return payload
|
||||
|
||||
async def broadcast_lobby(self, sid: str) -> None:
|
||||
await self.broadcast_instructors(sid, await self.lobby_message(sid))
|
||||
|
||||
async def broadcast_question_closed(self, sid: str, question_idx: int) -> None:
|
||||
for websocket, identity in list(self.student_clients[sid].items()):
|
||||
await self._safe_send(websocket, await self.question_closed_message(sid, question_idx, identity))
|
||||
await self.broadcast_instructors(sid, await self.question_closed_message(sid, question_idx))
|
||||
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
|
||||
|
||||
async def broadcast_between_questions(self, sid: str, next_idx: int) -> None:
|
||||
for websocket, identity in list(self.student_clients[sid].items()):
|
||||
await self._safe_send(websocket, await self.between_message(sid, next_idx, identity))
|
||||
await self.broadcast_instructors(sid, await self.between_message(sid, next_idx))
|
||||
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
|
||||
|
||||
async def broadcast_session_ended(self, sid: str) -> None:
|
||||
for websocket, identity in list(self.student_clients[sid].items()):
|
||||
await self._safe_send(websocket, await self.ended_message(sid, identity))
|
||||
await self.broadcast_instructors(sid, await self.ended_message(sid))
|
||||
|
||||
async def broadcast_students(self, sid: str, message: dict[str, Any]) -> None:
|
||||
for websocket in list(self.student_clients[sid]):
|
||||
await self._safe_send(websocket, message)
|
||||
|
||||
async def broadcast_instructors(self, sid: str, message: dict[str, Any]) -> None:
|
||||
for websocket in list(self.instructor_clients[sid]):
|
||||
await self._safe_send(websocket, message)
|
||||
|
||||
async def _safe_send(self, websocket: WebSocket, message: dict[str, Any]) -> None:
|
||||
try:
|
||||
await websocket.send_text(json.dumps(message))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -1 +1,177 @@
|
||||
"""Instructor routes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import secrets
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
|
||||
import qrcode
|
||||
import qrcode.image.svg
|
||||
from fastapi import APIRouter, File, HTTPException, Request, Response, UploadFile, WebSocket
|
||||
from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse
|
||||
|
||||
from app import auth
|
||||
from app.config import Settings
|
||||
from app.csv_export import export_session_csv
|
||||
from app.db import connect
|
||||
from app.models import QuizCreateRequest, SessionCreateRequest
|
||||
from app.pool import PoolValidationError, parse_pool_json
|
||||
from app.room import RoomManager
|
||||
|
||||
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
|
||||
|
||||
|
||||
def router(settings: Settings, rooms: RoomManager) -> APIRouter:
|
||||
api = APIRouter()
|
||||
|
||||
def require_admin(request: Request) -> None:
|
||||
auth.require_admin_request(settings, request)
|
||||
|
||||
@api.get("/admin/login")
|
||||
async def login_form():
|
||||
return HTMLResponse(
|
||||
"<!doctype html><title>Admin Login</title><form method='post'>"
|
||||
"<label>Password <input name='password' type='password'></label>"
|
||||
"<button type='submit'>Log in</button></form>"
|
||||
)
|
||||
|
||||
@api.post("/admin/login")
|
||||
async def login(request: Request, response: Response):
|
||||
password = ""
|
||||
content_type = request.headers.get("content-type", "")
|
||||
if "application/json" in content_type:
|
||||
data = await request.json()
|
||||
password = str(data.get("password", ""))
|
||||
else:
|
||||
form = await request.form()
|
||||
password = str(form.get("password", ""))
|
||||
if not auth.verify_admin_password(settings, password):
|
||||
raise HTTPException(status_code=401, detail="Invalid admin password")
|
||||
auth.set_admin_cookie(settings, response, auth.sign_admin(settings))
|
||||
return {"ok": True}
|
||||
|
||||
@api.get("/admin/")
|
||||
async def admin_page(request: Request):
|
||||
require_admin(request)
|
||||
return FileResponse("static/admin.html")
|
||||
|
||||
@api.get("/admin/api/quizzes")
|
||||
async def list_quizzes(request: Request):
|
||||
require_admin(request)
|
||||
async with connect(settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"SELECT id, title, time_limit_default, score_fn_name, created_at FROM quizzes ORDER BY created_at DESC, id DESC"
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return {"quizzes": [dict(row) for row in rows]}
|
||||
|
||||
@api.post("/admin/api/quizzes")
|
||||
async def create_quiz(request: Request, body: QuizCreateRequest):
|
||||
require_admin(request)
|
||||
try:
|
||||
pool = parse_pool_json(body.pool_json)
|
||||
except PoolValidationError as exc:
|
||||
raise HTTPException(status_code=400, detail=str(exc)) from exc
|
||||
if body.time_limit_default is not None:
|
||||
pool["time_limit_default"] = body.time_limit_default
|
||||
title = (body.title or pool["title"]).strip()
|
||||
async with connect(settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)",
|
||||
(title, json.dumps(pool), pool["time_limit_default"], pool["score_fn"]),
|
||||
)
|
||||
await db.commit()
|
||||
quiz_id = cursor.lastrowid
|
||||
return {"ok": True, "quiz_id": quiz_id}
|
||||
|
||||
@api.post("/admin/api/quizzes/upload")
|
||||
async def upload_quiz(request: Request, file: UploadFile = File(...)):
|
||||
require_admin(request)
|
||||
raw = (await file.read()).decode("utf-8")
|
||||
try:
|
||||
pool = parse_pool_json(raw)
|
||||
except PoolValidationError as exc:
|
||||
raise HTTPException(status_code=400, detail=str(exc)) from exc
|
||||
async with connect(settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)",
|
||||
(pool["title"], json.dumps(pool), pool["time_limit_default"], pool["score_fn"]),
|
||||
)
|
||||
await db.commit()
|
||||
quiz_id = cursor.lastrowid
|
||||
return {"ok": True, "quiz_id": quiz_id}
|
||||
|
||||
@api.get("/admin/api/sessions")
|
||||
async def list_sessions(request: Request):
|
||||
require_admin(request)
|
||||
async with connect(settings.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
"""
|
||||
SELECT s.sid, s.quiz_id, s.title, s.state, s.current_question_idx, s.started_at, s.finished_at,
|
||||
COUNT(p.student_id) AS participant_count
|
||||
FROM quiz_sessions s
|
||||
LEFT JOIN participants p ON p.sid = s.sid
|
||||
GROUP BY s.sid
|
||||
ORDER BY s.started_at DESC
|
||||
"""
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return {"sessions": [dict(row) for row in rows]}
|
||||
|
||||
@api.post("/admin/api/sessions")
|
||||
async def create_session(request: Request, body: SessionCreateRequest):
|
||||
require_admin(request)
|
||||
async with connect(settings.db_path) as db:
|
||||
quiz_cursor = await db.execute("SELECT id, title FROM quizzes WHERE id = ?", (body.quiz_id,))
|
||||
quiz = await quiz_cursor.fetchone()
|
||||
if quiz is None:
|
||||
raise HTTPException(status_code=404, detail="Quiz not found")
|
||||
sid = await _generate_sid(db)
|
||||
await db.execute(
|
||||
"INSERT INTO quiz_sessions (sid, quiz_id, title) VALUES (?, ?, ?)",
|
||||
(sid, body.quiz_id, quiz["title"]),
|
||||
)
|
||||
await db.commit()
|
||||
join_url = f"{settings.public_url}/?sid={sid}"
|
||||
return {"sid": sid, "join_url": join_url, "qr_url": _qr_data_url(join_url)}
|
||||
|
||||
@api.get("/admin/api/sessions/{sid}/csv")
|
||||
async def csv_download(sid: str, request: Request):
|
||||
require_admin(request)
|
||||
if not await rooms.session_exists(sid):
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
csv_text = await export_session_csv(settings.db_path, sid)
|
||||
return PlainTextResponse(
|
||||
csv_text,
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f'attachment; filename="{sid}-results.csv"'},
|
||||
)
|
||||
|
||||
@api.websocket("/ws/instructor/{sid}")
|
||||
async def instructor_socket(websocket: WebSocket, sid: str):
|
||||
if not auth.is_admin_ws(settings, websocket) or not await rooms.session_exists(sid):
|
||||
await websocket.close(code=4001)
|
||||
return
|
||||
await rooms.instructor_ws(websocket, sid)
|
||||
|
||||
return api
|
||||
|
||||
|
||||
async def _generate_sid(db: Any) -> str:
|
||||
for _ in range(5):
|
||||
sid = "".join(secrets.choice(CROCKFORD) for _ in range(6))
|
||||
cursor = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,))
|
||||
if await cursor.fetchone() is None:
|
||||
return sid
|
||||
raise HTTPException(status_code=500, detail="Could not allocate session ID")
|
||||
|
||||
|
||||
def _qr_data_url(value: str) -> str:
|
||||
image = qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage)
|
||||
buf = BytesIO()
|
||||
image.save(buf)
|
||||
encoded = base64.b64encode(buf.getvalue()).decode("ascii")
|
||||
return f"data:image/svg+xml;base64,{encoded}"
|
||||
|
||||
@@ -1 +1,75 @@
|
||||
"""Student routes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request, Response, WebSocket
|
||||
from fastapi.responses import FileResponse, HTMLResponse
|
||||
|
||||
from app import auth
|
||||
from app.config import Settings
|
||||
from app.models import JoinRequest
|
||||
from app.room import RoomManager
|
||||
|
||||
|
||||
def router(settings: Settings, rooms: RoomManager) -> APIRouter:
|
||||
api = APIRouter()
|
||||
|
||||
@api.get("/")
|
||||
async def student_entry(sid: str | None = None):
|
||||
if not sid or not await rooms.session_exists(sid):
|
||||
return HTMLResponse(
|
||||
"<!doctype html><title>Quiz</title><main><h1>Ask your instructor for the link</h1>"
|
||||
"<p>This quiz link is missing or no longer valid.</p></main>"
|
||||
)
|
||||
return FileResponse(Path("static/student.html"))
|
||||
|
||||
@api.get("/api/session/{sid}")
|
||||
async def session_metadata(sid: str):
|
||||
if not await rooms.session_exists(sid):
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
session = await rooms.get_session(sid)
|
||||
return {
|
||||
"title": session["title"],
|
||||
"state": session["state"],
|
||||
"current_question_idx": session["current_question_idx"],
|
||||
"time_limit_default": (await rooms.get_pool_for_session(sid))["time_limit_default"],
|
||||
}
|
||||
|
||||
@api.post("/api/session/{sid}/join")
|
||||
async def join_session(sid: str, body: JoinRequest, response: Response):
|
||||
if not await rooms.session_exists(sid):
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
student_id = body.student_id.strip()
|
||||
name = body.name.strip()
|
||||
cookie_id = str(uuid4())
|
||||
cookie_value = auth.sign_student(settings, sid, student_id, name, cookie_id)
|
||||
await rooms.add_participant(sid, student_id, name, cookie_id)
|
||||
auth.set_student_cookie(settings, response, cookie_value)
|
||||
return {"ok": True, "cookie_id": cookie_id}
|
||||
|
||||
@api.get("/api/session/{sid}/me")
|
||||
async def me(sid: str, request: Request):
|
||||
identity = auth.get_student_identity(settings, request, sid)
|
||||
if not identity:
|
||||
raise HTTPException(status_code=401, detail="Student cookie required")
|
||||
return await rooms.me(sid, identity["student_id"])
|
||||
|
||||
@api.get("/api/session/{sid}/stats")
|
||||
async def stats(sid: str, request: Request, question_idx: int | None = None):
|
||||
if not await rooms.session_exists(sid):
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
identity = auth.get_student_identity(settings, request, sid)
|
||||
return await rooms.stats(sid, question_idx, identity["student_id"] if identity else None)
|
||||
|
||||
@api.websocket("/ws/student/{sid}")
|
||||
async def student_socket(websocket: WebSocket, sid: str):
|
||||
identity = auth.get_student_identity_ws(settings, websocket, sid)
|
||||
if not identity or not await rooms.session_exists(sid):
|
||||
await websocket.close(code=4001)
|
||||
return
|
||||
await rooms.student_ws(websocket, sid, identity)
|
||||
|
||||
return api
|
||||
|
||||
Reference in New Issue
Block a user