"""Tests for client-facing endpoints — verifying server responses match what the Java launcher expects.

This tests the full client-server contract:
- AuthManager.java: login, register, refresh, logout, /admin/me for UserInfo
- PackDownloader.java: /packs, /pack/{name}, /pack/{name}/diff, /pack/{name}/file/{path}
- ZHttpClient.java: /launcher/version, /proxy/*
- ServerPack.java: pack list fields
- PackManifest inner class: manifest fields
- DiffResponse inner class: diff fields
- FileInfo inner class: file info fields
"""

import asyncio
import hashlib
import json
import secrets
import shutil
import sqlite3
import time
from pathlib import Path

import pytest

# NOTE(review): the relative order of the three project imports below is kept
# exactly as it was — presumably conftest may configure paths/environment
# before `auth` and `pack_manager` are imported; confirm before re-sorting.
from tests.conftest import auth_headers
import auth
from pack_manager import scan_pack, PACKS_DIR


def scan_pack_sync(pack_name):
    """Run the ``scan_pack`` coroutine to completion and return its result.

    A private event loop is created and closed for every call, so the
    thread's current-loop setting is never touched.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(scan_pack(pack_name))
    finally:
        loop.close()


@pytest.fixture
def pack_fixture(tmp_path, logged_in_user):
    """Create a uniquely named test pack with one mod file and scan it.

    Yields a dict with the keys ``name``, ``dir``, ``mod_content``,
    ``mod_path``, ``mod_hash`` (SHA-256 hex digest of the mod bytes) and
    ``meta`` (whatever ``scan_pack`` returned).

    The pack directory and ``data/<name>.meta`` are removed afterwards, also
    when the setup itself fails part-way through.
    """
    pack_name = f"testpack_{secrets.token_hex(4)}"
    pack_dir = PACKS_DIR / pack_name
    meta_path = Path("data") / f"{pack_name}.meta"

    # Setup lives inside the try block so that a failing write or scan does
    # not leave a half-built pack directory behind.
    try:
        pack_dir.mkdir(parents=True, exist_ok=True)
        mod_dir = pack_dir / "mods"
        mod_dir.mkdir()

        mod_content = b"fake mod content for testing"
        mod_file = mod_dir / "test-mod.jar"
        mod_file.write_bytes(mod_content)

        # Scan to generate .meta
        meta = scan_pack_sync(pack_name)

        yield {
            "name": pack_name,
            "dir": pack_dir,
            "mod_content": mod_content,
            "mod_path": "mods/test-mod.jar",
            "mod_hash": hashlib.sha256(mod_content).hexdigest(),
            "meta": meta,
        }
    finally:
        # Cleanup
        shutil.rmtree(pack_dir, ignore_errors=True)
        if meta_path.exists():
            meta_path.unlink()


class TestAuthFlowClient:
    """Test auth flow exactly as Java AuthManager.java does it."""

    def test_full_auth_lifecycle(self, client):
        """Register → Login → Refresh → Logout, matching Java client behavior."""
        username = f"lifecycle_{secrets.token_hex(4)}"
        password = "LifeCyclePass123"
        credentials = {"username": username, "password": password}

        # 1. Register (AuthManager.authRequest)
        resp = client.post("/auth/register", json=credentials)
        assert resp.status_code == 200
        reg = resp.json()
        assert reg["access_token"]
        assert reg["refresh_token"]
        assert isinstance(reg["expires_in"], int)
        assert reg["uuid"]
        assert reg["username"] == username
        assert isinstance(reg["role"], int)

        # 2. Login (AuthManager.authRequest)
        resp = client.post("/auth/login", json=credentials)
        assert resp.status_code == 200
        login = resp.json()
        assert login["access_token"]
        assert login["refresh_token"]
        assert isinstance(login["expires_in"], int)
        assert login["uuid"]
        assert login["username"] == username
        assert isinstance(login["role"], int)

        refresh_token = login["refresh_token"]

        # 3. Refresh (AuthManager.tryRefresh)
        resp = client.post("/auth/refresh", json={"refresh_token": refresh_token})
        assert resp.status_code == 200
        refresh = resp.json()
        assert refresh["access_token"]
        assert refresh["refresh_token"]
        assert isinstance(refresh["expires_in"], int)
        assert refresh["username"] == username
        assert refresh["uuid"]
        assert isinstance(refresh["role"], int)

        # 4. Validate token (used by Minecraft server auth)
        resp = client.post(
            "/auth/validate",
            json={"username": username, "uuid": refresh["uuid"]},
            headers=auth_headers(refresh["access_token"]),
        )
        assert resp.status_code == 200
        validate = resp.json()
        assert validate["valid"] is True
        assert validate["username"] == username

        # 5. /admin/me (AuthManager.fetchUserInfo)
        resp = client.get("/admin/me", headers=auth_headers(refresh["access_token"]))
        assert resp.status_code == 200
        me = resp.json()
        assert isinstance(me["id"], int)
        assert me["username"] == username
        assert me["uuid"]
        assert isinstance(me["role"], int)
        assert isinstance(me["role_name"], str)
        assert isinstance(me["has_pass"], bool)
        assert isinstance(me["permissions"], list)

        # 6. Logout
        resp = client.post("/auth/logout", headers=auth_headers(refresh["access_token"]))
        assert resp.status_code == 200

        # 7. Refresh should fail after logout
        resp = client.post("/auth/refresh", json={"refresh_token": refresh["refresh_token"]})
        assert resp.status_code == 401


class TestPacksClientContract:
    """Test /packs response matches Java ServerPack.java parsing."""

    def test_packs_empty_list(self, client, logged_in_user_with_pass):
        """Client parses {"packs": [...]} — empty list should work."""
        resp = client.get("/packs", headers=auth_headers(logged_in_user_with_pass["access_token"]))
        assert resp.status_code == 200
        data = resp.json()
        assert "packs" in data
        assert isinstance(data["packs"], list)

    def test_packs_with_pack(self, client, logged_in_user_with_pass, pack_fixture):
        """Full pack with all fields that ServerPack.java expects."""
        resp = client.get("/packs", headers=auth_headers(logged_in_user_with_pass["access_token"]))
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["packs"]) >= 1

        # Find our pack
        pack = next((p for p in data["packs"] if p["name"] == pack_fixture["name"]), None)
        assert pack is not None

        # ServerPack.java fields
        assert "name" in pack
        assert "version" in pack
        assert isinstance(pack["version"], int)
        assert "minecraft_version" in pack
        assert isinstance(pack["minecraft_version"], str)
        assert "loader_type" in pack
        assert isinstance(pack["loader_type"], str)
        assert "loader_version" in pack
        assert pack["loader_version"] is None or isinstance(pack["loader_version"], str)
        assert "files_count" in pack
        assert isinstance(pack["files_count"], int)
        assert "updated_at" in pack


class TestPackManifestClientContract:
    """Test /pack/{name} response matches Java PackDownloader.PackManifest."""

    def test_pack_manifest_not_found(self, client, logged_in_user):
        """An unknown pack name yields 404."""
        resp = client.get("/pack/nonexistent", headers=auth_headers(logged_in_user["access_token"]))
        assert resp.status_code == 404

    def test_pack_manifest_fields(self, client, logged_in_user_with_pass, pack_fixture):
        """All fields that PackManifest.java expects."""
        pack_name = pack_fixture["name"]
        resp = client.get(
            f"/pack/{pack_name}",
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        data = resp.json()

        # PackManifest.java fields
        assert "pack_name" in data
        assert "version" in data
        assert isinstance(data["version"], int)
        assert "minecraft_version" in data
        assert isinstance(data["minecraft_version"], str)
        assert "loader_type" in data

        # loader_version may be absent or null; when it carries a value it
        # must be a string, the same rule the /packs test applies to it.
        loader_version = data.get("loader_version")
        assert loader_version is None or isinstance(loader_version, str)

        # NOTE(review): this assertion can never fail — a missing key makes
        # .get() return None. The expected type of asset_index is not visible
        # from this file; tighten once the contract is confirmed.
        assert "asset_index" in data or data.get("asset_index") is None

        assert "files" in data
        assert isinstance(data["files"], dict)

        # Files in manifest have path, hash, size, added_at, modified_at
        # URL is only added in the diff response
        for entry in data["files"].values():
            assert "hash" in entry
            assert isinstance(entry["hash"], str)
            assert "size" in entry
            assert isinstance(entry["size"], int)

    def test_pack_manifest_no_auth_is_public(self, client, pack_fixture):
        """/pack/{name} is public — doesn't require auth."""
        resp = client.get(f"/pack/{pack_fixture['name']}")
        assert resp.status_code == 200


class TestPackDiffClientContract:
    """Test /pack/{name}/diff response matches Java PackDownloader.DiffResponse."""

    def test_diff_all_files_new(self, client, logged_in_user_with_pass, pack_fixture):
        """Client sends empty file list — all files should be in to_download."""
        pack_name = pack_fixture["name"]
        resp = client.post(
            f"/pack/{pack_name}/diff",
            json={},
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        data = resp.json()

        # DiffResponse.java fields
        assert "version" in data
        assert isinstance(data["version"], int)
        assert "to_download" in data
        assert isinstance(data["to_download"], list)
        assert "to_delete" in data
        assert isinstance(data["to_delete"], list)
        assert "to_update" in data
        assert isinstance(data["to_update"], list)

        # All files should be new
        assert len(data["to_download"]) >= 1
        for file_info in data["to_download"]:
            # FileInfo.java fields
            assert "path" in file_info
            assert "url" in file_info
            assert "size" in file_info
            assert isinstance(file_info["size"], int)
            assert "hash" in file_info
            assert isinstance(file_info["hash"], str)

    def test_diff_no_changes(self, client, logged_in_user_with_pass, pack_fixture):
        """Client sends correct hashes — no downloads needed."""
        pack_name = pack_fixture["name"]
        local_files = {pack_fixture["mod_path"]: pack_fixture["mod_hash"]}
        resp = client.post(
            f"/pack/{pack_name}/diff",
            json=local_files,
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        data = resp.json()
        assert not data["to_download"]
        assert not data["to_update"]
        assert not data["to_delete"]

    def test_diff_with_outdated_file(self, client, logged_in_user_with_pass, pack_fixture):
        """Client sends wrong hash — file should be in to_download + to_update."""
        pack_name = pack_fixture["name"]
        local_files = {pack_fixture["mod_path"]: "old_wrong_hash"}
        resp = client.post(
            f"/pack/{pack_name}/diff",
            json=local_files,
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["to_download"]) == 1
        assert len(data["to_update"]) == 1
        assert data["to_update"][0] == pack_fixture["mod_path"]

    def test_diff_extra_local_file(self, client, logged_in_user_with_pass, pack_fixture):
        """Client has extra file — should be in to_delete."""
        pack_name = pack_fixture["name"]
        local_files = {"mods/removed-mod.jar": "some_hash"}
        resp = client.post(
            f"/pack/{pack_name}/diff",
            json=local_files,
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        data = resp.json()
        assert "mods/removed-mod.jar" in data["to_delete"]


class TestPackFileDownload:
    """Test /pack/{name}/file/{path} — file serving."""

    def test_pack_file_download(self, client, logged_in_user_with_pass, pack_fixture):
        """Download a file from a pack."""
        pack_name = pack_fixture["name"]
        resp = client.get(
            f"/pack/{pack_name}/file/{pack_fixture['mod_path']}",
            headers=auth_headers(logged_in_user_with_pass["access_token"]),
        )
        assert resp.status_code == 200
        assert resp.content == pack_fixture["mod_content"]

    def test_pack_file_not_found(self, client, logged_in_user):
        """A file inside an unknown pack yields 404."""
        resp = client.get(
            "/pack/nonexistent/file/mods/mod.jar",
            headers=auth_headers(logged_in_user["access_token"]),
        )
        assert resp.status_code == 404

    def test_pack_file_path_traversal_blocked(self, client, logged_in_user):
        """Path traversal should be blocked."""
        # NOTE(review): the test client may collapse the ".." segments before
        # the request is sent, in which case the server-side traversal check
        # is not what produces the 403/404 — confirm.
        resp = client.get(
            "/pack/somepack/file/../../../etc/passwd",
            headers=auth_headers(logged_in_user["access_token"]),
        )
        assert resp.status_code in (403, 404)


class TestPackPermissions:
    """Test that packs require proper permissions (pass/role)."""

    def test_packs_no_auth(self, client):
        """/packs without credentials is rejected with 401 or 403."""
        resp = client.get("/packs")
        assert resp.status_code in (401, 403)

    def test_pack_diff_no_auth(self, client):
        """/pack/{name}/diff without credentials is rejected with 401 or 403."""
        resp = client.post("/pack/test/diff", json={})
        assert resp.status_code in (401, 403)

    def test_packs_user_without_pass(self, client, logged_in_user):
        """User without pass should get 403 on /packs."""
        resp = client.get("/packs", headers=auth_headers(logged_in_user["access_token"]))
        assert resp.status_code == 403

    def test_pack_diff_user_without_pass(self, client, logged_in_user):
        """User without pass should get 403 on /pack/{name}/diff."""
        resp = client.post(
            "/pack/test/diff",
            json={},
            headers=auth_headers(logged_in_user["access_token"]),
        )
        assert resp.status_code == 403


class TestLauncherVersion:
    """Test /launcher/version endpoint."""

    def test_launcher_version(self, client):
        """Should return version info."""
        resp = client.get("/launcher/version")
        assert resp.status_code == 200
        data = resp.json()
        assert "version" in data or "latest" in data


class TestProxyEndpoints:
    """Test /proxy/* endpoints that ZHttpClient uses."""

    def test_proxy_status(self, client):
        """Proxy status works without proxy_client."""
        resp = client.get("/proxy/status")
        # May be 200 or 500 if proxy_client is None
        assert resp.status_code in (200, 500)

    def test_proxy_fabric_versions(self, client):
        """ZHttpClient uses this for Fabric loader versions."""
        resp = client.get("/proxy/fabric/versions/loader")
        # Works if proxy_client is set up, fails otherwise
        assert resp.status_code in (200, 500, 502, 504)