138 lines
5.6 KiB
Python
138 lines
5.6 KiB
Python
"""Tests for pass (проходка) management."""
|
|
import pytest
|
|
import sqlite3
|
|
import time
|
|
import secrets
|
|
from tests.conftest import auth_headers
|
|
import auth
|
|
|
|
|
|
class TestPassActivate:
|
|
"""Test /auth/pass/activate endpoint."""
|
|
|
|
def test_activate_valid_pass(self, client, logged_in_user):
|
|
"""Create a pass code and activate it."""
|
|
pass_code = f"TEST-PASS-{secrets.token_hex(4)}"
|
|
|
|
# Create a pass in DB (use auth.AUTH_DB which is patched by conftest)
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
conn.execute(
|
|
"INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)",
|
|
(pass_code,)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
resp = client.post("/auth/pass/activate", json={
|
|
"pass_code": pass_code
|
|
}, headers=auth_headers(logged_in_user["access_token"]))
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "message" in data
|
|
assert "success" in data and data["success"] is True
|
|
|
|
# Verify pass is now used
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
row = conn.execute("SELECT uses, activated_by FROM passes WHERE code = ?", (pass_code,)).fetchone()
|
|
conn.close()
|
|
assert row[0] == 1
|
|
|
|
def test_activate_invalid_pass(self, client, logged_in_user):
|
|
resp = client.post("/auth/pass/activate", json={
|
|
"pass_code": "NONEXISTENT-CODE"
|
|
}, headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code == 404
|
|
|
|
def test_activate_already_used_pass(self, client, logged_in_user):
|
|
"""Create an already-used pass."""
|
|
pass_code = f"USED-PASS-{secrets.token_hex(4)}"
|
|
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
conn.execute(
|
|
"INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 1)",
|
|
(pass_code,)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
resp = client.post("/auth/pass/activate", json={
|
|
"pass_code": pass_code
|
|
}, headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code in (400, 404) # 400 for max uses reached, 404 for not found
|
|
|
|
def test_activate_pass_empty_code(self, client, logged_in_user):
|
|
resp = client.post("/auth/pass/activate", json={
|
|
"pass_code": ""
|
|
}, headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code == 422
|
|
|
|
|
|
class TestPassMyStatus:
|
|
"""Test /auth/pass/my endpoint."""
|
|
|
|
def test_my_pass_no_pass(self, client, logged_in_user):
|
|
resp = client.get("/auth/pass/my", headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data == {"has_active": False}
|
|
|
|
def test_my_pass_with_pass(self, client, logged_in_user_with_pass):
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
pass_code = f"PASS-{secrets.token_hex(4)}"
|
|
conn.execute("INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)", (pass_code,))
|
|
conn.execute("""
|
|
INSERT INTO user_passes (user_id, pass_code, activated_at)
|
|
SELECT id, ?, ? FROM users WHERE username = ?
|
|
""", (pass_code, time.time(), logged_in_user_with_pass["username"]))
|
|
conn.execute("UPDATE passes SET uses = 1 WHERE code = ?", (pass_code,))
|
|
conn.commit()
|
|
conn.close()
|
|
resp = client.get("/auth/pass/my", headers=auth_headers(logged_in_user_with_pass["access_token"]))
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data == {"has_active": True}
|
|
|
|
def test_my_pass_after_activation(self, client, logged_in_user):
|
|
pass_code = f"AFTER-{secrets.token_hex(4)}"
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
conn.execute("INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)", (pass_code,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
resp = client.post("/auth/pass/activate", json={"pass_code": pass_code},
|
|
headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code == 200
|
|
|
|
resp = client.get("/auth/pass/my", headers=auth_headers(logged_in_user["access_token"]))
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data == {"has_active": True}
|
|
|
|
def test_my_pass_stale_jwt_role(self, client, registered_user):
|
|
"""Test that /auth/pass/my works even if JWT has stale role.
|
|
|
|
Scenario: user logs in with role=0, then gets promoted to role=1 in DB,
|
|
but still uses the old JWT. The endpoint should check DB directly."""
|
|
resp = client.post("/auth/login", json=registered_user)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
old_token = data["access_token"]
|
|
assert data["role"] == 0
|
|
|
|
conn = sqlite3.connect(str(auth.AUTH_DB))
|
|
conn.execute("UPDATE users SET role = 1 WHERE username = ?", (registered_user["username"],))
|
|
pass_code = f"STALE-{secrets.token_hex(4)}"
|
|
conn.execute("INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)", (pass_code,))
|
|
conn.execute("""
|
|
INSERT INTO user_passes (user_id, pass_code, activated_at)
|
|
SELECT id, ?, ? FROM users WHERE username = ?
|
|
""", (pass_code, time.time(), registered_user["username"]))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
resp = client.get("/auth/pass/my", headers=auth_headers(old_token))
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data == {"has_active": True}, "Should detect active pass despite stale JWT role"
|