diff --git a/.gitignore b/.gitignore index 36b13f1..efdde0a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +#signing key directory +keys/ + # ---> Python # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/README.md b/README.md index 7d4825c..ffc9c13 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ POSTGRESQL_PASSWORD="PG_CHANGEME" API_KEY="API_CHANGEME" NUM_KEY_CHUNKS=5 KEY_CHUNK_LENGTH=5 +SIGN_KEY=true ``` ### 3. Replace the placeholder secrets in `.env` with secure random values using `openssl rand`: @@ -29,7 +30,7 @@ sed -i \ .env ``` -You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated. +You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated. If `SIGN_KEY` is enabled the server will generate an Ed25519 key pair on first start and place it in the `keys/` directory (mounted into the container); subsequent restarts reuse those files. ### 4. Run `docker-compose up -d` in the cloned repository folder to start the server. You will then have the server exposed on port 8000, if you need a different port, change it in the docker-compose file, then run `docker-compose up -d` again. @@ -41,12 +42,13 @@ All authenticated endpoints expect a `Bearer` token via the `Authorization` head | Method | Path | Auth | Description | Key Request Parameters | Response Highlights | | --- | --- | --- | --- | --- | --- | | `GET` | `/` | No | Returns server metadata. | – | `{"version": "Micro License Server vX.Y.Z"}` | -| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | `license_key`, `expiration_timestamp`, `is_active`, `info` | -| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Boolean validity | +| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | Unsigned response includes `license_key`, `expiration_timestamp`, `is_active`, `info`. With signing enabled the response becomes `{ "license": { "license_key", "expiration_timestamp" }, "signature" }`. | +| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Always returns `{ "valid": bool }`; with signing enabled and valid key also includes `license` payload and `signature`. | | `POST` | `/license/{license_key}/disable` | Yes | Deactivates a license key. | Path: `license_key` | `license_key`, `is_active: false` | | `POST` | `/license/{license_key}/enable` | Yes | Reactivates a license key. | Path: `license_key` | `license_key`, `is_active: true` | | `POST` | `/license/{license_key}/expiration` | Yes | Sets or clears a license key expiration. | Path: `license_key`; Query: `expiration_date` (ISO 8601 or omit to clear) | `license_key`, `expiration_timestamp` (nullable) | | `GET` | `/license/export` | Yes | Exports license inventory as CSV ordered by issue time. | – | CSV with `license_key,issue_timestamp,expiration_timestamp,info,is_active` | +| `GET` | `/public-key` | No (when signing enabled) | Returns the base64-encoded Ed25519 public key used for signatures. | – | `{ "public_key": "..." }` | | `GET` | `/history/export` | Yes | Exports audit history as CSV. | Query: `token` (optional substring filter) | CSV with `action,timestamp` | ## Usage Examples @@ -106,3 +108,44 @@ curl -H "Authorization: Bearer $API_KEY" \ "$BASE_URL/history/export?token=PUT-LICENSE-HERE" \ -o history_export.csv ``` + + +## Verifying Signed Responses + +When `SIGN_KEY=true`, successful `/is_valid` responses include a signature covering the license payload. The `/public-key` endpoint exposes the Ed25519 public key as base64-encoded bytes. The example below shows how to verify a response in Python using the `ecdsa` package. + +```python +import base64 +import json +import requests +from ecdsa import Ed25519, VerifyingKey + +BASE_URL = "http://127.0.0.1:8000" +LICENSE_KEY = "PUT-YOUR-LICENSE-HERE" + +# Fetch and cache the base64 public key once. +public_key_b64 = requests.get(f"{BASE_URL}/public-key", timeout=5).json()["public_key"] +public_key_bytes = base64.b64decode(public_key_b64) +verifying_key = VerifyingKey.from_string(public_key_bytes, curve=Ed25519) + +# Validate the license key. +validation = requests.get( + f"{BASE_URL}/is_valid", + params={"license_key": LICENSE_KEY}, + timeout=5, +).json() + +if not validation.get("valid"): + raise RuntimeError("License is not valid") + +if "signature" not in validation: + raise RuntimeError("Signing is disabled on the server") + +license_payload = validation["license"] +message = json.dumps(license_payload, sort_keys=True, separators=(",", ":")).encode("utf-8") +signature = base64.b64decode(validation["signature"]) + +# Raises ecdsa.BadSignatureError if verification fails. +verifying_key.verify(signature, message) +print("Signature verified") +``` diff --git a/docker-compose.yml b/docker-compose.yml index 08fee79..f7deae3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,8 @@ services: - postgresql ports: - "8000:8000" + volumes: + - ./keys:/src/keys command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] healthcheck: test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/', timeout=2)\""] diff --git a/requirements.txt b/requirements.txt index e46d914..8da6fc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ fastapi~=0.116.1 uvicorn psycopg~=3.2.9 python-dotenv~=1.1.1 -loguru~=0.7.3 \ No newline at end of file +loguru~=0.7.3 +ecdsa~=0.19.1 diff --git a/src/main.py b/src/main.py index 7cfccb5..4f0487c 100644 --- a/src/main.py +++ b/src/main.py @@ -4,15 +4,20 @@ from dotenv import dotenv_values from loguru import logger from datetime import datetime, timezone from typing import Optional +from pathlib import Path import secrets import string import psycopg import asyncio +import base64 +import json + +from ecdsa import Ed25519, SigningKey, VerifyingKey DEBUG = False # Reveal debug tools such as /docs swagger UI ENV = dotenv_values(".env") # .env file -VERSION = "v1.0.1" # version number +VERSION = "v1.0.2" # version number ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation try: LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys @@ -24,6 +29,21 @@ try: except: LICENSE_KEY_PART_LENGTH = 5 +try: + sign_keys_raw = ENV.get("SIGN_KEY") + if sign_keys_raw is None: + sign_keys_raw = ENV.get("SIGN_KEYS") + SIGN_KEYS = False if sign_keys_raw is None else sign_keys_raw.strip().lower() in {"true", "1", "yes", "on"} +except Exception: + SIGN_KEYS = False + +KEYS_DIR = Path("keys") +PRIVATE_KEY_PATH = KEYS_DIR / "ed25519_private.pem" +PUBLIC_KEY_PATH = KEYS_DIR / "ed25519_public.pem" + +SIGNING_PRIVATE_KEY: Optional[SigningKey] = None +SIGNING_PUBLIC_KEY_B64: Optional[str] = None + api_key = "" security = HTTPBearer(auto_error=False) @@ -48,6 +68,45 @@ except Exception as error: logger.error("Failed to connect to PostgreSQL: {}", error) exit() +def _load_or_create_signing_keys() -> None: + global SIGNING_PRIVATE_KEY, SIGNING_PUBLIC_KEY_B64 + + KEYS_DIR.mkdir(parents=True, exist_ok=True) + + if not PRIVATE_KEY_PATH.exists(): + signing_key = SigningKey.generate(curve=Ed25519) + verifying_key = signing_key.verifying_key + PRIVATE_KEY_PATH.write_bytes(signing_key.to_pem(format="pkcs8")) + PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem()) + SIGNING_PRIVATE_KEY = signing_key + SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii") + logger.info("Generated new Ed25519 signing key pair at {}.", KEYS_DIR) + return + + try: + SIGNING_PRIVATE_KEY = SigningKey.from_pem(PRIVATE_KEY_PATH.read_bytes()) + except Exception as exc: + logger.error("Failed to read signing private key: {}", exc) + raise + + verifying_key: Optional[VerifyingKey] + if PUBLIC_KEY_PATH.exists(): + try: + verifying_key = VerifyingKey.from_pem(PUBLIC_KEY_PATH.read_bytes()) + except Exception as exc: + logger.warning( + "Existing public key was invalid; regenerating from private key: {}", + exc, + ) + verifying_key = SIGNING_PRIVATE_KEY.verifying_key + PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem()) + else: + verifying_key = SIGNING_PRIVATE_KEY.verifying_key + PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem()) + + SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii") + logger.debug("Signing key pair loaded from {}.", KEYS_DIR) + with psycopg.connect(connect_statement) as conn: with conn.cursor() as cur: cur.execute( @@ -72,6 +131,13 @@ with psycopg.connect(connect_statement) as conn: ) conn.commit() +if SIGN_KEYS: + try: + _load_or_create_signing_keys() + except Exception as error: + logger.error("Failed to initialize signing keys: {}", error) + exit() + if DEBUG: app = FastAPI() else: @@ -176,12 +242,32 @@ async def create_license_key( "info": info, } +@app.get("/public-key") +async def get_public_key(): + if not SIGN_KEYS: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="License key signing is disabled.", + ) + + if SIGNING_PUBLIC_KEY_B64 is None: + try: + _load_or_create_signing_keys() + except Exception as error: + logger.error("Failed to load public key: {}", error) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Signing is enabled but keys are unavailable.", + ) + + return {"public_key": SIGNING_PUBLIC_KEY_B64} + @app.get("/is_valid") -async def is_license_key_valid(license_key: str) -> bool: +async def is_license_key_valid(license_key: str): """Validate the supplied license key against the database.""" now = datetime.now(timezone.utc) - def _lookup() -> bool: + def _lookup() -> tuple[bool, Optional[datetime]]: try: with psycopg.connect(connect_statement) as conn: with conn.cursor() as cur: @@ -192,19 +278,44 @@ async def is_license_key_valid(license_key: str) -> bool: WHERE key = %s AND is_active = TRUE AND (expiration_timestamp IS NULL OR expiration_timestamp > %s) - RETURNING 1 + RETURNING expiration_timestamp """, (now, license_key, now), ) - if cur.fetchone() is None: - return False + row = cur.fetchone() + if row is None: + return False, None conn.commit() - return True + return True, row[0] except Exception: logger.exception("Failed validating license key.") - return False + return False, None - return await asyncio.to_thread(_lookup) + valid, expiration_ts = await asyncio.to_thread(_lookup) + + if not valid: + return {"valid": False} + + if not SIGN_KEYS: + return {"valid": True} + + if SIGNING_PRIVATE_KEY is None: + try: + _load_or_create_signing_keys() + except Exception as error: + logger.error("Failed to load signing key for validation: {}", error) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Signing is enabled but keys are unavailable.", + ) + + payload = { + "license_key": license_key, + "expiration_timestamp": expiration_ts.isoformat() if expiration_ts else None, + } + message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + signature = base64.b64encode(SIGNING_PRIVATE_KEY.sign(message)).decode("ascii") + return {"valid": True, "license": payload, "signature": signature} @app.post("/license/{license_key}/disable", status_code=status.HTTP_200_OK) async def disable_license_key( diff --git a/tests/tests.py b/tests/tests.py index 3ad1638..4be9112 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -72,6 +72,9 @@ class LicenseServerClient: def is_valid(self, license_key: str) -> requests.Response: return self.request("get", "/is_valid", params={"license_key": license_key}, auth=False) + def public_key(self) -> requests.Response: + return self.request("get", "/public-key", auth=False) + def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response: return self.request("post", f"/license/{license_key}/disable", auth=auth) @@ -133,6 +136,19 @@ def test_license_endpoint_requires_authentication(client: LicenseServerClient) - assert history_export.status_code == 401 +def _assert_validity_payload(response: requests.Response, expected_valid: bool, *, license_key: str) -> None: + assert response.status_code == 200 + payload = response.json() + assert isinstance(payload, dict) + assert payload.get("valid") is expected_valid + + if expected_valid and "signature" in payload: + assert "license" in payload + license_payload = payload["license"] + assert license_payload.get("license_key") == license_key + assert "expiration_timestamp" in license_payload + + def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None: info_text = "QA test license" create_response = client.create_license(info=info_text) @@ -145,17 +161,13 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None: assert len(parts) == 5 assert all(len(part) == 5 for part in parts) - validity_response = client.is_valid(license_key) - assert validity_response.status_code == 200 - assert validity_response.json() is True + _assert_validity_payload(client.is_valid(license_key), True, license_key=license_key) disable_response = client.disable_license(license_key) assert disable_response.status_code == 200 assert disable_response.json()["is_active"] is False - after_disable = client.is_valid(license_key) - assert after_disable.status_code == 200 - assert after_disable.json() is False + _assert_validity_payload(client.is_valid(license_key), False, license_key=license_key) enable_response = client.enable_license(license_key) assert enable_response.status_code == 200 @@ -186,3 +198,16 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None: history_rows = list(csv.DictReader(history_response.text.splitlines())) assert history_rows, "Expected history entries for created license key." assert any(license_key in row["action"] for row in history_rows), "History does not reference the license key." + + +def test_public_key_endpoint(client: LicenseServerClient) -> None: + response = client.public_key() + + if response.status_code == 404: + detail = response.json().get("detail") + assert detail == "License key signing is disabled." + return + + assert response.status_code == 200 + body = response.text.strip() + assert "public_key" in body