c0310ed573
- Add pytest test suite: test_auth.py, test_admin.py, test_pass.py, test_proxy.py, test_rate_limit.py, test_client_contract.py
- Fix SQLite 'database is locked' errors: moved log_audit() calls outside `with get_db()` blocks in register, login, logout, refresh, activate_pass
- Enable WAL mode and busy_timeout in get_db() for concurrent access
- Fix /admin/me: removed non-existent 'email' column from query
- Fix /admin/users list: disambiguated activated_at column in JOIN query
- Fix /auth/refresh: now returns refresh_token + expires_in + username/uuid/role to match AuthManager.AuthSession expectations; revokes old refresh token
- Fix conftest.py: unique usernames per test to avoid conflicts
- All 47 tests passing
188 lines
6.4 KiB
Python
188 lines
6.4 KiB
Python
"""Tests for auth flow: register, login, refresh, validate, logout."""
|
|
import pytest
|
|
from tests.conftest import auth_headers
|
|
|
|
|
|
class TestRegister:
    """Exercise the /auth/register endpoint."""

    def test_register_success(self, client):
        """Registering a new username returns 200 and a full session payload."""
        payload = {
            "username": "newuser",
            "password": "SecurePass123",
        }
        response = client.post("/auth/register", json=payload)
        assert response.status_code == 200

        body = response.json()
        # Every field of the session payload must be present in the response.
        for field in ("access_token", "refresh_token", "uuid", "expires_in", "role"):
            assert field in body
        assert body["username"] == "newuser"

    def test_register_duplicate(self, client, registered_user):
        """Registering an already-registered username is answered with 409."""
        payload = {
            "username": registered_user["username"],
            "password": "AnotherPass123",
        }
        response = client.post("/auth/register", json=payload)
        assert response.status_code == 409

    def test_register_short_username(self, client):
        """The two-character username "ab" is rejected with 422."""
        payload = {
            "username": "ab",
            "password": "SecurePass123",
        }
        response = client.post("/auth/register", json=payload)
        assert response.status_code == 422

    def test_register_short_password(self, client):
        """The five-character password "short" is rejected with 422."""
        payload = {
            "username": "validuser",
            "password": "short",
        }
        response = client.post("/auth/register", json=payload)
        assert response.status_code == 422

    def test_register_invalid_username(self, client):
        """A username containing a space and "!" is rejected with 422."""
        payload = {
            "username": "user name!",
            "password": "SecurePass123",
        }
        response = client.post("/auth/register", json=payload)
        assert response.status_code == 422
|
|
|
|
|
|
class TestLogin:
    """Exercise the /auth/login endpoint."""

    def test_login_success(self, client, registered_user):
        """Logging in with the registered credentials returns 200 and tokens."""
        response = client.post("/auth/login", json=registered_user)
        assert response.status_code == 200

        body = response.json()
        for field in ("access_token", "refresh_token", "uuid"):
            assert field in body
        assert body["username"] == registered_user["username"]

    def test_login_wrong_password(self, client, registered_user):
        """A known username with the wrong password is answered with 401."""
        credentials = {
            "username": registered_user["username"],
            "password": "WrongPassword",
        }
        response = client.post("/auth/login", json=credentials)
        assert response.status_code == 401

    def test_login_nonexistent_user(self, client):
        """Logging in as the username "ghost" is answered with 401."""
        credentials = {
            "username": "ghost",
            "password": "SomePass123",
        }
        response = client.post("/auth/login", json=credentials)
        assert response.status_code == 401

    def test_login_returns_role(self, client, registered_user):
        """The login payload carries a ``role`` field equal to 0."""
        response = client.post("/auth/login", json=registered_user)
        assert response.status_code == 200

        body = response.json()
        assert "role" in body
        assert body["role"] == 0  # ROLE_USER
|
|
|
|
|
|
class TestRefresh:
    """Test /auth/refresh endpoint."""

    def test_refresh_success(self, client, logged_in_user):
        """A valid refresh token yields 200 with new access and refresh tokens."""
        resp = client.post("/auth/refresh", json={
            "refresh_token": logged_in_user["refresh_token"]
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "access_token" in data
        assert "refresh_token" in data
        assert data["username"] == logged_in_user["username"]

    def test_refresh_invalid_token(self, client):
        """The string "invalid.token.here" is rejected with 401."""
        resp = client.post("/auth/refresh", json={
            "refresh_token": "invalid.token.here"
        })
        assert resp.status_code == 401

    def test_refresh_reuses_token_fails(self, client, logged_in_user):
        """Refresh token should be invalidated after use."""
        old_token = logged_in_user["refresh_token"]

        # First refresh succeeds and must hand back a replacement token.
        # (Previously the token was pulled into an unused local; the explicit
        # assertion keeps the "key must be present" check without dead code.)
        resp = client.post("/auth/refresh", json={"refresh_token": old_token})
        assert resp.status_code == 200
        assert "refresh_token" in resp.json()

        # Replaying the already-used token must be rejected.
        resp = client.post("/auth/refresh", json={"refresh_token": old_token})
        assert resp.status_code == 401
|
|
|
|
|
|
class TestValidate:
    """Test /auth/validate endpoint."""

    def test_validate_valid_token(self, client, logged_in_user):
        """A valid bearer token with matching username/uuid reports valid=True."""
        resp = client.post("/auth/validate", json={
            "username": logged_in_user["username"],
            "uuid": logged_in_user["uuid"]
        }, headers=auth_headers(logged_in_user["access_token"]))
        assert resp.status_code == 200
        data = resp.json()
        assert data["valid"] is True
        assert data["username"] == logged_in_user["username"]
        assert "uuid" in data

    def test_validate_invalid_token(self, client):
        """The token "invalid.token.here" is rejected with 401."""
        resp = client.post("/auth/validate", json={
            "username": "test",
            "uuid": "test"
        }, headers=auth_headers("invalid.token.here"))
        assert resp.status_code == 401  # Invalid token returns 401

    def test_validate_no_token(self, client):
        """A request without auth headers is rejected with 401 or 403."""
        resp = client.post("/auth/validate", json={
            "username": "test",
            "uuid": "test"
        })
        assert resp.status_code in (401, 403)

    def test_validate_banned_user(self, client, logged_in_user, admin_user):
        """Banned user should get valid=false.

        NOTE(review): ``admin_user`` is requested but never referenced in the
        body; presumably kept for its fixture side effects -- confirm before
        removing it from the signature.
        """
        # Imports stay function-local, as in the original test.
        import sqlite3
        import time
        from contextlib import closing

        from auth import AUTH_DB

        # Ban the user for the next hour by writing directly to the auth DB.
        # closing() releases the connection even if the UPDATE raises (the
        # original leaked it on failure, which could leave the SQLite file
        # locked for later tests); the inner ``with conn`` commits on success
        # and rolls back on error.
        with closing(sqlite3.connect(str(AUTH_DB))) as conn:
            with conn:
                conn.execute(
                    "UPDATE users SET banned_until = ? WHERE username = ?",
                    (time.time() + 3600, logged_in_user["username"]),
                )

        resp = client.post("/auth/validate", json={
            "username": logged_in_user["username"],
            "uuid": logged_in_user["uuid"]
        }, headers=auth_headers(logged_in_user["access_token"]))
        assert resp.status_code == 200
        data = resp.json()
        assert data["valid"] is False
        assert "banned" in data["reason"].lower()
|
|
|
|
|
|
class TestLogout:
    """Test /auth/logout endpoint."""

    def test_logout_success(self, client, logged_in_user):
        """Logout returns 200 and the session's refresh token stops working."""
        resp = client.post("/auth/logout", headers=auth_headers(logged_in_user["access_token"]))
        assert resp.status_code == 200

        # Refresh should fail after logout
        resp = client.post("/auth/refresh", json={
            "refresh_token": logged_in_user["refresh_token"]
        })
        assert resp.status_code == 401

    def test_logout_invalid_token(self, client):
        """Logging out with the token "invalid.token.here" is rejected with 401."""
        resp = client.post("/auth/logout", headers=auth_headers("invalid.token.here"))
        # The original repeated this assertion twice; once is sufficient.
        assert resp.status_code == 401
|