Add signature response for valid license keys

This commit is contained in:
2025-10-21 22:42:56 -04:00
parent 16385a6529
commit e198ea687b
6 changed files with 204 additions and 19 deletions

View File

@@ -4,15 +4,20 @@ from dotenv import dotenv_values
from loguru import logger
from datetime import datetime, timezone
from typing import Optional
from pathlib import Path
import secrets
import string
import psycopg
import asyncio
import base64
import json
from ecdsa import Ed25519, SigningKey, VerifyingKey
DEBUG = False # Reveal debug tools such as /docs swagger UI
ENV = dotenv_values(".env") # .env file
VERSION = "v1.0.1" # version number
VERSION = "v1.0.2" # version number
ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation
try:
LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys
@@ -24,6 +29,21 @@ try:
except:
LICENSE_KEY_PART_LENGTH = 5
try:
sign_keys_raw = ENV.get("SIGN_KEY")
if sign_keys_raw is None:
sign_keys_raw = ENV.get("SIGN_KEYS")
SIGN_KEYS = False if sign_keys_raw is None else sign_keys_raw.strip().lower() in {"true", "1", "yes", "on"}
except Exception:
SIGN_KEYS = False
KEYS_DIR = Path("keys")
PRIVATE_KEY_PATH = KEYS_DIR / "ed25519_private.pem"
PUBLIC_KEY_PATH = KEYS_DIR / "ed25519_public.pem"
SIGNING_PRIVATE_KEY: Optional[SigningKey] = None
SIGNING_PUBLIC_KEY_B64: Optional[str] = None
api_key = ""
security = HTTPBearer(auto_error=False)
@@ -48,6 +68,45 @@ except Exception as error:
logger.error("Failed to connect to PostgreSQL: {}", error)
exit()
def _load_or_create_signing_keys() -> None:
global SIGNING_PRIVATE_KEY, SIGNING_PUBLIC_KEY_B64
KEYS_DIR.mkdir(parents=True, exist_ok=True)
if not PRIVATE_KEY_PATH.exists():
signing_key = SigningKey.generate(curve=Ed25519)
verifying_key = signing_key.verifying_key
PRIVATE_KEY_PATH.write_bytes(signing_key.to_pem(format="pkcs8"))
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PRIVATE_KEY = signing_key
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.info("Generated new Ed25519 signing key pair at {}.", KEYS_DIR)
return
try:
SIGNING_PRIVATE_KEY = SigningKey.from_pem(PRIVATE_KEY_PATH.read_bytes())
except Exception as exc:
logger.error("Failed to read signing private key: {}", exc)
raise
verifying_key: Optional[VerifyingKey]
if PUBLIC_KEY_PATH.exists():
try:
verifying_key = VerifyingKey.from_pem(PUBLIC_KEY_PATH.read_bytes())
except Exception as exc:
logger.warning(
"Existing public key was invalid; regenerating from private key: {}",
exc,
)
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
else:
verifying_key = SIGNING_PRIVATE_KEY.verifying_key
PUBLIC_KEY_PATH.write_bytes(verifying_key.to_pem())
SIGNING_PUBLIC_KEY_B64 = base64.b64encode(verifying_key.to_string()).decode("ascii")
logger.debug("Signing key pair loaded from {}.", KEYS_DIR)
with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur:
cur.execute(
@@ -72,6 +131,13 @@ with psycopg.connect(connect_statement) as conn:
)
conn.commit()
if SIGN_KEYS:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to initialize signing keys: {}", error)
exit()
if DEBUG:
app = FastAPI()
else:
@@ -176,12 +242,32 @@ async def create_license_key(
"info": info,
}
@app.get("/public-key")
async def get_public_key():
if not SIGN_KEYS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="License key signing is disabled.",
)
if SIGNING_PUBLIC_KEY_B64 is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load public key: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
return {"public_key": SIGNING_PUBLIC_KEY_B64}
@app.get("/is_valid")
async def is_license_key_valid(license_key: str) -> bool:
async def is_license_key_valid(license_key: str):
"""Validate the supplied license key against the database."""
now = datetime.now(timezone.utc)
def _lookup() -> bool:
def _lookup() -> tuple[bool, Optional[datetime]]:
try:
with psycopg.connect(connect_statement) as conn:
with conn.cursor() as cur:
@@ -192,19 +278,44 @@ async def is_license_key_valid(license_key: str) -> bool:
WHERE key = %s
AND is_active = TRUE
AND (expiration_timestamp IS NULL OR expiration_timestamp > %s)
RETURNING 1
RETURNING expiration_timestamp
""",
(now, license_key, now),
)
if cur.fetchone() is None:
return False
row = cur.fetchone()
if row is None:
return False, None
conn.commit()
return True
return True, row[0]
except Exception:
logger.exception("Failed validating license key.")
return False
return False, None
return await asyncio.to_thread(_lookup)
valid, expiration_ts = await asyncio.to_thread(_lookup)
if not valid:
return {"valid": False}
if not SIGN_KEYS:
return {"valid": True}
if SIGNING_PRIVATE_KEY is None:
try:
_load_or_create_signing_keys()
except Exception as error:
logger.error("Failed to load signing key for validation: {}", error)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signing is enabled but keys are unavailable.",
)
payload = {
"license_key": license_key,
"expiration_timestamp": expiration_ts.isoformat() if expiration_ts else None,
}
message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
signature = base64.b64encode(SIGNING_PRIVATE_KEY.sign(message)).decode("ascii")
return {"valid": True, "license": payload, "signature": signature}
@app.post("/license/{license_key}/disable", status_code=status.HTTP_200_OK)
async def disable_license_key(