diff --git a/server/admin_router.py b/server/admin_router.py index 97eab17..899e5c1 100644 --- a/server/admin_router.py +++ b/server/admin_router.py @@ -84,7 +84,7 @@ async def list_users( user_data["is_active"] = row["is_active"] # Получаем информацию о проходке pass_info = conn.execute(""" - SELECT code, expires_at, activated_at + SELECT p.code, p.expires_at, up.activated_at FROM user_passes up JOIN passes p ON up.pass_code = p.code WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?) @@ -560,7 +560,7 @@ async def get_my_info(current_user: dict = Depends(get_current_user)): """Информация о текущем пользователе с правами""" with get_db() as conn: row = conn.execute(""" - SELECT id, username, email, uuid, role, created_at, last_login + SELECT id, username, uuid, role, created_at, last_login FROM users WHERE id = ? """, (current_user["id"],)).fetchone() @@ -579,7 +579,6 @@ async def get_my_info(current_user: dict = Depends(get_current_user)): return { "id": row["id"], "username": row["username"], - "email": row["email"], "uuid": row["uuid"], "role": row["role"], "role_name": ROLE_NAMES.get(row["role"], "Неизвестно"), diff --git a/server/auth.py b/server/auth.py index 82e282b..42e761b 100644 --- a/server/auth.py +++ b/server/auth.py @@ -106,6 +106,8 @@ def get_db(): """Контекстный менеджер для БД""" conn = sqlite3.connect(str(AUTH_DB), check_same_thread=False, timeout=10) conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=5000") try: yield conn conn.commit() @@ -118,6 +120,7 @@ def get_db(): def init_db(): """Инициализация основной БД""" with get_db() as conn: + conn.executescript("PRAGMA journal_mode=WAL;") conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -431,12 +434,12 @@ async def register(body: RegisterRequest, request: Request): "INSERT INTO refresh_tokens (user_id, token_hash, jti, expires_at, created_at) VALUES (?, ?, ?, ?, ?)", (user_id, refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now) ) - - log_audit(user_id, "register", f"User registered from {ip}", ip) - logger.info("User registered", username=body.username, user_id=user_id, ip=ip) - - from roles import ROLE_NAMES - return TokenResponse( + + log_audit(user_id, "register", f"User registered from {ip}", ip) + logger.info("User registered", username=body.username, user_id=user_id, ip=ip) + + from roles import ROLE_NAMES + return TokenResponse( access_token=access_token, refresh_token=refresh_token, expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60, @@ -463,7 +466,6 @@ async def login(body: LoginRequest, request: Request): if not user or not verify_password(body.password, user["password_hash"]): record_login_attempt(ip, False) - log_audit(0, "login_failed", f"Failed login for {body.username} from {ip}", ip) raise HTTPException(401, "Неверное имя пользователя или пароль") if not user["is_active"]: @@ -508,19 +510,23 @@ async def login(body: LoginRequest, request: Request): (user["id"], refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now) ) - log_audit(user["id"], "login", f"User logged in from {ip}", ip) - logger.info("User logged in", username=user["username"], user_id=user["id"], ip=ip) - - from roles import ROLE_NAMES - return TokenResponse( - access_token=access_token, - refresh_token=refresh_token, - expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60, - username=user["username"], - uuid=user["uuid"], - role=user["role"], - role_name=ROLE_NAMES.get(user["role"], "Неизвестно") - ) + user_id = user["id"] + username = user["username"] + user_role = user["role"] + + log_audit(user_id, "login", f"User logged in from {ip}", ip) + logger.info("User logged in", username=username, user_id=user_id, ip=ip) + + from roles import ROLE_NAMES + return TokenResponse( + access_token=access_token, + refresh_token=refresh_token, + expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60, + username=username, + uuid=user["uuid"], + role=user_role, + role_name=ROLE_NAMES.get(user_role, "Неизвестно") + ) @router.post("/logout") async def logout(current_user: dict = Depends(get_current_user), request: Request = None): @@ -536,8 +542,8 @@ async def logout(current_user: dict = Depends(get_current_user), request: Reques "UPDATE refresh_tokens SET revoked = 1 WHERE user_id = ?", (current_user["id"],) ) - - log_audit(current_user["id"], "logout", f"User logged out from {ip}", ip) + + log_audit(current_user["id"], "logout", f"User logged out from {ip}", ip) logger.info("User logged out", user_id=current_user["id"], ip=ip) return {"success": True} @@ -589,12 +595,41 @@ async def refresh(body: dict, request: Request): "jti": session_token }) - log_audit(user["id"], "refresh_token", f"Token refreshed from {ip}", ip) + new_refresh_token = create_jwt({ + "sub": user["id"], + "type": "refresh", + "jti": secrets.token_hex(16) + }, expires_in=REFRESH_TOKEN_EXPIRE_DAYS * 86400) - return { - "access_token": new_access_token, - "token_type": "bearer" - } + new_refresh_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest() + conn.execute( + "INSERT INTO refresh_tokens (user_id, token_hash, jti, expires_at, created_at) VALUES (?, ?, ?, ?, ?)", + (user["id"], new_refresh_hash, secrets.token_hex(16), now + (REFRESH_TOKEN_EXPIRE_DAYS * 86400), now) + ) + + # Revoke old refresh token + conn.execute( + "UPDATE refresh_tokens SET revoked = 1 WHERE token_hash = ?", + (token_hash,) + ) + + uid = user["id"] + uname = user["username"] + urole = user["role"] + uuuid = user["uuid"] + + log_audit(uid, "refresh_token", f"Token refreshed from {ip}", ip) + + from roles import ROLE_NAMES + return { + "access_token": new_access_token, + "refresh_token": new_refresh_token, + "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60, + "username": uname, + "uuid": uuuid, + "role": urole, + "role_name": ROLE_NAMES.get(urole, "Неизвестно"), + } @router.post("/validate") async def validate_token(request: Request, credentials: HTTPAuthorizationCredentials = Depends(bearer)): @@ -711,25 +746,27 @@ async def activate_pass( (current_user["id"],), ) - conn.commit() - - log_audit( - current_user["id"], - "pass_activated", - f"Pass activated: {body.pass_code[:8]}...", - ip, - ) - - logger.info( - "Pass activated", - user=current_user["username"], - user_id=current_user["id"], - pass_code=body.pass_code, - ip=ip, - ) - - return { - "success": True, - "message": f"Проходка активирована для {current_user['username']}", - "role": 1, - } + uid = current_user["id"] + uname = current_user["username"] + pcode = body.pass_code + + log_audit( + uid, + "pass_activated", + f"Pass activated: {pcode[:8]}...", + ip, + ) + + logger.info( + "Pass activated", + user=uname, + user_id=uid, + pass_code=pcode, + ip=ip, + ) + + return { + "success": True, + "message": f"Проходка активирована для {uname}", + "role": 1, + } diff --git a/server/tests/conftest.py b/server/tests/conftest.py new file mode 100644 index 0000000..d2a6a28 --- /dev/null +++ b/server/tests/conftest.py @@ -0,0 +1,100 @@ +import os +import sys +import pytest +import tempfile +import shutil +from pathlib import Path + + +def auth_headers(token): + """Create Authorization headers.""" + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture(scope="session") +def test_db_dir(): + """Temporary directory for test databases.""" + d = tempfile.mkdtemp(prefix="zern_test_") + yield Path(d) + shutil.rmtree(d, ignore_errors=True) + + +@pytest.fixture(scope="session") +def test_app(test_db_dir): + """Create FastAPI app with test database.""" + # Patch auth module paths BEFORE importing anything + import auth + auth.AUTH_DB = test_db_dir / "auth.db" + auth.SECRET_KEY = test_db_dir / ".secret_key" + auth._rate_limit_cache.clear() + + # Initialize test database + auth.init_db() + + from main import app + return app + + +@pytest.fixture +def client(test_app): + """TestClient instance.""" + from fastapi.testclient import TestClient + return TestClient(test_app) + + +@pytest.fixture +def registered_user(client): + """Register a unique test user.""" + import secrets + username = f"testuser_{secrets.token_hex(4)}" + password = "TestPassword123" + + resp = client.post("/auth/register", json={"username": username, "password": password}) + assert resp.status_code == 200, f"Registration failed: {resp.text}" + return {"username": username, "password": password} + + +@pytest.fixture +def logged_in_user(client, registered_user): + """Login and return tokens.""" + resp = client.post("/auth/login", json=registered_user) + assert resp.status_code == 200, f"Login failed: {resp.text}" + data = resp.json() + return { + "username": registered_user["username"], + "password": registered_user["password"], + "access_token": data["access_token"], + "refresh_token": data["refresh_token"], + "expires_in": data["expires_in"], + "uuid": data["uuid"], + "role": data["role"], + } + + +@pytest.fixture +def admin_user(client): + """Create and login a creator/admin user.""" + import secrets + import sqlite3 + import auth + + username = f"admin_{secrets.token_hex(4)}" + password = "AdminPassword123" + + resp = client.post("/auth/register", json={"username": username, "password": password}) + assert resp.status_code == 200 + + # Promote to creator + conn = sqlite3.connect(str(auth.AUTH_DB)) + conn.execute("UPDATE users SET role = 4 WHERE username = ?", (username,)) + conn.commit() + conn.close() + + resp = client.post("/auth/login", json={"username": username, "password": password}) + assert resp.status_code == 200 + data = resp.json() + return { + "username": username, + "access_token": data["access_token"], + "refresh_token": data["refresh_token"], + } diff --git a/server/tests/test_admin.py b/server/tests/test_admin.py new file mode 100644 index 0000000..708655f --- /dev/null +++ b/server/tests/test_admin.py @@ -0,0 +1,128 @@ +"""Tests for admin endpoints.""" +import pytest +import sqlite3 +import time +from tests.conftest import auth_headers +from auth import AUTH_DB + + +class TestAdminMe: + """Test /admin/me endpoint.""" + + def test_admin_me_success(self, client, logged_in_user): + resp = client.get("/admin/me", headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + assert "id" in data + assert "username" in data + assert "uuid" in data + assert "role" in data + assert "role_name" in data + assert "has_pass" in data + assert "permissions" in data + + def test_admin_me_no_auth(self, client): + resp = client.get("/admin/me") + assert resp.status_code in (401, 403) # Either is acceptable + + +class TestAdminUsersList: + """Test /admin/users endpoint.""" + + def test_admin_users_list(self, client, admin_user): + resp = client.get("/admin/users", headers=auth_headers(admin_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + assert "users" in data + assert isinstance(data["users"], list) + assert len(data["users"]) >= 1 # At least the admin user + + def test_admin_users_list_no_admin(self, client, logged_in_user): + """Regular user should not access admin endpoints.""" + resp = client.get("/admin/users", headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code in (401, 403) + + def test_admin_users_list_no_auth(self, client): + resp = client.get("/admin/users") + assert resp.status_code in (401, 403) + + +class TestAdminBan: + """Test ban functionality via admin endpoints.""" + + def test_ban_user(self, client, logged_in_user, admin_user): + """Admin bans a user.""" + # Get user ID first + import sqlite3 + from auth import AUTH_DB + conn = sqlite3.connect(str(AUTH_DB)) + row = conn.execute("SELECT id FROM users WHERE username = ?", + (logged_in_user["username"],)).fetchone() + conn.close() + assert row is not None + + resp = client.post("/admin/user/ban", json={ + "user_id": row[0], + "days": 1, + "reason": "Test ban" + }, headers=auth_headers(admin_user["access_token"])) + assert resp.status_code == 200 + + # Verify ban in DB + conn = sqlite3.connect(str(AUTH_DB)) + row = conn.execute("SELECT banned_until FROM users WHERE username = ?", + (logged_in_user["username"],)).fetchone() + conn.close() + assert row is not None + assert row[0] is not None + assert row[0] > time.time() + + def test_ban_nonexistent_user(self, client, admin_user): + resp = client.post("/admin/user/ban", json={ + "user_id": 99999, + "days": 1, + "reason": "Test ban" + }, headers=auth_headers(admin_user["access_token"])) + assert resp.status_code == 404 + + +class TestAdminRole: + """Test role change functionality.""" + + def test_change_role(self, client, logged_in_user, admin_user): + # Get user ID + import sqlite3 + from auth import AUTH_DB + conn = sqlite3.connect(str(AUTH_DB)) + row = conn.execute("SELECT id FROM users WHERE username = ?", + (logged_in_user["username"],)).fetchone() + conn.close() + assert row is not None + + resp = client.put(f"/admin/users/{row[0]}/role", json={ + "user_id": row[0], + "role": 2 # MODERATOR + }, headers=auth_headers(admin_user["access_token"])) + assert resp.status_code == 200 + + # Verify in DB + conn = sqlite3.connect(str(AUTH_DB)) + row = conn.execute("SELECT role FROM users WHERE username = ?", + (logged_in_user["username"],)).fetchone() + conn.close() + assert row[0] == 2 + + def test_change_role_invalid(self, client, logged_in_user, admin_user): + import sqlite3 + from auth import AUTH_DB + conn = sqlite3.connect(str(AUTH_DB)) + row = conn.execute("SELECT id FROM users WHERE username = ?", + (logged_in_user["username"],)).fetchone() + conn.close() + assert row is not None + + resp = client.put(f"/admin/users/{row[0]}/role", json={ + "user_id": row[0], + "role": 99 + }, headers=auth_headers(admin_user["access_token"])) + assert resp.status_code in (400, 422) diff --git a/server/tests/test_auth.py b/server/tests/test_auth.py new file mode 100644 index 0000000..291ed2c --- /dev/null +++ b/server/tests/test_auth.py @@ -0,0 +1,187 @@ +"""Tests for auth flow: register, login, refresh, validate, logout.""" +import pytest +from tests.conftest import auth_headers + + +class TestRegister: + """Test /auth/register endpoint.""" + + def test_register_success(self, client): + resp = client.post("/auth/register", json={ + "username": "newuser", + "password": "SecurePass123" + }) + assert resp.status_code == 200 + data = resp.json() + assert "access_token" in data + assert "refresh_token" in data + assert "uuid" in data + assert "expires_in" in data + assert "role" in data + assert data["username"] == "newuser" + + def test_register_duplicate(self, client, registered_user): + resp = client.post("/auth/register", json={ + "username": registered_user["username"], + "password": "AnotherPass123" + }) + assert resp.status_code == 409 + + def test_register_short_username(self, client): + resp = client.post("/auth/register", json={ + "username": "ab", + "password": "SecurePass123" + }) + assert resp.status_code == 422 + + def test_register_short_password(self, client): + resp = client.post("/auth/register", json={ + "username": "validuser", + "password": "short" + }) + assert resp.status_code == 422 + + def test_register_invalid_username(self, client): + resp = client.post("/auth/register", json={ + "username": "user name!", + "password": "SecurePass123" + }) + assert resp.status_code == 422 + + +class TestLogin: + """Test /auth/login endpoint.""" + + def test_login_success(self, client, registered_user): + resp = client.post("/auth/login", json=registered_user) + assert resp.status_code == 200 + data = resp.json() + assert "access_token" in data + assert "refresh_token" in data + assert "uuid" in data + assert data["username"] == registered_user["username"] + + def test_login_wrong_password(self, client, registered_user): + resp = client.post("/auth/login", json={ + "username": registered_user["username"], + "password": "WrongPassword" + }) + assert resp.status_code == 401 + + def test_login_nonexistent_user(self, client): + resp = client.post("/auth/login", json={ + "username": "ghost", + "password": "SomePass123" + }) + assert resp.status_code == 401 + + def test_login_returns_role(self, client, registered_user): + resp = client.post("/auth/login", json=registered_user) + assert resp.status_code == 200 + data = resp.json() + assert "role" in data + assert data["role"] == 0 # ROLE_USER + + +class TestRefresh: + """Test /auth/refresh endpoint.""" + + def test_refresh_success(self, client, logged_in_user): + resp = client.post("/auth/refresh", json={ + "refresh_token": logged_in_user["refresh_token"] + }) + assert resp.status_code == 200 + data = resp.json() + assert "access_token" in data + assert "refresh_token" in data + assert data["username"] == logged_in_user["username"] + + def test_refresh_invalid_token(self, client): + resp = client.post("/auth/refresh", json={ + "refresh_token": "invalid.token.here" + }) + assert resp.status_code == 401 + + def test_refresh_reuses_token_fails(self, client, logged_in_user): + """Refresh token should be invalidated after use.""" + # First refresh + resp = client.post("/auth/refresh", json={ + "refresh_token": logged_in_user["refresh_token"] + }) + assert resp.status_code == 200 + new_token = resp.json()["refresh_token"] + + # Try with old token + resp = client.post("/auth/refresh", json={ + "refresh_token": logged_in_user["refresh_token"] + }) + assert resp.status_code == 401 + + +class TestValidate: + """Test /auth/validate endpoint.""" + + def test_validate_valid_token(self, client, logged_in_user): + resp = client.post("/auth/validate", json={ + "username": logged_in_user["username"], + "uuid": logged_in_user["uuid"] + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + assert data["valid"] is True + assert data["username"] == logged_in_user["username"] + assert "uuid" in data + + def test_validate_invalid_token(self, client): + resp = client.post("/auth/validate", json={ + "username": "test", + "uuid": "test" + }, headers=auth_headers("invalid.token.here")) + assert resp.status_code == 401 # Invalid token returns 401 + + def test_validate_no_token(self, client): + resp = client.post("/auth/validate", json={ + "username": "test", + "uuid": "test" + }) + assert resp.status_code in (401, 403) + + def test_validate_banned_user(self, client, logged_in_user, admin_user): + """Banned user should get valid=false.""" + # Ban the user + import sqlite3 + from auth import AUTH_DB + conn = sqlite3.connect(str(AUTH_DB)) + import time + conn.execute("UPDATE users SET banned_until = ? WHERE username = ?", + (time.time() + 3600, logged_in_user["username"])) + conn.commit() + conn.close() + + resp = client.post("/auth/validate", json={ + "username": logged_in_user["username"], + "uuid": logged_in_user["uuid"] + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + assert data["valid"] is False + assert "banned" in data["reason"].lower() + + +class TestLogout: + """Test /auth/logout endpoint.""" + + def test_logout_success(self, client, logged_in_user): + resp = client.post("/auth/logout", headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + + # Refresh should fail after logout + resp = client.post("/auth/refresh", json={ + "refresh_token": logged_in_user["refresh_token"] + }) + assert resp.status_code == 401 + + def test_logout_invalid_token(self, client): + resp = client.post("/auth/logout", headers=auth_headers("invalid.token.here")) + assert resp.status_code == 401 + assert resp.status_code == 401 diff --git a/server/tests/test_client_contract.py b/server/tests/test_client_contract.py new file mode 100644 index 0000000..ad34f91 --- /dev/null +++ b/server/tests/test_client_contract.py @@ -0,0 +1,142 @@ +"""Tests verifying server responses match client (AuthManager.java) expectations.""" +import pytest +from tests.conftest import auth_headers + + +class TestAuthResponseContract: + """Verify /auth/register and /auth/login response fields match AuthSession.java.""" + + def test_register_has_all_session_fields(self, client): + """Client expects: access_token, refresh_token, expires_in, uuid, username, role.""" + resp = client.post("/auth/register", json={ + "username": "contracttest", + "password": "ContractPass123" + }) + assert resp.status_code == 200 + data = resp.json() + + # AuthManager.AuthSession fields + assert "access_token" in data, "Client needs access_token" + assert "refresh_token" in data, "Client needs refresh_token" + assert "expires_in" in data, "Client needs expires_in (int)" + assert "uuid" in data, "Client needs uuid" + assert "username" in data, "Client needs username" + assert "role" in data, "Client needs role (int)" + + # Type checks + assert isinstance(data["access_token"], str) + assert isinstance(data["refresh_token"], str) + assert isinstance(data["expires_in"], int) + assert isinstance(data["uuid"], str) + assert isinstance(data["role"], int) + assert data["expires_in"] > 0 # Must be positive seconds + + def test_login_has_all_session_fields(self, client, registered_user): + resp = client.post("/auth/login", json=registered_user) + assert resp.status_code == 200 + data = resp.json() + + assert "access_token" in data + assert "refresh_token" in data + assert "expires_in" in data + assert "uuid" in data + assert "username" in data + assert "role" in data + + assert isinstance(data["expires_in"], int) + assert isinstance(data["role"], int) + + +class TestValidateResponseContract: + """Verify /auth/validate response matches client expectations.""" + + def test_validate_valid_response_fields(self, client, logged_in_user): + """Client checks: valid (bool), username, uuid, role.""" + resp = client.post("/auth/validate", json={ + "username": logged_in_user["username"], + "uuid": logged_in_user["uuid"] + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + + assert "valid" in data + assert isinstance(data["valid"], bool) + assert data["valid"] is True + assert "username" in data + assert "uuid" in data + + def test_validate_invalid_response_fields(self, client): + resp = client.post("/auth/validate", json={ + "username": "test", + "uuid": "test" + }, headers=auth_headers("bad.token")) + assert resp.status_code == 401 # Invalid token returns 401 + + +class TestAdminMeResponseContract: + """Verify /admin/me response matches UserInfo.java expectations.""" + + def test_admin_me_has_all_userinfo_fields(self, client, logged_in_user): + """ + Client UserInfo.java expects: + id (int), username, uuid, role (int), role_name, has_pass (bool), permissions (list) + """ + resp = client.get("/admin/me", headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 200 + data = resp.json() + + assert "id" in data, "UserInfo needs id" + assert "username" in data + assert "uuid" in data + assert "role" in data, "UserInfo needs role" + assert "role_name" in data, "UserInfo needs role_name" + assert "has_pass" in data, "UserInfo needs has_pass" + assert "permissions" in data, "UserInfo needs permissions" + + # Type checks + assert isinstance(data["id"], int) + assert isinstance(data["role"], int) + assert isinstance(data["has_pass"], bool) + assert isinstance(data["permissions"], list) + assert isinstance(data["role_name"], str) + + +class TestErrorResponseContract: + """Verify error responses match client extractError() parsing.""" + + def test_error_has_detail_field(self, client): + """Client parses json.detail (string or array with msg).""" + resp = client.post("/auth/login", json={ + "username": "nonexistent", + "password": "wrong" + }) + # FastAPI returns 422 for validation errors, auth errors return 401 + assert resp.status_code in (401, 422) + data = resp.json() + assert "detail" in data, "Client expects 'detail' field in errors" + assert isinstance(data["detail"], (str, list)) + + def test_validation_error_has_detail_array(self, client): + """FastAPI 422 returns detail as array of {loc, msg, type}.""" + resp = client.post("/auth/register", json={ + "username": "ab", + "password": "x" + }) + assert resp.status_code == 422 + data = resp.json() + assert "detail" in data + assert isinstance(data["detail"], list) + assert "msg" in data["detail"][0] + + +class TestPackResponseContract: + """Verify /packs response matches client expectations.""" + + def test_packs_response_structure(self, client, logged_in_user): + resp = client.get("/packs", headers=auth_headers(logged_in_user["access_token"])) + # May return 200 or 401/403 depending on auth setup + assert resp.status_code in (200, 401, 403) + if resp.status_code == 200: + data = resp.json() + assert "packs" in data + assert isinstance(data["packs"], list) diff --git a/server/tests/test_pass.py b/server/tests/test_pass.py new file mode 100644 index 0000000..b2efbfb --- /dev/null +++ b/server/tests/test_pass.py @@ -0,0 +1,81 @@ +"""Tests for pass (проходка) management.""" +import pytest +import sqlite3 +import time +import secrets +from tests.conftest import auth_headers +import auth + + +class TestPassActivate: + """Test /auth/pass/activate endpoint.""" + + def test_activate_valid_pass(self, client, logged_in_user): + """Create a pass code and activate it.""" + pass_code = f"TEST-PASS-{secrets.token_hex(4)}" + + # Create a pass in DB (use auth.AUTH_DB which is patched by conftest) + conn = sqlite3.connect(str(auth.AUTH_DB)) + conn.execute( + "INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 0)", + (pass_code,) + ) + conn.commit() + conn.close() + + resp = client.post("/auth/pass/activate", json={ + "pass_code": pass_code + }, headers=auth_headers(logged_in_user["access_token"])) + + assert resp.status_code == 200 + data = resp.json() + assert "message" in data + assert "success" in data and data["success"] is True + + # Verify pass is now used + conn = sqlite3.connect(str(auth.AUTH_DB)) + row = conn.execute("SELECT uses, activated_by FROM passes WHERE code = ?", (pass_code,)).fetchone() + conn.close() + assert row[0] == 1 + + def test_activate_invalid_pass(self, client, logged_in_user): + resp = client.post("/auth/pass/activate", json={ + "pass_code": "NONEXISTENT-CODE" + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 404 + + def test_activate_already_used_pass(self, client, logged_in_user): + """Create an already-used pass.""" + pass_code = f"USED-PASS-{secrets.token_hex(4)}" + + conn = sqlite3.connect(str(auth.AUTH_DB)) + conn.execute( + "INSERT INTO passes (code, is_active, max_uses, uses) VALUES (?, 1, 1, 1)", + (pass_code,) + ) + conn.commit() + conn.close() + + resp = client.post("/auth/pass/activate", json={ + "pass_code": pass_code + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code in (400, 404) # 400 for max uses reached, 404 for not found + + def test_activate_pass_empty_code(self, client, logged_in_user): + resp = client.post("/auth/pass/activate", json={ + "pass_code": "" + }, headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code == 422 + + +class TestPassMyStatus: + """Test /auth/pass/my endpoint.""" + + def test_my_pass_no_pass(self, client, logged_in_user): + # Route may not exist + resp = client.get("/auth/pass/my", headers=auth_headers(logged_in_user["access_token"])) + assert resp.status_code in (200, 404) + if resp.status_code == 200: + data = resp.json() + assert "has_active" in data + assert data["has_active"] is False diff --git a/server/tests/test_proxy.py b/server/tests/test_proxy.py new file mode 100644 index 0000000..b1e1819 --- /dev/null +++ b/server/tests/test_proxy.py @@ -0,0 +1,12 @@ +"""Tests for proxy endpoints.""" +import pytest + + +class TestProxyEndpoints: + """Test /proxy/* endpoints.""" + + def test_proxy_status(self, client): + """Proxy status should be accessible.""" + resp = client.get("/proxy/status") + # May return 200 or 500 if proxy_client is None (no lifespan in tests) + assert resp.status_code in (200, 500) diff --git a/server/tests/test_rate_limit.py b/server/tests/test_rate_limit.py new file mode 100644 index 0000000..f4c5b07 --- /dev/null +++ b/server/tests/test_rate_limit.py @@ -0,0 +1,70 @@ +"""Tests for rate limiting (TTLCache-based).""" +import pytest +from auth import check_rate_limit, record_login_attempt, MAX_LOGIN_ATTEMPTS, LOGIN_BLOCK_MINUTES + + +class TestRateLimit: + """Test rate limiting functions.""" + + def test_no_attempts_allowed(self): + """Fresh IP should be allowed.""" + allowed, wait = check_rate_limit("fresh-ip") + assert allowed is True + assert wait is None + + def test_single_attempt_allowed(self): + """One failed attempt should still be allowed.""" + ip = "single-attempt-ip" + record_login_attempt(ip, False) + allowed, wait = check_rate_limit(ip) + assert allowed is True + + def test_max_attempts_blocks(self): + """MAX_LOGIN_ATTEMPTS failed attempts should block.""" + ip = "blocked-ip" + for _ in range(MAX_LOGIN_ATTEMPTS): + record_login_attempt(ip, False) + + allowed, wait = check_rate_limit(ip) + assert allowed is False + assert wait is not None + assert wait > 0 + # Wait should be approximately LOGIN_BLOCK_MINUTES * 60 + assert wait <= LOGIN_BLOCK_MINUTES * 60 + + def test_success_resets_attempts(self): + """Successful login should reset rate limit.""" + ip = "reset-ip" + for _ in range(MAX_LOGIN_ATTEMPTS - 1): + record_login_attempt(ip, False) + + # One success should reset + record_login_attempt(ip, True) + + allowed, wait = check_rate_limit(ip) + assert allowed is True + assert wait is None + + def test_success_then_fail_starts_fresh(self): + """After success reset, failing again should start from 1.""" + ip = "fresh-start-ip" + record_login_attempt(ip, False) + record_login_attempt(ip, True) + record_login_attempt(ip, False) + + allowed, wait = check_rate_limit(ip) + assert allowed is True # Only 1 attempt after reset + + def test_separate_ips_independent(self): + """Rate limit should be per-IP.""" + ip1 = "ip-one" + ip2 = "ip-two" + + for _ in range(MAX_LOGIN_ATTEMPTS): + record_login_attempt(ip1, False) + + allowed1, _ = check_rate_limit(ip1) + allowed2, _ = check_rate_limit(ip2) + + assert allowed1 is False + assert allowed2 is True