214 lines
7.9 KiB
Python
214 lines
7.9 KiB
Python
import csv
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
|
|
def _resolve_base_url() -> str:
    """Return the license server base URL derived from environment variables.

    Resolution order:

    1. ``LICENSE_SERVER_BASE_URL`` is used as given, minus trailing slashes.
    2. ``LICENSE_SERVER_IP`` is combined with ``LICENSE_SERVER_SCHEME``
       (default ``http``) and ``LICENSE_SERVER_PORT`` (default ``8000``).
    3. ``http://127.0.0.1:8000`` is returned when neither variable is set.

    A bare IPv6 literal in ``LICENSE_SERVER_IP`` (for example ``::1``) is
    wrapped in square brackets so that the host/port separator in the
    resulting URL stays unambiguous.
    """
    explicit = os.getenv("LICENSE_SERVER_BASE_URL")
    if explicit:
        return explicit.rstrip("/")

    host = os.getenv("LICENSE_SERVER_IP")
    if host:
        scheme = os.getenv("LICENSE_SERVER_SCHEME", "http")
        port = os.getenv("LICENSE_SERVER_PORT", "8000")
        # Two or more colons can only be an IPv6 literal; URLs require it to
        # be bracketed, otherwise "host:port" cannot be parsed.
        if host.count(":") >= 2 and not host.startswith("["):
            host = f"[{host}]"
        return f"{scheme}://{host}:{port}".rstrip("/")

    return "http://127.0.0.1:8000"
|
|
|
|
|
|
def _resolve_timeout() -> float:
    """Return the HTTP timeout in seconds taken from ``LICENSE_SERVER_TIMEOUT``.

    Falls back to 5.0 seconds when the variable is unset, cannot be parsed
    as a float, or parses to a value that is unusable as a timeout (zero,
    negative, NaN or infinite).
    """
    default_timeout = 5.0
    raw_timeout = os.getenv("LICENSE_SERVER_TIMEOUT", "5")
    try:
        timeout = float(raw_timeout)
    except ValueError:
        return default_timeout
    # NaN fails every comparison, so this single range check rejects NaN,
    # infinity, zero and negative values alike.
    if not 0 < timeout < float("inf"):
        return default_timeout
    return timeout
|
|
|
|
|
|
class LicenseServerClient:
    """Thin wrapper over a ``requests`` session for calling the license server API."""

    def __init__(self, base_url: str, api_key: str, timeout: float) -> None:
        """Store the connection settings and open one shared HTTP session."""
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def request(self, method: str, path: str, *, auth: bool = True, **kwargs) -> requests.Response:
        """Send ``method`` to ``base_url + path`` and return the response.

        When ``auth`` is true and an API key is configured, a bearer
        ``Authorization`` header is added; headers supplied by the caller
        take precedence over it. The client's timeout is applied unless the
        caller passes its own ``timeout``.
        """
        target = f"{self.base_url}{path}"
        caller_headers = kwargs.pop("headers", None) or {}
        if auth and self.api_key:
            merged_headers = {"Authorization": f"Bearer {self.api_key}"}
            # Caller-supplied values win over the default bearer token.
            merged_headers.update(caller_headers)
        else:
            merged_headers = caller_headers
        kwargs.setdefault("timeout", self.timeout)
        return self.session.request(method=method, url=target, headers=merged_headers, **kwargs)

    def server_info(self) -> requests.Response:
        """GET ``/`` without authentication."""
        return self.request("get", "/", auth=False)

    def create_license(
        self,
        *,
        is_active: bool = True,
        expiration_iso: Optional[str] = None,
        info: Optional[str] = None,
        auth: bool = True,
    ) -> requests.Response:
        """POST ``/license``, sending only the options that differ from their defaults."""
        candidates = {
            "is_active": "false" if is_active is False else None,
            "expiration_date": expiration_iso,
            "info": info,
        }
        query = {name: value for name, value in candidates.items() if value is not None}
        # An empty query is sent as "no params at all".
        return self.request("post", "/license", params=query if query else None, auth=auth)

    def is_valid(self, license_key: str) -> requests.Response:
        """GET ``/is_valid`` for ``license_key`` without authentication."""
        query = {"license_key": license_key}
        return self.request("get", "/is_valid", params=query, auth=False)

    def public_key(self) -> requests.Response:
        """GET ``/public-key`` without authentication."""
        return self.request("get", "/public-key", auth=False)

    def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
        """POST ``/license/<license_key>/disable``."""
        endpoint = f"/license/{license_key}/disable"
        return self.request("post", endpoint, auth=auth)

    def enable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
        """POST ``/license/<license_key>/enable``."""
        endpoint = f"/license/{license_key}/enable"
        return self.request("post", endpoint, auth=auth)

    def update_expiration(
        self,
        license_key: str,
        expiration_iso: Optional[str],
        *,
        auth: bool = True,
    ) -> requests.Response:
        """POST ``/license/<license_key>/expiration``.

        ``expiration_iso`` is sent as the ``expiration_date`` query parameter;
        passing ``None`` sends the request without any query parameters.
        """
        if expiration_iso is None:
            query = None
        else:
            query = {"expiration_date": expiration_iso}
        endpoint = f"/license/{license_key}/expiration"
        return self.request("post", endpoint, params=query, auth=auth)

    def export_licenses(self, *, auth: bool = True) -> requests.Response:
        """GET ``/license/export``."""
        return self.request("get", "/license/export", auth=auth)

    def export_history(self, token: Optional[str] = None, *, auth: bool = True) -> requests.Response:
        """GET ``/history/export``, adding a ``token`` query parameter when ``token`` is truthy."""
        if token:
            query = {"token": token}
        else:
            query = None
        return self.request("get", "/history/export", params=query, auth=auth)
|
|
|
|
|
|
@pytest.fixture(scope="session")
def base_url() -> str:
    """Session-scoped fixture providing the base URL resolved by ``_resolve_base_url``."""
    resolved_url = _resolve_base_url()
    return resolved_url
|
|
|
|
|
|
@pytest.fixture(scope="session")
def api_key() -> str:
    """Session-scoped fixture providing the API key.

    Reads ``LICENSE_SERVER_API_KEY`` and falls back to ``API_CHANGEME`` when
    the variable is not set.
    """
    return os.environ.get("LICENSE_SERVER_API_KEY", "API_CHANGEME")
|
|
|
|
|
|
@pytest.fixture(scope="session")
def client(base_url: str, api_key: str) -> LicenseServerClient:
    """Yield one ``LicenseServerClient`` shared by the session and close it afterwards."""
    shared_client = LicenseServerClient(base_url, api_key, timeout=_resolve_timeout())
    yield shared_client
    # Teardown: runs once the session-scoped fixture is finalized.
    shared_client.close()
|
|
|
|
|
|
def test_server_info_endpoint(client: LicenseServerClient) -> None:
    """The root endpoint answers 200 with a ``version`` string naming the server."""
    info = client.server_info()
    assert info.status_code == 200

    body = info.json()
    assert "version" in body
    assert "Micro License Server" in body["version"]
|
|
|
|
|
|
def test_license_endpoint_requires_authentication(client: LicenseServerClient) -> None:
    """License creation and both exports answer 401 when no credentials are sent."""
    protected_calls = (
        client.create_license,
        client.export_licenses,
        client.export_history,
    )
    # Called in order; the first non-401 answer fails the test before the
    # remaining endpoints are contacted.
    for unauthenticated_call in protected_calls:
        assert unauthenticated_call(auth=False).status_code == 401
|
|
|
|
|
|
def _assert_validity_payload(response: requests.Response, expected_valid: bool, *, license_key: str) -> None:
    """Assert that an ``/is_valid`` response reports ``expected_valid``.

    The response must have status 200 and a JSON object body whose ``valid``
    field is exactly ``expected_valid``. When validity is expected and the
    body carries a ``signature``, the embedded ``license`` object must also
    name ``license_key`` and include an ``expiration_timestamp`` entry.
    """
    assert response.status_code == 200
    body = response.json()
    assert isinstance(body, dict)
    assert body.get("valid") is expected_valid

    # License details are only checked for valid, signed answers.
    if not expected_valid or "signature" not in body:
        return

    assert "license" in body
    details = body["license"]
    assert details.get("license_key") == license_key
    assert "expiration_timestamp" in details
|
|
|
|
|
|
def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
    """Drive one license through its lifecycle and check both CSV exports.

    Steps, in order: create, validate, disable, validate, enable, set an
    expiration one hour ahead, clear the expiration, then look the license
    up in the license export and in the history export.
    """
    note = "QA test license"

    # Create: expect 201 and the stored fields echoed back.
    created = client.create_license(info=note)
    assert created.status_code == 201
    created_body = created.json()
    license_key = created_body["license_key"]
    assert created_body["is_active"] is True
    assert created_body["info"] == note

    # The key is expected to be five dash-separated groups of five characters.
    groups = license_key.split("-")
    assert len(groups) == 5
    assert all(len(group) == 5 for group in groups)

    _assert_validity_payload(client.is_valid(license_key), True, license_key=license_key)

    # Disable, then confirm the key is reported as invalid.
    disabled = client.disable_license(license_key)
    assert disabled.status_code == 200
    assert disabled.json()["is_active"] is False

    _assert_validity_payload(client.is_valid(license_key), False, license_key=license_key)

    # Re-enable.
    enabled = client.enable_license(license_key)
    assert enabled.status_code == 200
    assert enabled.json()["is_active"] is True

    # Set an expiration one hour from now, truncated to whole seconds.
    target_expiration = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(hours=1)
    updated = client.update_expiration(license_key, target_expiration.isoformat())
    assert updated.status_code == 200
    updated_body = updated.json()
    echoed_expiration = datetime.fromisoformat(updated_body["expiration_timestamp"])
    drift = echoed_expiration - target_expiration
    assert abs(drift.total_seconds()) <= 1

    # Clear the expiration again.
    cleared = client.update_expiration(license_key, None)
    assert cleared.status_code == 200
    assert cleared.json()["expiration_timestamp"] is None

    # License export: the created key must appear with its current state.
    licenses_csv = client.export_licenses()
    assert licenses_csv.status_code == 200
    license_rows = list(csv.DictReader(licenses_csv.text.splitlines()))
    hits = [entry for entry in license_rows if entry["license_key"] == license_key]
    assert hits, "Created license key not found in export."
    first_hit = hits[0]
    assert first_hit["is_active"] == "true"
    assert first_hit["info"] == note

    # History export filtered by the key: at least one action mentions it.
    history_csv = client.export_history(token=license_key)
    assert history_csv.status_code == 200
    history_rows = list(csv.DictReader(history_csv.text.splitlines()))
    assert history_rows, "Expected history entries for created license key."
    assert any(license_key in entry["action"] for entry in history_rows), "History does not reference the license key."
|
|
|
|
|
|
def test_public_key_endpoint(client: LicenseServerClient) -> None:
    """The public-key endpoint either serves the key or reports signing as disabled.

    A 404 is accepted only with the exact "signing is disabled" detail
    message; any other answer must be a 200 whose body mentions
    ``public_key``.
    """
    answer = client.public_key()

    if answer.status_code != 404:
        assert answer.status_code == 200
        text = answer.text.strip()
        assert "public_key" in text
        return

    # 404 branch: signing is switched off on the server.
    assert answer.json().get("detail") == "License key signing is disabled."
|