added tests
This commit is contained in:
181
tests/tests.py
181
tests/tests.py
@@ -0,0 +1,181 @@
|
||||
import csv
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
|
||||
def _resolve_base_url() -> str:
|
||||
explicit = os.getenv("LICENSE_SERVER_BASE_URL")
|
||||
if explicit:
|
||||
return explicit.rstrip("/")
|
||||
|
||||
host = os.getenv("LICENSE_SERVER_IP")
|
||||
if host:
|
||||
scheme = os.getenv("LICENSE_SERVER_SCHEME", "http")
|
||||
port = os.getenv("LICENSE_SERVER_PORT", "8000")
|
||||
return f"{scheme}://{host}:{port}".rstrip("/")
|
||||
|
||||
return "http://127.0.0.1:8000"
|
||||
|
||||
|
||||
def _resolve_timeout() -> float:
|
||||
raw_timeout = os.getenv("LICENSE_SERVER_TIMEOUT", "5")
|
||||
try:
|
||||
return float(raw_timeout)
|
||||
except ValueError:
|
||||
return 5.0
|
||||
|
||||
|
||||
class LicenseServerClient:
|
||||
"""Minimal helper around requests for exercising the license server API."""
|
||||
|
||||
def __init__(self, base_url: str, api_key: str, timeout: float) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.api_key = api_key
|
||||
self.timeout = timeout
|
||||
self.session = requests.Session()
|
||||
|
||||
def close(self) -> None:
|
||||
self.session.close()
|
||||
|
||||
def request(self, method: str, path: str, *, auth: bool = True, **kwargs) -> requests.Response:
|
||||
url = f"{self.base_url}{path}"
|
||||
headers = kwargs.pop("headers", None) or {}
|
||||
if auth and self.api_key:
|
||||
headers = {"Authorization": f"Bearer {self.api_key}", **headers}
|
||||
kwargs.setdefault("timeout", self.timeout)
|
||||
return self.session.request(method=method, url=url, headers=headers, **kwargs)
|
||||
|
||||
def server_info(self) -> requests.Response:
|
||||
return self.request("get", "/", auth=False)
|
||||
|
||||
def create_license(
|
||||
self,
|
||||
*,
|
||||
is_active: bool = True,
|
||||
expiration_iso: Optional[str] = None,
|
||||
auth: bool = True,
|
||||
) -> requests.Response:
|
||||
params = {}
|
||||
if is_active is False:
|
||||
params["is_active"] = "false"
|
||||
if expiration_iso is not None:
|
||||
params["expiration_date"] = expiration_iso
|
||||
return self.request("post", "/license", params=params or None, auth=auth)
|
||||
|
||||
def is_valid(self, license_key: str) -> requests.Response:
|
||||
return self.request("get", "/is_valid", params={"license_key": license_key}, auth=False)
|
||||
|
||||
def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
|
||||
return self.request("post", f"/license/{license_key}/disable", auth=auth)
|
||||
|
||||
def enable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
|
||||
return self.request("post", f"/license/{license_key}/enable", auth=auth)
|
||||
|
||||
def update_expiration(
|
||||
self,
|
||||
license_key: str,
|
||||
expiration_iso: Optional[str],
|
||||
*,
|
||||
auth: bool = True,
|
||||
) -> requests.Response:
|
||||
params = {"expiration_date": expiration_iso} if expiration_iso is not None else None
|
||||
return self.request("post", f"/license/{license_key}/expiration", params=params, auth=auth)
|
||||
|
||||
def export_licenses(self, *, auth: bool = True) -> requests.Response:
|
||||
return self.request("get", "/license/export", auth=auth)
|
||||
|
||||
def export_history(self, token: Optional[str] = None, *, auth: bool = True) -> requests.Response:
|
||||
params = {"token": token} if token else None
|
||||
return self.request("get", "/history/export", params=params, auth=auth)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_url() -> str:
|
||||
return _resolve_base_url()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def api_key() -> str:
|
||||
return os.getenv("LICENSE_SERVER_API_KEY", "API_CHANGEME")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client(base_url: str, api_key: str) -> LicenseServerClient:
|
||||
resolved_timeout = _resolve_timeout()
|
||||
license_client = LicenseServerClient(base_url, api_key, timeout=resolved_timeout)
|
||||
yield license_client
|
||||
license_client.close()
|
||||
|
||||
|
||||
def test_server_info_endpoint(client: LicenseServerClient) -> None:
|
||||
response = client.server_info()
|
||||
assert response.status_code == 200
|
||||
payload = response.json()
|
||||
assert "version" in payload
|
||||
assert "Micro License Server" in payload["version"]
|
||||
|
||||
|
||||
def test_license_endpoint_requires_authentication(client: LicenseServerClient) -> None:
|
||||
response = client.create_license(auth=False)
|
||||
assert response.status_code == 401
|
||||
|
||||
license_export = client.export_licenses(auth=False)
|
||||
assert license_export.status_code == 401
|
||||
|
||||
history_export = client.export_history(auth=False)
|
||||
assert history_export.status_code == 401
|
||||
|
||||
|
||||
def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
|
||||
create_response = client.create_license()
|
||||
assert create_response.status_code == 201
|
||||
created_payload = create_response.json()
|
||||
license_key = created_payload["license_key"]
|
||||
assert created_payload["is_active"] is True
|
||||
parts = license_key.split("-")
|
||||
assert len(parts) == 5
|
||||
assert all(len(part) == 5 for part in parts)
|
||||
|
||||
validity_response = client.is_valid(license_key)
|
||||
assert validity_response.status_code == 200
|
||||
assert validity_response.json() is True
|
||||
|
||||
disable_response = client.disable_license(license_key)
|
||||
assert disable_response.status_code == 200
|
||||
assert disable_response.json()["is_active"] is False
|
||||
|
||||
after_disable = client.is_valid(license_key)
|
||||
assert after_disable.status_code == 200
|
||||
assert after_disable.json() is False
|
||||
|
||||
enable_response = client.enable_license(license_key)
|
||||
assert enable_response.status_code == 200
|
||||
assert enable_response.json()["is_active"] is True
|
||||
|
||||
future_expiration = (datetime.now(timezone.utc) + timedelta(hours=1)).replace(microsecond=0)
|
||||
expiration_response = client.update_expiration(license_key, future_expiration.isoformat())
|
||||
assert expiration_response.status_code == 200
|
||||
expiration_payload = expiration_response.json()
|
||||
returned_expiration = datetime.fromisoformat(expiration_payload["expiration_timestamp"])
|
||||
assert abs((returned_expiration - future_expiration).total_seconds()) <= 1
|
||||
|
||||
clear_expiration_response = client.update_expiration(license_key, None)
|
||||
assert clear_expiration_response.status_code == 200
|
||||
assert clear_expiration_response.json()["expiration_timestamp"] is None
|
||||
|
||||
export_response = client.export_licenses()
|
||||
assert export_response.status_code == 200
|
||||
export_rows = list(csv.DictReader(export_response.text.splitlines()))
|
||||
matching_rows = [row for row in export_rows if row["license_key"] == license_key]
|
||||
assert matching_rows, "Created license key not found in export."
|
||||
assert matching_rows[0]["is_active"] == "true"
|
||||
|
||||
history_response = client.export_history(token=license_key)
|
||||
assert history_response.status_code == 200
|
||||
history_rows = list(csv.DictReader(history_response.text.splitlines()))
|
||||
assert history_rows, "Expected history entries for created license key."
|
||||
assert any(license_key in row["action"] for row in history_rows), "History does not reference the license key."
|
||||
|
||||
Reference in New Issue
Block a user