2 Commits

Author SHA1 Message Date
e198ea687b Add signature response for valid license keys 2025-10-21 22:42:56 -04:00
16385a6529 updated README 2025-10-18 19:25:59 -04:00
6 changed files with 205 additions and 20 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
#signing key directory
keys/
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@@ -18,6 +18,7 @@ POSTGRESQL_PASSWORD="PG_CHANGEME"
API_KEY="API_CHANGEME"
NUM_KEY_CHUNKS=5
KEY_CHUNK_LENGTH=5
SIGN_KEY=true
```
### 3. Replace the placeholder secrets in `.env` with secure random values using `openssl rand`:
@@ -29,7 +30,7 @@ sed -i \
.env
```
You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated.
You can change `NUM_KEY_CHUNKS` to adjust how many chunks a license key consists of, and `KEY_CHUNK_LENGTH` to change how many characters appear in each chunk. This only changes how *new* keys are generated, so previous keys won't become invalidated. If `SIGN_KEY` is enabled the server will generate an Ed25519 key pair on first start and place it in the `keys/` directory (mounted into the container); subsequent restarts reuse those files.
### 4. Run `docker-compose up -d` in the cloned repository folder to start the server.
You will then have the server exposed on port 8000, if you need a different port, change it in the docker-compose file, then run `docker-compose up -d` again.
@@ -41,12 +42,13 @@ All authenticated endpoints expect a `Bearer` token via the `Authorization` head
| Method | Path | Auth | Description | Key Request Parameters | Response Highlights |
| --- | --- | --- | --- | --- | --- |
| `GET` | `/` | No | Returns server metadata. | | `{"version": "Micro License Server vX.Y.Z"}` |
| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | `license_key`, `expiration_timestamp`, `is_active`, `info` |
| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Boolean validity |
| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | Unsigned response includes `license_key`, `expiration_timestamp`, `is_active`, `info`. With signing enabled the response becomes `{ "license": { "license_key", "expiration_timestamp" }, "signature" }`. |
| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Always returns `{ "valid": bool }`; with signing enabled and valid key also includes `license` payload and `signature`. |
| `POST` | `/license/{license_key}/disable` | Yes | Deactivates a license key. | Path: `license_key` | `license_key`, `is_active: false` |
| `POST` | `/license/{license_key}/enable` | Yes | Reactivates a license key. | Path: `license_key` | `license_key`, `is_active: true` |
| `POST` | `/license/{license_key}/expiration` | Yes | Sets or clears a license key expiration. | Path: `license_key`; Query: `expiration_date` (ISO 8601 or omit to clear) | `license_key`, `expiration_timestamp` (nullable) |
| `GET` | `/license/export` | Yes | Exports license inventory as CSV ordered by issue time. | | CSV with `license_key,issue_timestamp,expiration_timestamp,is_active` |
| `GET` | `/license/export` | Yes | Exports license inventory as CSV ordered by issue time. | | CSV with `license_key,issue_timestamp,expiration_timestamp,info,is_active` |
| `GET` | `/public-key` | No (when signing enabled) | Returns the base64-encoded Ed25519 public key used for signatures. | | `{ "public_key": "..." }` |
| `GET` | `/history/export` | Yes | Exports audit history as CSV. | Query: `token` (optional substring filter) | CSV with `action,timestamp` |
## Usage Examples
@@ -106,3 +108,44 @@ curl -H "Authorization: Bearer $API_KEY" \
"$BASE_URL/history/export?token=PUT-LICENSE-HERE" \
-o history_export.csv
```
## Verifying Signed Responses
When `SIGN_KEY=true`, successful `/is_valid` responses include a signature covering the license payload. The `/public-key` endpoint exposes the Ed25519 public key as base64-encoded bytes. The example below shows how to verify a response in Python using the `ecdsa` package.
```python
import base64
import json
import requests
from ecdsa import Ed25519, VerifyingKey
BASE_URL = "http://127.0.0.1:8000"
LICENSE_KEY = "PUT-YOUR-LICENSE-HERE"
# Fetch and cache the base64 public key once.
public_key_b64 = requests.get(f"{BASE_URL}/public-key", timeout=5).json()["public_key"]
public_key_bytes = base64.b64decode(public_key_b64)
verifying_key = VerifyingKey.from_string(public_key_bytes, curve=Ed25519)
# Validate the license key.
validation = requests.get(
f"{BASE_URL}/is_valid",
params={"license_key": LICENSE_KEY},
timeout=5,
).json()
if not validation.get("valid"):
raise RuntimeError("License is not valid")
if "signature" not in validation:
raise RuntimeError("Signing is disabled on the server")
license_payload = validation["license"]
message = json.dumps(license_payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
signature = base64.b64decode(validation["signature"])
# Raises ecdsa.BadSignatureError if verification fails.
verifying_key.verify(signature, message)
print("Signature verified")
```

View File

@@ -19,6 +19,8 @@ services:
- postgresql
ports:
- "8000:8000"
volumes:
- ./keys:/src/keys
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
healthcheck:
test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/', timeout=2)\""]

View File

@@ -2,4 +2,5 @@ fastapi~=0.116.1
uvicorn
psycopg~=3.2.9
python-dotenv~=1.1.1
loguru~=0.7.3
loguru~=0.7.3
ecdsa~=0.19.1

View File

@@ -4,15 +4,20 @@ from dotenv import dotenv_values
from loguru import logger
from datetime import datetime, timezone
from typing import Optional
from pathlib import Path
import secrets
import string
import psycopg
import asyncio
import base64
import json
from ecdsa import Ed25519, SigningKey, VerifyingKey
DEBUG = False # Reveal debug tools such as /docs swagger UI
ENV = dotenv_values(".env") # .env file
VERSION = "v1.0.1" # version number
VERSION = "v1.0.2" # version number
ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation
try:
LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys
@@ -24,6 +29,21 @@ try:
except:
LICENSE_KEY_PART_LENGTH = 5
try:
sign_keys_raw = ENV.get("SIGN_KEY")
if sign_keys_raw is None:
sign_keys_raw = ENV.get("SIGN_KEYS")
SIGN_KEYS = False if sign_keys_raw is None else sign_keys_raw.strip().lower() in {"true", "1", "yes", "on"}
except Exception:
SIGN_KEYS = False
KEYS_DIR = Path("keys")
PRIVATE_KEY_PATH = KEYS_DIR / "ed25519_private.pem"
PUBLIC_KEY_PATH = KEYS_DIR / "ed25519_public.pem"
SIGNING_PRIVATE_KEY: Optional[SigningKey] = None
SIGNING_PUBLIC_KEY_B64: Optional[str] = None
api_key = ""
security = HTTPBearer(auto_error=False)
@@ -48,6 +68,45 @@ except Exception as error:
logger.error("Failed to connect to PostgreSQL: {}", error)
exit()
def _load_or_create_signing_keys() -> None:
global SIGNING_PRIVATE_KEY, SIGNING_PUBLIC_KEY_B64
KEYS_DIR.mkdir(parents=True, exist_ok=True)
if not PRIVATE_KEY_PATH.exists():
signing_key = SigningKey.generate(curve=Ed25519)
verifying_key = signing_key.verifying_key
PRIVATE_KEY_PATH.write_bytes(signing_key.to_pem(format="pkcs8"))
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PRIVATE_KEY = signing_key
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.info("Generated new Ed25519 signing key pair at {}.", KEYS_DIR)
return
try:
SIGNING_PRIVATE_KEY = SigningKey.from_pem(PRIVATE_KEY_PATH.read_bytes())
except Exception as exc:
logger.error("Failed to read signing private key: {}", exc)
raise
verifying_key: Optional[VerifyingKey]
if PUBLIC_KEY_PATH.exists():
try:
verifying_key = VerifyingKey.from_pem(PUBLIC_KEY_PATH.read_bytes())
except Exception as exc:
logger.warning(
"Existing public key was invalid; regenerating from private key: {}",
exc,
)
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
else:
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.debug("Signing key pair loaded from {}.", KEYS_DIR)
with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur:
cur.execute(
@@ -72,6 +131,13 @@ with psycopg.connect(connect_statement) as conn:
)
conn.commit()
if SIGN_KEYS:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to initialize signing keys: {}", error)
exit()
if DEBUG:
app = FastAPI()
else:
@@ -176,12 +242,32 @@ async def create_license_key(
"info": info,
}
@app.get("/public-key")
async def get_public_key():
if not SIGN_KEYS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="License key signing is disabled.",
)
if SIGNING_PUBLIC_KEY_B64 is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load public key: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
return {"public_key": SIGNING_PUBLIC_KEY_B64}
@app.get("/is_valid")
async def is_license_key_valid(license_key: str) -> bool:
async def is_license_key_valid(license_key: str):
"""Validate the supplied license key against the database."""
now = datetime.now(timezone.utc)
def _lookup() -> bool:
def _lookup() -> tuple[bool, Optional[datetime]]:
try:
with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur:
@@ -192,19 +278,44 @@ async def is_license_key_valid(license_key: str) -> bool:
WHERE key = %s
AND is_active = TRUE
AND (expiration_timestamp IS NULL OR expiration_timestamp > %s)
RETURNING 1
RETURNING expiration_timestamp
""",
(now, license_key, now),
)
if cur.fetchone() is None:
return False
row = cur.fetchone()
if row is None:
return False, None
conn.commit()
return True
return True, row[0]
except Exception:
logger.exception("Failed validating license key.")
return False
return False, None
return await asyncio.to_thread(_lookup)
valid, expiration_ts = await asyncio.to_thread(_lookup)
if not valid:
return {"valid": False}
if not SIGN_KEYS:
return {"valid": True}
if SIGNING_PRIVATE_KEY is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load signing key for validation: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
payload = {
"license_key": license_key,
"expiration_timestamp": expiration_ts.isoformat() if expiration_ts else None,
}
message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
signature = base64.b64encode(SIGNING_PRIVATE_KEY.sign(message)).decode("ascii")
return {"valid": True, "license": payload, "signature": signature}
@app.post("/license/{license_key}/disable", status_code=status.HTTP_200_OK)
async def disable_license_key(

View File

@@ -72,6 +72,9 @@ class LicenseServerClient:
def is_valid(self, license_key: str) -> requests.Response:
return self.request("get", "/is_valid", params={"license_key": license_key}, auth=False)
def public_key(self) -> requests.Response:
return self.request("get", "/public-key", auth=False)
def disable_license(self, license_key: str, *, auth: bool = True) -> requests.Response:
return self.request("post", f"/license/{license_key}/disable", auth=auth)
@@ -133,6 +136,19 @@ def test_license_endpoint_requires_authentication(client: LicenseServerClient) -
assert history_export.status_code == 401
def _assert_validity_payload(response: requests.Response, expected_valid: bool, *, license_key: str) -> None:
assert response.status_code == 200
payload = response.json()
assert isinstance(payload, dict)
assert payload.get("valid") is expected_valid
if expected_valid and "signature" in payload:
assert "license" in payload
license_payload = payload["license"]
assert license_payload.get("license_key") == license_key
assert "expiration_timestamp" in license_payload
def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
info_text = "QA test license"
create_response = client.create_license(info=info_text)
@@ -145,17 +161,13 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
assert len(parts) == 5
assert all(len(part) == 5 for part in parts)
validity_response = client.is_valid(license_key)
assert validity_response.status_code == 200
assert validity_response.json() is True
_assert_validity_payload(client.is_valid(license_key), True, license_key=license_key)
disable_response = client.disable_license(license_key)
assert disable_response.status_code == 200
assert disable_response.json()["is_active"] is False
after_disable = client.is_valid(license_key)
assert after_disable.status_code == 200
assert after_disable.json() is False
_assert_validity_payload(client.is_valid(license_key), False, license_key=license_key)
enable_response = client.enable_license(license_key)
assert enable_response.status_code == 200
@@ -186,3 +198,16 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
history_rows = list(csv.DictReader(history_response.text.splitlines()))
assert history_rows, "Expected history entries for created license key."
assert any(license_key in row["action"] for row in history_rows), "History does not reference the license key."
def test_public_key_endpoint(client: LicenseServerClient) -> None:
response = client.public_key()
if response.status_code == 404:
detail = response.json().get("detail")
assert detail == "License key signing is disabled."
return
assert response.status_code == 200
body = response.text.strip()
assert "public_key" in body