Closes the post-recovery re-attack window. Previously cookies were authenticated purely cryptographically — once a hijacker received a signed cookie for student_id=X, that cookie remained valid forever (until QUIZ_SECRET_KEY rotated), even after admin clear-student + legit re-claim issued a fresh cookie_id for X. Now /me, /event, and /ws/student all check that the cookie's cookie_id matches participants.cookie_id for the (sid, student_id). Mismatch -> 401 + Set-Cookie clearing for HTTP, ws.close(4001) for WS. The legit re-claim wins because admin clear_student deletes the row and the next join inserts the new student's cookie_id; the hijacker's cookie now fails the DB check on every subsequent request. Test: tests/test_anti_cheat.py::test_post_recovery_old_cookie_is_dead covers the full hijack -> clear -> re-claim -> hijacker-locked-out sequence end to end.
126 lines
5.7 KiB
Python
126 lines
5.7 KiB
Python
"""Anti-cheat / audit-event coverage:
|
|
- tab-blur events are recorded and surfaced in CSV + presence
|
|
- duplicate-join attempts are 409 + audited
|
|
- admin clear-student removes the participant + submissions
|
|
- submit lockout (one answer per Q per student) is server-enforced
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from conftest import admin_login, join_student
|
|
|
|
|
|
def test_blur_event_is_logged_and_counted(client, sid):
|
|
join_student(client, sid, "s1", "Alice")
|
|
response = client.post(f"/api/session/{sid}/event", json={"kind": "blur", "question_idx": 0})
|
|
assert response.status_code == 200
|
|
response = client.post(f"/api/session/{sid}/event", json={"kind": "blur", "question_idx": 0})
|
|
assert response.status_code == 200
|
|
response = client.post(f"/api/session/{sid}/event", json={"kind": "visibility_hidden"})
|
|
assert response.status_code == 200
|
|
|
|
# The event count is exposed via CSV export. Two blur events + one
|
|
# visibility_hidden event should land on the s1 row.
|
|
admin_login(client)
|
|
csv_text = client.get("/admin/api/csv").text
|
|
s1_row = next(line for line in csv_text.splitlines() if ",s1," in line)
|
|
# Trailing fields are blur_count, hidden_count, duplicate_join_attempts.
|
|
assert s1_row.endswith(",2,1,0"), s1_row
|
|
|
|
|
|
def test_event_endpoint_rejects_unknown_kind(client, sid):
|
|
join_student(client, sid, "s1", "Alice")
|
|
response = client.post(f"/api/session/{sid}/event", json={"kind": "screenshot"})
|
|
assert response.status_code == 422
|
|
|
|
|
|
def test_event_endpoint_requires_student_cookie(client, sid):
|
|
response = client.post(f"/api/session/{sid}/event", json={"kind": "blur"})
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_duplicate_join_is_logged_in_csv(client, sid):
|
|
"""A 409 join attempt records a `duplicate_join` audit row whose
|
|
count rolls up into CSV + presence_update."""
|
|
join_student(client, sid, "s1", "Alice")
|
|
# Second client tries to claim s1 from a fresh cookie jar.
|
|
fresh = client.__class__(client.app)
|
|
response = fresh.post(f"/api/session/{sid}/join", json={"student_id": "s1", "name": "Hijacker"})
|
|
assert response.status_code == 409
|
|
|
|
admin_login(client)
|
|
csv_text = client.get("/admin/api/csv").text
|
|
s1_row = next(line for line in csv_text.splitlines() if ",s1," in line)
|
|
assert s1_row.endswith(",0,0,1"), s1_row
|
|
|
|
|
|
def test_admin_clear_student_frees_id(client, sid):
|
|
"""First-claim-wins recovery: admin can clear a participant so the
|
|
legitimate student (or anyone, since there's no further identity
|
|
check) can re-join with that id."""
|
|
join_student(client, sid, "s1", "Alice")
|
|
admin_login(client)
|
|
response = client.delete("/admin/api/students/s1")
|
|
assert response.status_code == 200
|
|
# The slot is now free; the same id can be re-claimed from a fresh
|
|
# cookie jar.
|
|
fresh = client.__class__(client.app)
|
|
response = fresh.post(f"/api/session/{sid}/join", json={"student_id": "s1", "name": "Alice Again"})
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_admin_clear_student_404s_when_no_match(client, sid):
|
|
admin_login(client)
|
|
assert client.delete("/admin/api/students/nobody").status_code == 404
|
|
|
|
|
|
def test_post_recovery_old_cookie_is_dead(client, sid):
|
|
"""Hijack -> recovery flow: after admin clears a hijacked id and the
|
|
legitimate student re-claims with a fresh cookie_id, the original
|
|
hijacker's still-cryptographically-valid cookie must NOT continue to
|
|
authenticate. The DB cookie_id check is what closes that gap."""
|
|
Hijacker = client.__class__
|
|
hijacker = Hijacker(client.app)
|
|
response = hijacker.post(f"/api/session/{sid}/join", json={"student_id": "alice", "name": "Hijacker"})
|
|
assert response.status_code == 200
|
|
# Hijacker's cookie/me works while they hold the slot.
|
|
assert hijacker.get(f"/api/session/{sid}/me").status_code == 200
|
|
|
|
# Admin clears the hijacked id; legit student re-claims with a fresh
|
|
# browser (= fresh cookie jar = fresh signed cookie_id).
|
|
admin_login(client)
|
|
assert client.delete(f"/admin/api/students/alice").status_code == 200
|
|
legit = Hijacker(client.app)
|
|
response = legit.post(f"/api/session/{sid}/join", json={"student_id": "alice", "name": "Real Alice"})
|
|
assert response.status_code == 200
|
|
assert legit.get(f"/api/session/{sid}/me").json()["name"] == "Real Alice"
|
|
|
|
# The hijacker's old (still-cryptographically-valid) cookie now fails
|
|
# auth because its cookie_id doesn't match the DB row anymore.
|
|
response = hijacker.get(f"/api/session/{sid}/me")
|
|
assert response.status_code == 401
|
|
# And the cookie should be cleared so their browser bounces back to
|
|
# the join form rather than retrying with the dead cookie.
|
|
assert any(
|
|
h.lower() == "set-cookie" and "qz_student" in v and ("max-age=0" in v.lower() or "expires=" in v.lower())
|
|
for h, v in response.headers.items()
|
|
), response.headers
|
|
# Same goes for the audit-event endpoint.
|
|
response = hijacker.post(f"/api/session/{sid}/event", json={"kind": "blur"})
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_submit_lockout_is_server_enforced(client, sid):
|
|
"""Server-side: a second submit for the same (sid, student_id, qidx)
|
|
returns the *original* ack rather than overwriting the answer. The
|
|
PK constraint + existing_submit_ack early-return guarantees this."""
|
|
join_student(client, sid, "s1", "Alice")
|
|
rooms = client.app.state.rooms
|
|
client.portal.call(rooms.open_question, sid, 0, 5)
|
|
first = client.portal.call(rooms.submit_answer, sid, "s1", 0, "B")
|
|
second = client.portal.call(rooms.submit_answer, sid, "s1", 0, "C")
|
|
assert first["type"] == "submit_ack"
|
|
assert second["type"] == "submit_ack"
|
|
assert second["answer"] == first["answer"] == "B"
|
|
assert second["score"] == first["score"]
|