2 Commits
v1.0.1 ... main

Author SHA1 Message Date
e198ea687b Add signature response for valid license keys 2025-10-21 22:42:56 -04:00
16385a6529 updated README 2025-10-18 19:25:59 -04:00
6 changed files with 205 additions and 20 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
#signing key directory
keys/
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@@ -18,6 +18,7 @@ POSTGRESQL_PASSWORD="PG_CHANGEME"
API_KEY="API_CHANGEME" API_KEY="API_CHANGEME"
NUM_KEY_CHUNKS=5 NUM_KEY_CHUNKS=5
KEY_CHUNK_LENGTH=5 KEY_CHUNK_LENGTH=5
SIGN_KEY=true
``` ```
### 3. Replace the placeholder secrets in `.env` with secure random values using `openssl rand`: ### 3. Replace the placeholder secrets in `.env` with secure random values using `openssl rand`:
@@ -29,7 +30,7 @@ sed -i \
.env .env
``` ```
You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated. You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated. If `SIGN_KEY` is enabled the server will generate an Ed25519 key pair on first start and place it in the `keys/` directory (mounted into the container); subsequent restarts reuse those files.
### 4. Run `docker-compose up -d` in the cloned repository folder to start the server. ### 4. Run `docker-compose up -d` in the cloned repository folder to start the server.
You will then have the server exposed on port 8000, if you need a different port, change it in the docker-compose file, then run `docker-compose up -d` again. You will then have the server exposed on port 8000, if you need a different port, change it in the docker-compose file, then run `docker-compose up -d` again.
@@ -41,12 +42,13 @@ All authenticated endpoints expect a `Bearer` token via the `Authorization` head
| Method | Path | Auth | Description | Key Request Parameters | Response Highlights | | Method | Path | Auth | Description | Key Request Parameters | Response Highlights |
| --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- |
| `GET` | `/` | No | Returns server metadata. | | `{"version": "Micro License Server vX.Y.Z"}` | | `GET` | `/` | No | Returns server metadata. | | `{"version": "Micro License Server vX.Y.Z"}` |
| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | `license_key`, `expiration_timestamp`, `is_active`, `info` | | `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | Unsigned response includes `license_key`, `expiration_timestamp`, `is_active`, `info`. With signing enabled the response becomes `{ "license": { "license_key", "expiration_timestamp" }, "signature" }`. |
| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Boolean validity | | `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Always returns `{ "valid": bool }`; with signing enabled and valid key also includes `license` payload and `signature`. |
| `POST` | `/license/{license_key}/disable` | Yes | Deactivates a license key. | Path: `license_key` | `license_key`, `is_active: false` | | `POST` | `/license/{license_key}/disable` | Yes | Deactivates a license key. | Path: `license_key` | `license_key`, `is_active: false` |
| `POST` | `/license/{license_key}/enable` | Yes | Reactivates a license key. | Path: `license_key` | `license_key`, `is_active: true` | | `POST` | `/license/{license_key}/enable` | Yes | Reactivates a license key. | Path: `license_key` | `license_key`, `is_active: true` |
| `POST` | `/license/{license_key}/expiration` | Yes | Sets or clears a license key expiration. | Path: `license_key`; Query: `expiration_date` (ISO 8601 or omit to clear) | `license_key`, `expiration_timestamp` (nullable) | | `POST` | `/license/{license_key}/expiration` | Yes | Sets or clears a license key expiration. | Path: `license_key`; Query: `expiration_date` (ISO 8601 or omit to clear) | `license_key`, `expiration_timestamp` (nullable) |
| `GET` | `/license/export` | Yes | Exports license inventory as CSV ordered by issue time. | | CSV with `license_key,issue_timestamp,expiration_timestamp,is_active` | | `GET` | `/license/export` | Yes | Exports license inventory as CSV ordered by issue time. | | CSV with `license_key,issue_timestamp,expiration_timestamp,info,is_active` |
| `GET` | `/public-key` | No (when signing enabled) | Returns the base64-encoded Ed25519 public key used for signatures. | | `{ "public_key": "..." }` |
| `GET` | `/history/export` | Yes | Exports audit history as CSV. | Query: `token` (optional substring filter) | CSV with `action,timestamp` | | `GET` | `/history/export` | Yes | Exports audit history as CSV. | Query: `token` (optional substring filter) | CSV with `action,timestamp` |
## Usage Examples ## Usage Examples
@@ -106,3 +108,44 @@ curl -H "Authorization: Bearer $API_KEY" \
"$BASE_URL/history/export?token=PUT-LICENSE-HERE" \ "$BASE_URL/history/export?token=PUT-LICENSE-HERE" \
-o history_export.csv -o history_export.csv
``` ```
## Verifying Signed Responses
When `SIGN_KEY=true`, successful `/is_valid` responses include a signature covering the license payload. The `/public-key` endpoint exposes the Ed25519 public key as base64-encoded bytes. The example below shows how to verify a response in Python using the `ecdsa` package.
```python
import base64
import json
import requests
from ecdsa import Ed25519, VerifyingKey
BASE_URL = "http://127.0.0.1:8000"
LICENSE_KEY = "PUT-YOUR-LICENSE-HERE"
# Fetch and cache the base64 public key once.
public_key_b64 = requests.get(f"{BASE_URL}/public-key", timeout=5).json()["public_key"]
public_key_bytes = base64.b64decode(public_key_b64)
verifying_key = VerifyingKey.from_string(public_key_bytes, curve=Ed25519)
# Validate the license key.
validation = requests.get(
f"{BASE_URL}/is_valid",
params={"license_key": LICENSE_KEY},
timeout=5,
).json()
if not validation.get("valid"):
raise RuntimeError("License is not valid")
if "signature" not in validation:
raise RuntimeError("Signing is disabled on the server")
license_payload = validation["license"]
message = json.dumps(license_payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
signature = base64.b64decode(validation["signature"])
# Raises ecdsa.BadSignatureError if verification fails.
verifying_key.verify(signature, message)
print("Signature verified")
```

View File

@@ -19,6 +19,8 @@ services:
- postgresql - postgresql
ports: ports:
- "8000:8000" - "8000:8000"
volumes:
- ./keys:/src/keys
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
healthcheck: healthcheck:
test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/', timeout=2)\""] test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/', timeout=2)\""]

View File

@@ -3,3 +3,4 @@ uvicorn
psycopg~=3.2.9 psycopg~=3.2.9
python-dotenv~=1.1.1 python-dotenv~=1.1.1
loguru~=0.7.3 loguru~=0.7.3
ecdsa~=0.19.1

View File

@@ -4,15 +4,20 @@ from dotenv import dotenv_values
from loguru import logger from loguru import logger
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional
from pathlib import Path
import secrets import secrets
import string import string
import psycopg import psycopg
import asyncio import asyncio
import base64
import json
from ecdsa import Ed25519, SigningKey, VerifyingKey
DEBUG = False # Reveal debug tools such as /docs swagger UI DEBUG = False # Reveal debug tools such as /docs swagger UI
ENV = dotenv_values(".env") # .env file ENV = dotenv_values(".env") # .env file
VERSION = "v1.0.1" # version number VERSION = "v1.0.2" # version number
ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation
try: try:
LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys
@@ -24,6 +29,21 @@ try:
except: except:
LICENSE_KEY_PART_LENGTH = 5 LICENSE_KEY_PART_LENGTH = 5
try:
sign_keys_raw = ENV.get("SIGN_KEY")
if sign_keys_raw is None:
sign_keys_raw = ENV.get("SIGN_KEYS")
SIGN_KEYS = False if sign_keys_raw is None else sign_keys_raw.strip().lower() in {"true", "1", "yes", "on"}
except Exception:
SIGN_KEYS = False
KEYS_DIR = Path("keys")
PRIVATE_KEY_PATH = KEYS_DIR / "ed25519_private.pem"
PUBLIC_KEY_PATH = KEYS_DIR / "ed25519_public.pem"
SIGNING_PRIVATE_KEY: Optional[SigningKey] = None
SIGNING_PUBLIC_KEY_B64: Optional[str] = None
api_key = "" api_key = ""
security = HTTPBearer(auto_error=False) security = HTTPBearer(auto_error=False)
@@ -48,6 +68,45 @@ except Exception as error:
logger.error("Failed to connect to PostgreSQL: {}", error) logger.error("Failed to connect to PostgreSQL: {}", error)
exit() exit()
def _load_or_create_signing_keys() -> None:
global SIGNING_PRIVATE_KEY, SIGNING_PUBLIC_KEY_B64
KEYS_DIR.mkdir(parents=True, exist_ok=True)
if not PRIVATE_KEY_PATH.exists():
signing_key = SigningKey.generate(curve=Ed25519)
verifying_key = signing_key.verifying_key
PRIVATE_KEY_PATH.write_bytes(signing_key.to_pem(format="pkcs8"))
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PRIVATE_KEY = signing_key
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.info("Generated new Ed25519 signing key pair at {}.", KEYS_DIR)
return
try:
SIGNING_PRIVATE_KEY = SigningKey.from_pem(PRIVATE_KEY_PATH.read_bytes())
except Exception as exc:
logger.error("Failed to read signing private key: {}", exc)
raise
verifying_key: Optional[VerifyingKey]
if PUBLIC_KEY_PATH.exists():
try:
verifying_key = VerifyingKey.from_pem(PUBLIC_KEY_PATH.read_bytes())
except Exception as exc:
logger.warning(
"Existing public key was invalid; regenerating from private key: {}",
exc,
)
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
else:
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.debug("Signing key pair loaded from {}.", KEYS_DIR)
with psycopg.connect(connect_statement) as conn: with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
@@ -72,6 +131,13 @@ with psycopg.connect(connect_statement) as conn:
) )
conn.commit() conn.commit()
if SIGN_KEYS:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to initialize signing keys: {}", error)
exit()
if DEBUG: if DEBUG:
app = FastAPI() app = FastAPI()
else: else:
@@ -176,12 +242,32 @@ async def create_license_key(
"info": info, "info": info,
} }
@app.get("/public-key")
async def get_public_key():
if not SIGN_KEYS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="License key signing is disabled.",
)
if SIGNING_PUBLIC_KEY_B64 is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load public key: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
return {"public_key": SIGNING_PUBLIC_KEY_B64}
@app.get("/is_valid") @app.get("/is_valid")
async def is_license_key_valid(license_key: str) -> bool: async def is_license_key_valid(license_key: str):
"""Validate the supplied license key against the database.""" """Validate the supplied license key against the database."""
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
def _lookup() -> bool: def _lookup() -> tuple[bool, Optional[datetime]]:
try: try:
with psycopg.connect(connect_statement) as conn: with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
@@ -192,19 +278,44 @@ async def is_license_key_valid(license_key: str) -> bool:
WHERE key = %s WHERE key = %s
AND is_active = TRUE AND is_active = TRUE
AND (expiration_timestamp IS NULL OR expiration_timestamp > %s) AND (expiration_timestamp IS NULL OR expiration_timestamp > %s)
RETURNING 1 RETURNING expiration_timestamp
""", """,
(now, license_key, now), (now, license_key, now),
) )
if cur.fetchone() is None: row = cur.fetchone()
return False if row is None:
return False, None
conn.commit() conn.commit()
return True return True, row[0]
except Exception: except Exception:
logger.exception("Failed validating license key.") logger.exception("Failed validating license key.")
return False return False, None
return await asyncio.to_thread(_lookup) valid, expiration_ts = await asyncio.to_thread(_lookup)
if not valid:
return {"valid": False}
if not SIGN_KEYS:
return {"valid": True}
if SIGNING_PRIVATE_KEY is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load signing key for validation: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
payload = {
"license_key": license_key,
"expiration_timestamp": expiration_ts.isoformat() if expiration_ts else None,
}
message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
signature = base64.b64encode(SIGNING_PRIVATE_KEY.sign(message)).decode("ascii")
return {"valid": True, "license": payload, "signature": signature}
@app.post("/license/{license_key}/disable", status_code=status.HTTP_200_OK) @app.post("/license/{license_key}/disable", status_code=status.HTTP_200_OK)
async def disable_license_key( async def disable_license_key(

View File

@@ -72,6 +72,9 @@ class LicenseServerClient:
def is_valid(self, license_key: str) -> requests.Response: def is_valid(self, license_key: str) -> requests.Response:
return self.request("get", "/is_valid", params={"license_key": license_key}, auth=False) return self.request("get", "/is_valid", params={"license_key": license_key}, auth=False)
def public_key(self) -> requests.Response:
return self.request("get", "/public-key", auth=False)
def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response: def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
return self.request("post", f"/license/{license_key}/disable", auth=auth) return self.request("post", f"/license/{license_key}/disable", auth=auth)
@@ -133,6 +136,19 @@ def test_license_endpoint_requires_authentication(client: LicenseServerClient) -
assert history_export.status_code == 401 assert history_export.status_code == 401
def _assert_validity_payload(response: requests.Response, expected_valid: bool, *, license_key: str) -> None:
assert response.status_code == 200
payload = response.json()
assert isinstance(payload, dict)
assert payload.get("valid") is expected_valid
if expected_valid and "signature" in payload:
assert "license" in payload
license_payload = payload["license"]
assert license_payload.get("license_key") == license_key
assert "expiration_timestamp" in license_payload
def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None: def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
info_text = "QA test license" info_text = "QA test license"
create_response = client.create_license(info=info_text) create_response = client.create_license(info=info_text)
@@ -145,17 +161,13 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
assert len(parts) == 5 assert len(parts) == 5
assert all(len(part) == 5 for part in parts) assert all(len(part) == 5 for part in parts)
validity_response = client.is_valid(license_key) _assert_validity_payload(client.is_valid(license_key), True, license_key=license_key)
assert validity_response.status_code == 200
assert validity_response.json() is True
disable_response = client.disable_license(license_key) disable_response = client.disable_license(license_key)
assert disable_response.status_code == 200 assert disable_response.status_code == 200
assert disable_response.json()["is_active"] is False assert disable_response.json()["is_active"] is False
after_disable = client.is_valid(license_key) _assert_validity_payload(client.is_valid(license_key), False, license_key=license_key)
assert after_disable.status_code == 200
assert after_disable.json() is False
enable_response = client.enable_license(license_key) enable_response = client.enable_license(license_key)
assert enable_response.status_code == 200 assert enable_response.status_code == 200
@@ -186,3 +198,16 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
history_rows = list(csv.DictReader(history_response.text.splitlines())) history_rows = list(csv.DictReader(history_response.text.splitlines()))
assert history_rows, "Expected history entries for created license key." assert history_rows, "Expected history entries for created license key."
assert any(license_key in row["action"] for row in history_rows), "History does not reference the license key." assert any(license_key in row["action"] for row in history_rows), "History does not reference the license key."
def test_public_key_endpoint(client: LicenseServerClient) -> None:
response = client.public_key()
if response.status_code == 404:
detail = response.json().get("detail")
assert detail == "License key signing is disabled."
return
assert response.status_code == 200
body = response.text.strip()
assert "public_key" in body