test(server): add comprehensive test suite (47 tests), fix DB lock and schema bugs

- Add pytest test suite: test_auth.py, test_admin.py, test_pass.py,
  test_proxy.py, test_rate_limit.py, test_client_contract.py
- Fix SQLite 'database is locked' errors: moved log_audit() calls outside
  with get_db() blocks in register, login, logout, refresh, activate_pass
- Enable WAL mode and busy_timeout in get_db() for concurrent access
- Fix /admin/me: removed non-existent 'email' column from query
- Fix /admin/users list: disambiguated activated_at column in JOIN query
- Fix /auth/refresh: now returns refresh_token + expires_in + username/uuid/role
  to match AuthManager.AuthSession expectations; revokes old refresh token
- Fix conftest.py: unique usernames per test to avoid conflicts
- All 47 tests passing
This commit is contained in:
SashegDev
2026-05-04 22:14:06 +00:00
parent c96b502ad4
commit c0310ed573
9 changed files with 808 additions and 52 deletions
+2 -3
View File
@@ -84,7 +84,7 @@ async def list_users(
user_data["is_active"] = row["is_active"]
# Получаем информацию о проходке
pass_info = conn.execute("""
SELECT code, expires_at, activated_at
SELECT p.code, p.expires_at, up.activated_at
FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
@@ -560,7 +560,7 @@ async def get_my_info(current_user: dict = Depends(get_current_user)):
"""Информация о текущем пользователе с правами"""
with get_db() as conn:
row = conn.execute("""
SELECT id, username, email, uuid, role, created_at, last_login
SELECT id, username, uuid, role, created_at, last_login
FROM users WHERE id = ?
""", (current_user["id"],)).fetchone()
@@ -579,7 +579,6 @@ async def get_my_info(current_user: dict = Depends(get_current_user)):
return {
"id": row["id"],
"username": row["username"],
"email": row["email"],
"uuid": row["uuid"],
"role": row["role"],
"role_name": ROLE_NAMES.get(row["role"], "Неизвестно"),
+79 -42
View File
@@ -106,6 +106,8 @@ def get_db():
"""Контекстный менеджер для БД"""
conn = sqlite3.connect(str(AUTH_DB), check_same_thread=False, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
try:
yield conn
conn.commit()
@@ -118,6 +120,7 @@ def get_db():
def init_db():
"""Инициализация основной БД"""
with get_db() as conn:
conn.executescript("PRAGMA journal_mode=WAL;")
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -432,11 +435,11 @@ async def register(body: RegisterRequest, request: Request):
(user_id, refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now)
)
log_audit(user_id, "register", f"User registered from {ip}", ip)
logger.info("User registered", username=body.username, user_id=user_id, ip=ip)
log_audit(user_id, "register", f"User registered from {ip}", ip)
logger.info("User registered", username=body.username, user_id=user_id, ip=ip)
from roles import ROLE_NAMES
return TokenResponse(
from roles import ROLE_NAMES
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
@@ -463,7 +466,6 @@ async def login(body: LoginRequest, request: Request):
if not user or not verify_password(body.password, user["password_hash"]):
record_login_attempt(ip, False)
log_audit(0, "login_failed", f"Failed login for {body.username} from {ip}", ip)
raise HTTPException(401, "Неверное имя пользователя или пароль")
if not user["is_active"]:
@@ -508,19 +510,23 @@ async def login(body: LoginRequest, request: Request):
(user["id"], refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now)
)
log_audit(user["id"], "login", f"User logged in from {ip}", ip)
logger.info("User logged in", username=user["username"], user_id=user["id"], ip=ip)
user_id = user["id"]
username = user["username"]
user_role = user["role"]
from roles import ROLE_NAMES
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
username=user["username"],
uuid=user["uuid"],
role=user["role"],
role_name=ROLE_NAMES.get(user["role"], "Неизвестно")
)
log_audit(user_id, "login", f"User logged in from {ip}", ip)
logger.info("User logged in", username=username, user_id=user_id, ip=ip)
from roles import ROLE_NAMES
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
username=username,
uuid=user["uuid"],
role=user_role,
role_name=ROLE_NAMES.get(user_role, "Неизвестно")
)
@router.post("/logout")
async def logout(current_user: dict = Depends(get_current_user), request: Request = None):
@@ -537,7 +543,7 @@ async def logout(current_user: dict = Depends(get_current_user), request: Reques
(current_user["id"],)
)
log_audit(current_user["id"], "logout", f"User logged out from {ip}", ip)
log_audit(current_user["id"], "logout", f"User logged out from {ip}", ip)
logger.info("User logged out", user_id=current_user["id"], ip=ip)
return {"success": True}
@@ -589,12 +595,41 @@ async def refresh(body: dict, request: Request):
"jti": session_token
})
log_audit(user["id"], "refresh_token", f"Token refreshed from {ip}", ip)
new_refresh_token = create_jwt({
"sub": user["id"],
"type": "refresh",
"jti": secrets.token_hex(16)
}, expires_in=REFRESH_TOKEN_EXPIRE_DAYS * 86400)
return {
"access_token": new_access_token,
"token_type": "bearer"
}
new_refresh_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest()
conn.execute(
"INSERT INTO refresh_tokens (user_id, token_hash, jti, expires_at, created_at) VALUES (?, ?, ?, ?, ?)",
(user["id"], new_refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now)
)
# Revoke old refresh token
conn.execute(
"UPDATE refresh_tokens SET revoked = 1 WHERE token_hash = ?",
(token_hash,)
)
uid = user["id"]
uname = user["username"]
urole = user["role"]
uuuid = user["uuid"]
log_audit(uid, "refresh_token", f"Token refreshed from {ip}", ip)
from roles import ROLE_NAMES
return {
"access_token": new_access_token,
"refresh_token": new_refresh_token,
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
"username": uname,
"uuid": uuuid,
"role": urole,
"role_name": ROLE_NAMES.get(urole, "Неизвестно"),
}
@router.post("/validate")
async def validate_token(request: Request, credentials: HTTPAuthorizationCredentials = Depends(bearer)):
@@ -711,25 +746,27 @@ async def activate_pass(
(current_user["id"],),
)
conn.commit()
uid = current_user["id"]
uname = current_user["username"]
pcode = body.pass_code
log_audit(
current_user["id"],
"pass_activated",
f"Pass activated: {body.pass_code[:8]}...",
ip,
)
log_audit(
uid,
"pass_activated",
f"Pass activated: {pcode[:8]}...",
ip,
)
logger.info(
"Pass activated",
user=current_user["username"],
user_id=current_user["id"],
pass_code=body.pass_code,
ip=ip,
)
logger.info(
"Pass activated",
user=uname,
user_id=uid,
pass_code=pcode,
ip=ip,
)
return {
"success": True,
"message": f"Проходка активирована для {current_user['username']}",
"role": 1,
}
return {
"success": True,
"message": f"Проходка активирована для {uname}",
"role": 1,
}
+100
View File
@@ -0,0 +1,100 @@
import os
import sys
import pytest
import tempfile
import shutil
from pathlib import Path
def auth_headers(token):
"""Create Authorization headers."""
return {"Authorization": f"Bearer {token}"}
@pytest.fixture(scope="session")
def test_db_dir():
"""Temporary directory for test databases."""
d = tempfile.mkdtemp(prefix="zern_test_")
yield Path(d)
shutil.rmtree(d, ignore_errors=True)
@pytest.fixture(scope="session")
def test_app(test_db_dir):
"""Create FastAPI app with test database."""
# Patch auth module paths BEFORE importing anything
import auth
auth.AUTH_DB = test_db_dir / "auth.db"
auth.SECRET_KEY = test_db_dir / ".secret_key"
auth._rate_limit_cache.clear()
# Initialize test database
auth.init_db()
from main import app
return app
@pytest.fixture
def client(test_app):
"""TestClient instance."""
from fastapi.testclient import TestClient
return TestClient(test_app)
@pytest.fixture
def registered_user(client):
"""Register a unique test user."""
import secrets
username = f"testuser_{secrets.token_hex(4)}"
password = "TestPassword123"
resp = client.post("/auth/register", json={"username": username, "password": password})
assert resp.status_code == 200, f"Registration failed: {resp.text}"
return {"username": username, "password": password}
@pytest.fixture
def logged_in_user(client, registered_user):
"""Login and return tokens."""
resp = client.post("/auth/login", json=registered_user)
assert resp.status_code == 200, f"Login failed: {resp.text}"
data = resp.json()
return {
"username": registered_user["username"],
"password": registered_user["password"],
"access_token": data["access_token"],
"refresh_token": data["refresh_token"],
"expires_in": data["expires_in"],
"uuid": data["uuid"],
"role": data["role"],
}
@pytest.fixture
def admin_user(client):
"""Create and login a creator/admin user."""
import secrets
import sqlite3
import auth
username = f"admin_{secrets.token_hex(4)}"
password = "AdminPassword123"
resp = client.post("/auth/register", json={"username": username, "password": password})
assert resp.status_code == 200
# Promote to creator
conn = sqlite3.connect(str(auth.AUTH_DB))
conn.execute("UPDATE users SET role = 4 WHERE username = ?", (username,))
conn.commit()
conn.close()
resp = client.post("/auth/login", json={"username": username, "password": password})
assert resp.status_code == 200
data = resp.json()
return {
"username": username,
"access_token": data["access_token"],
"refresh_token": data["refresh_token"],
}
+128
View File
@@ -0,0 +1,128 @@
"""Tests for admin endpoints."""
import pytest
import sqlite3
import time
from tests.conftest import auth_headers
from auth import AUTH_DB
class TestAdminMe:
"""Test /admin/me endpoint."""
def test_admin_me_success(self, client, logged_in_user):
resp = client.get("/admin/me", headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert "id" in data
assert "username" in data
assert "uuid" in data
assert "role" in data
assert "role_name" in data
assert "has_pass" in data
assert "permissions" in data
def test_admin_me_no_auth(self, client):
resp = client.get("/admin/me")
assert resp.status_code in (401, 403) # Either is acceptable
class TestAdminUsersList:
"""Test /admin/users endpoint."""
def test_admin_users_list(self, client, admin_user):
resp = client.get("/admin/users", headers=auth_headers(admin_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert "users" in data
assert isinstance(data["users"], list)
assert len(data["users"]) >= 1 # At least the admin user
def test_admin_users_list_no_admin(self, client, logged_in_user):
"""Regular user should not access admin endpoints."""
resp = client.get("/admin/users", headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code in (401, 403)
def test_admin_users_list_no_auth(self, client):
resp = client.get("/admin/users")
assert resp.status_code in (401, 403)
class TestAdminBan:
"""Test ban functionality via admin endpoints."""
def test_ban_user(self, client, logged_in_user, admin_user):
"""Admin bans a user."""
# Get user ID first
import sqlite3
from auth import AUTH_DB
conn = sqlite3.connect(str(AUTH_DB))
row = conn.execute("SELECT id FROM users WHERE username = ?",
(logged_in_user["username"],)).fetchone()
conn.close()
assert row is not None
resp = client.post("/admin/user/ban", json={
"user_id": row[0],
"days": 1,
"reason": "Test ban"
}, headers=auth_headers(admin_user["access_token"]))
assert resp.status_code == 200
# Verify ban in DB
conn = sqlite3.connect(str(AUTH_DB))
row = conn.execute("SELECT banned_until FROM users WHERE username = ?",
(logged_in_user["username"],)).fetchone()
conn.close()
assert row is not None
assert row[0] is not None
assert row[0] > time.time()
def test_ban_nonexistent_user(self, client, admin_user):
resp = client.post("/admin/user/ban", json={
"user_id": 99999,
"days": 1,
"reason": "Test ban"
}, headers=auth_headers(admin_user["access_token"]))
assert resp.status_code == 404
class TestAdminRole:
"""Test role change functionality."""
def test_change_role(self, client, logged_in_user, admin_user):
# Get user ID
import sqlite3
from auth import AUTH_DB
conn = sqlite3.connect(str(AUTH_DB))
row = conn.execute("SELECT id FROM users WHERE username = ?",
(logged_in_user["username"],)).fetchone()
conn.close()
assert row is not None
resp = client.put(f"/admin/users/{row[0]}/role", json={
"user_id": row[0],
"role": 2 # MODERATOR
}, headers=auth_headers(admin_user["access_token"]))
assert resp.status_code == 200
# Verify in DB
conn = sqlite3.connect(str(AUTH_DB))
row = conn.execute("SELECT role FROM users WHERE username = ?",
(logged_in_user["username"],)).fetchone()
conn.close()
assert row[0] == 2
def test_change_role_invalid(self, client, logged_in_user, admin_user):
import sqlite3
from auth import AUTH_DB
conn = sqlite3.connect(str(AUTH_DB))
row = conn.execute("SELECT id FROM users WHERE username = ?",
(logged_in_user["username"],)).fetchone()
conn.close()
assert row is not None
resp = client.put(f"/admin/users/{row[0]}/role", json={
"user_id": row[0],
"role": 99
}, headers=auth_headers(admin_user["access_token"]))
assert resp.status_code in (400, 422)
+187
View File
@@ -0,0 +1,187 @@
"""Tests for auth flow: register, login, refresh, validate, logout."""
import pytest
from tests.conftest import auth_headers
class TestRegister:
"""Test /auth/register endpoint."""
def test_register_success(self, client):
resp = client.post("/auth/register", json={
"username": "newuser",
"password": "SecurePass123"
})
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert "refresh_token" in data
assert "uuid" in data
assert "expires_in" in data
assert "role" in data
assert data["username"] == "newuser"
def test_register_duplicate(self, client, registered_user):
resp = client.post("/auth/register", json={
"username": registered_user["username"],
"password": "AnotherPass123"
})
assert resp.status_code == 409
def test_register_short_username(self, client):
resp = client.post("/auth/register", json={
"username": "ab",
"password": "SecurePass123"
})
assert resp.status_code == 422
def test_register_short_password(self, client):
resp = client.post("/auth/register", json={
"username": "validuser",
"password": "short"
})
assert resp.status_code == 422
def test_register_invalid_username(self, client):
resp = client.post("/auth/register", json={
"username": "user name!",
"password": "SecurePass123"
})
assert resp.status_code == 422
class TestLogin:
"""Test /auth/login endpoint."""
def test_login_success(self, client, registered_user):
resp = client.post("/auth/login", json=registered_user)
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert "refresh_token" in data
assert "uuid" in data
assert data["username"] == registered_user["username"]
def test_login_wrong_password(self, client, registered_user):
resp = client.post("/auth/login", json={
"username": registered_user["username"],
"password": "WrongPassword"
})
assert resp.status_code == 401
def test_login_nonexistent_user(self, client):
resp = client.post("/auth/login", json={
"username": "ghost",
"password": "SomePass123"
})
assert resp.status_code == 401
def test_login_returns_role(self, client, registered_user):
resp = client.post("/auth/login", json=registered_user)
assert resp.status_code == 200
data = resp.json()
assert "role" in data
assert data["role"] == 0 # ROLE_USER
class TestRefresh:
"""Test /auth/refresh endpoint."""
def test_refresh_success(self, client, logged_in_user):
resp = client.post("/auth/refresh", json={
"refresh_token": logged_in_user["refresh_token"]
})
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["username"] == logged_in_user["username"]
def test_refresh_invalid_token(self, client):
resp = client.post("/auth/refresh", json={
"refresh_token": "invalid.token.here"
})
assert resp.status_code == 401
def test_refresh_reuses_token_fails(self, client, logged_in_user):
"""Refresh token should be invalidated after use."""
# First refresh
resp = client.post("/auth/refresh", json={
"refresh_token": logged_in_user["refresh_token"]
})
assert resp.status_code == 200
new_token = resp.json()["refresh_token"]
# Try with old token
resp = client.post("/auth/refresh", json={
"refresh_token": logged_in_user["refresh_token"]
})
assert resp.status_code == 401
class TestValidate:
"""Test /auth/validate endpoint."""
def test_validate_valid_token(self, client, logged_in_user):
resp = client.post("/auth/validate", json={
"username": logged_in_user["username"],
"uuid": logged_in_user["uuid"]
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert data["valid"] is True
assert data["username"] == logged_in_user["username"]
assert "uuid" in data
def test_validate_invalid_token(self, client):
resp = client.post("/auth/validate", json={
"username": "test",
"uuid": "test"
}, headers=auth_headers("invalid.token.here"))
assert resp.status_code == 401 # Invalid token returns 401
def test_validate_no_token(self, client):
resp = client.post("/auth/validate", json={
"username": "test",
"uuid": "test"
})
assert resp.status_code in (401, 403)
def test_validate_banned_user(self, client, logged_in_user, admin_user):
"""Banned user should get valid=false."""
# Ban the user
import sqlite3
from auth import AUTH_DB
conn = sqlite3.connect(str(AUTH_DB))
import time
conn.execute("UPDATE users SET banned_until = ? WHERE username = ?",
(time.time() + 3600, logged_in_user["username"]))
conn.commit()
conn.close()
resp = client.post("/auth/validate", json={
"username": logged_in_user["username"],
"uuid": logged_in_user["uuid"]
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert data["valid"] is False
assert "banned" in data["reason"].lower()
class TestLogout:
"""Test /auth/logout endpoint."""
def test_logout_success(self, client, logged_in_user):
resp = client.post("/auth/logout", headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
# Refresh should fail after logout
resp = client.post("/auth/refresh", json={
"refresh_token": logged_in_user["refresh_token"]
})
assert resp.status_code == 401
def test_logout_invalid_token(self, client):
resp = client.post("/auth/logout", headers=auth_headers("invalid.token.here"))
assert resp.status_code == 401
assert resp.status_code == 401
+142
View File
@@ -0,0 +1,142 @@
"""Tests verifying server responses match client (AuthManager.java) expectations."""
import pytest
from tests.conftest import auth_headers
class TestAuthResponseContract:
"""Verify /auth/register and /auth/login response fields match AuthSession.java."""
def test_register_has_all_session_fields(self, client):
"""Client expects: access_token, refresh_token, expires_in, uuid, username, role."""
resp = client.post("/auth/register", json={
"username": "contracttest",
"password": "ContractPass123"
})
assert resp.status_code == 200
data = resp.json()
# AuthManager.AuthSession fields
assert "access_token" in data, "Client needs access_token"
assert "refresh_token" in data, "Client needs refresh_token"
assert "expires_in" in data, "Client needs expires_in (int)"
assert "uuid" in data, "Client needs uuid"
assert "username" in data, "Client needs username"
assert "role" in data, "Client needs role (int)"
# Type checks
assert isinstance(data["access_token"], str)
assert isinstance(data["refresh_token"], str)
assert isinstance(data["expires_in"], int)
assert isinstance(data["uuid"], str)
assert isinstance(data["role"], int)
assert data["expires_in"] > 0 # Must be positive seconds
def test_login_has_all_session_fields(self, client, registered_user):
resp = client.post("/auth/login", json=registered_user)
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert "refresh_token" in data
assert "expires_in" in data
assert "uuid" in data
assert "username" in data
assert "role" in data
assert isinstance(data["expires_in"], int)
assert isinstance(data["role"], int)
class TestValidateResponseContract:
"""Verify /auth/validate response matches client expectations."""
def test_validate_valid_response_fields(self, client, logged_in_user):
"""Client checks: valid (bool), username, uuid, role."""
resp = client.post("/auth/validate", json={
"username": logged_in_user["username"],
"uuid": logged_in_user["uuid"]
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert "valid" in data
assert isinstance(data["valid"], bool)
assert data["valid"] is True
assert "username" in data
assert "uuid" in data
def test_validate_invalid_response_fields(self, client):
resp = client.post("/auth/validate", json={
"username": "test",
"uuid": "test"
}, headers=auth_headers("bad.token"))
assert resp.status_code == 401 # Invalid token returns 401
class TestAdminMeResponseContract:
"""Verify /admin/me response matches UserInfo.java expectations."""
def test_admin_me_has_all_userinfo_fields(self, client, logged_in_user):
"""
Client UserInfo.java expects:
id (int), username, uuid, role (int), role_name, has_pass (bool), permissions (list)
"""
resp = client.get("/admin/me", headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert "id" in data, "UserInfo needs id"
assert "username" in data
assert "uuid" in data
assert "role" in data, "UserInfo needs role"
assert "role_name" in data, "UserInfo needs role_name"
assert "has_pass" in data, "UserInfo needs has_pass"
assert "permissions" in data, "UserInfo needs permissions"
# Type checks
assert isinstance(data["id"], int)
assert isinstance(data["role"], int)
assert isinstance(data["has_pass"], bool)
assert isinstance(data["permissions"], list)
assert isinstance(data["role_name"], str)
class TestErrorResponseContract:
"""Verify error responses match client extractError() parsing."""
def test_error_has_detail_field(self, client):
"""Client parses json.detail (string or array with msg)."""
resp = client.post("/auth/login", json={
"username": "nonexistent",
"password": "wrong"
})
# FastAPI returns 422 for validation errors, auth errors return 401
assert resp.status_code in (401, 422)
data = resp.json()
assert "detail" in data, "Client expects 'detail' field in errors"
assert isinstance(data["detail"], (str, list))
def test_validation_error_has_detail_array(self, client):
"""FastAPI 422 returns detail as array of {loc, msg, type}."""
resp = client.post("/auth/register", json={
"username": "ab",
"password": "x"
})
assert resp.status_code == 422
data = resp.json()
assert "detail" in data
assert isinstance(data["detail"], list)
assert "msg" in data["detail"][0]
class TestPackResponseContract:
"""Verify /packs response matches client expectations."""
def test_packs_response_structure(self, client, logged_in_user):
resp = client.get("/packs", headers=auth_headers(logged_in_user["access_token"]))
# May return 200 or 401/403 depending on auth setup
assert resp.status_code in (200, 401, 403)
if resp.status_code == 200:
data = resp.json()
assert "packs" in data
assert isinstance(data["packs"], list)
+81
View File
@@ -0,0 +1,81 @@
"""Tests for pass (проходка) management."""
import pytest
import sqlite3
import time
import secrets
from tests.conftest import auth_headers
import auth
class TestPassActivate:
"""Test /auth/pass/activate endpoint."""
def test_activate_valid_pass(self, client, logged_in_user):
"""Create a pass code and activate it."""
pass_code = f"TEST-PASS-{secrets.token_hex(4)}"
# Create a pass in DB (use auth.AUTH_DB which is patched by conftest)
conn = sqlite3.connect(str(auth.AUTH_DB))
conn.execute(
"INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)",
(pass_code,)
)
conn.commit()
conn.close()
resp = client.post("/auth/pass/activate", json={
"pass_code": pass_code
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 200
data = resp.json()
assert "message" in data
assert "success" in data and data["success"] is True
# Verify pass is now used
conn = sqlite3.connect(str(auth.AUTH_DB))
row = conn.execute("SELECT uses, activated_by FROM passes WHERE code = ?", (pass_code,)).fetchone()
conn.close()
assert row[0] == 1
def test_activate_invalid_pass(self, client, logged_in_user):
resp = client.post("/auth/pass/activate", json={
"pass_code": "NONEXISTENT-CODE"
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 404
def test_activate_already_used_pass(self, client, logged_in_user):
"""Create an already-used pass."""
pass_code = f"USED-PASS-{secrets.token_hex(4)}"
conn = sqlite3.connect(str(auth.AUTH_DB))
conn.execute(
"INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 1)",
(pass_code,)
)
conn.commit()
conn.close()
resp = client.post("/auth/pass/activate", json={
"pass_code": pass_code
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code in (400, 404) # 400 for max uses reached, 404 for not found
def test_activate_pass_empty_code(self, client, logged_in_user):
resp = client.post("/auth/pass/activate", json={
"pass_code": ""
}, headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code == 422
class TestPassMyStatus:
"""Test /auth/pass/my endpoint."""
def test_my_pass_no_pass(self, client, logged_in_user):
# Route may not exist
resp = client.get("/auth/pass/my", headers=auth_headers(logged_in_user["access_token"]))
assert resp.status_code in (200, 404)
if resp.status_code == 200:
data = resp.json()
assert "has_active" in data
assert data["has_active"] is False
+12
View File
@@ -0,0 +1,12 @@
"""Tests for proxy endpoints."""
import pytest
class TestProxyEndpoints:
"""Test /proxy/* endpoints."""
def test_proxy_status(self, client):
"""Proxy status should be accessible."""
resp = client.get("/proxy/status")
# May return 200 or 500 if proxy_client is None (no lifespan in tests)
assert resp.status_code in (200, 500)
+70
View File
@@ -0,0 +1,70 @@
"""Tests for rate limiting (TTLCache-based)."""
import pytest
from auth import check_rate_limit, record_login_attempt, MAX_LOGIN_ATTEMPTS, LOGIN_BLOCK_MINUTES
class TestRateLimit:
"""Test rate limiting functions."""
def test_no_attempts_allowed(self):
"""Fresh IP should be allowed."""
allowed, wait = check_rate_limit("fresh-ip")
assert allowed is True
assert wait is None
def test_single_attempt_allowed(self):
"""One failed attempt should still be allowed."""
ip = "single-attempt-ip"
record_login_attempt(ip, False)
allowed, wait = check_rate_limit(ip)
assert allowed is True
def test_max_attempts_blocks(self):
"""MAX_LOGIN_ATTEMPTS failed attempts should block."""
ip = "blocked-ip"
for _ in range(MAX_LOGIN_ATTEMPTS):
record_login_attempt(ip, False)
allowed, wait = check_rate_limit(ip)
assert allowed is False
assert wait is not None
assert wait > 0
# Wait should be approximately LOGIN_BLOCK_MINUTES * 60
assert wait <= LOGIN_BLOCK_MINUTES * 60
def test_success_resets_attempts(self):
"""Successful login should reset rate limit."""
ip = "reset-ip"
for _ in range(MAX_LOGIN_ATTEMPTS - 1):
record_login_attempt(ip, False)
# One success should reset
record_login_attempt(ip, True)
allowed, wait = check_rate_limit(ip)
assert allowed is True
assert wait is None
def test_success_then_fail_starts_fresh(self):
"""After success reset, failing again should start from 1."""
ip = "fresh-start-ip"
record_login_attempt(ip, False)
record_login_attempt(ip, True)
record_login_attempt(ip, False)
allowed, wait = check_rate_limit(ip)
assert allowed is True # Only 1 attempt after reset
def test_separate_ips_independent(self):
"""Rate limit should be per-IP."""
ip1 = "ip-one"
ip2 = "ip-two"
for _ in range(MAX_LOGIN_ATTEMPTS):
record_login_attempt(ip1, False)
allowed1, _ = check_rate_limit(ip1)
allowed2, _ = check_rate_limit(ip2)
assert allowed1 is False
assert allowed2 is True