added info column
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
|
||||
A very small, self-hosted license server intended for lower traffic projects.
|
||||
|
||||
This license server has no concept of separate devices, so use only if you don't intend to restrict the amount of devices that a license key can be used on.
|
||||
|
||||
## Environment Setup
|
||||
|
||||
Replace the placeholder secrets in `.env` with secure random values using `openssl rand`:
|
||||
@@ -20,7 +22,7 @@ All authenticated endpoints expect a `Bearer` token via the `Authorization` head
|
||||
| Method | Path | Auth | Description | Key Request Parameters | Response Highlights |
|
||||
| --- | --- | --- | --- | --- | --- |
|
||||
| `GET` | `/` | No | Returns server metadata. | – | `{"version": "Micro License Server vX.Y.Z"}` |
|
||||
| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional) | `license_key`, `expiration_timestamp`, `is_active` |
|
||||
| `POST` | `/license` | Yes | Generates a new license key. | Query: `is_active` (bool, default `true`), `expiration_date` (ISO 8601, optional), `info` (string, optional metadata) | `license_key`, `expiration_timestamp`, `is_active`, `info` |
|
||||
| `GET` | `/is_valid` | No | Validates a license key and records last usage time. | Query: `license_key` (required) | Boolean validity |
|
||||
| `POST` | `/license/{license_key}/disable` | Yes | Deactivates a license key. | Path: `license_key` | `license_key`, `is_active: false` |
|
||||
| `POST` | `/license/{license_key}/enable` | Yes | Reactivates a license key. | Path: `license_key` | `license_key`, `is_active: true` |
|
||||
|
||||
36
src/main.py
36
src/main.py
@@ -9,8 +9,10 @@ import string
|
||||
import psycopg
|
||||
import asyncio
|
||||
|
||||
DEBUG = False # Reveal debug tools such as /docs swagger UI
|
||||
|
||||
ENV = dotenv_values(".env") # .env file
|
||||
VERSION = "v1.0.0" # version number
|
||||
VERSION = "v1.0.1" # version number
|
||||
ALPHABET = string.ascii_uppercase + string.digits #alphabet and numbers for token generation
|
||||
try:
|
||||
LICENSE_KEY_PARTS = 5 if "NUM_KEY_CHUNKS" not in ENV else int(ENV["NUM_KEY_CHUNKS"]) #number of chunks in the new generated license keys
|
||||
@@ -48,6 +50,7 @@ with psycopg.connect(connect_statement) as conn:
|
||||
issue_timestamp TIMESTAMPTZ NOT NULL,
|
||||
expiration_timestamp TIMESTAMPTZ,
|
||||
last_used_timestamp TIMESTAMPTZ,
|
||||
info TEXT,
|
||||
is_active BOOLEAN NOT NULL DEFAULT TRUE
|
||||
)
|
||||
"""
|
||||
@@ -62,8 +65,10 @@ with psycopg.connect(connect_statement) as conn:
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
#app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
|
||||
if DEBUG:
|
||||
app = FastAPI()
|
||||
else:
|
||||
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
|
||||
logger.info("Started FastAPI.")
|
||||
|
||||
|
||||
@@ -83,6 +88,7 @@ async def create_license_key(
|
||||
is_active: bool = True,
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
expiration_date: Optional[datetime] = None,
|
||||
info: Optional[str] = None,
|
||||
):
|
||||
if (
|
||||
credentials is None
|
||||
@@ -114,15 +120,17 @@ async def create_license_key(
|
||||
issue_timestamp,
|
||||
expiration_timestamp,
|
||||
last_used_timestamp,
|
||||
info,
|
||||
is_active
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
""",
|
||||
(
|
||||
license_key,
|
||||
issued_at,
|
||||
expiration_ts,
|
||||
None,
|
||||
info,
|
||||
is_active,
|
||||
),
|
||||
)
|
||||
@@ -132,7 +140,17 @@ async def create_license_key(
|
||||
VALUES (%s, %s)
|
||||
""",
|
||||
(
|
||||
f"create_license_key key={license_key} active={is_active} expiration={expiration_ts.isoformat() if expiration_ts else 'none'}",
|
||||
(
|
||||
"create_license_key key={license_key} active={is_active} expiration={expiration} info={info}"
|
||||
.format(
|
||||
license_key=license_key,
|
||||
is_active=is_active,
|
||||
expiration=expiration_ts.isoformat()
|
||||
if expiration_ts
|
||||
else "none",
|
||||
info=info if info is not None else "none",
|
||||
)
|
||||
),
|
||||
issued_at,
|
||||
),
|
||||
)
|
||||
@@ -151,6 +169,7 @@ async def create_license_key(
|
||||
"license_key": license_key,
|
||||
"expiration_timestamp": expiration_ts.isoformat() if expiration_ts else None,
|
||||
"is_active": is_active,
|
||||
"info": info,
|
||||
}
|
||||
|
||||
@app.get("/is_valid")
|
||||
@@ -401,7 +420,7 @@ async def export_license_keys(
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT key, issue_timestamp, expiration_timestamp, is_active
|
||||
SELECT key, issue_timestamp, expiration_timestamp, info, is_active
|
||||
FROM license_keys
|
||||
ORDER BY issue_timestamp ASC
|
||||
"""
|
||||
@@ -417,15 +436,16 @@ async def export_license_keys(
|
||||
detail="Failed to export license keys.",
|
||||
) from exc
|
||||
|
||||
lines = ["license_key,issue_timestamp,expiration_timestamp,is_active"]
|
||||
for license_key, issue_ts, expiration_ts, is_active in rows:
|
||||
lines = ["license_key,issue_timestamp,expiration_timestamp,info,is_active"]
|
||||
for license_key, issue_ts, expiration_ts, info_value, is_active in rows:
|
||||
lines.append(
|
||||
"{key},{issue},{expiration},{active}".format(
|
||||
"{key},{issue},{expiration},{info},{active}".format(
|
||||
key=license_key,
|
||||
issue=issue_ts.isoformat(),
|
||||
expiration=expiration_ts.isoformat()
|
||||
if expiration_ts
|
||||
else "",
|
||||
info=(info_value or "").replace("\n", " ").replace("\r", " ").replace(",", " "),
|
||||
active="true" if is_active else "false",
|
||||
)
|
||||
)
|
||||
|
||||
@@ -57,6 +57,7 @@ class LicenseServerClient:
|
||||
*,
|
||||
is_active: bool = True,
|
||||
expiration_iso: Optional[str] = None,
|
||||
info: Optional[str] = None,
|
||||
auth: bool = True,
|
||||
) -> requests.Response:
|
||||
params = {}
|
||||
@@ -64,6 +65,8 @@ class LicenseServerClient:
|
||||
params["is_active"] = "false"
|
||||
if expiration_iso is not None:
|
||||
params["expiration_date"] = expiration_iso
|
||||
if info is not None:
|
||||
params["info"] = info
|
||||
return self.request("post", "/license", params=params or None, auth=auth)
|
||||
|
||||
def is_valid(self, license_key: str) -> requests.Response:
|
||||
@@ -131,11 +134,13 @@ def test_license_endpoint_requires_authentication(client: LicenseServerClient) -
|
||||
|
||||
|
||||
def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
|
||||
create_response = client.create_license()
|
||||
info_text = "QA test license"
|
||||
create_response = client.create_license(info=info_text)
|
||||
assert create_response.status_code == 201
|
||||
created_payload = create_response.json()
|
||||
license_key = created_payload["license_key"]
|
||||
assert created_payload["is_active"] is True
|
||||
assert created_payload["info"] == info_text
|
||||
parts = license_key.split("-")
|
||||
assert len(parts) == 5
|
||||
assert all(len(part) == 5 for part in parts)
|
||||
@@ -172,7 +177,9 @@ def test_license_lifecycle_and_exports(client: LicenseServerClient) -> None:
|
||||
export_rows = list(csv.DictReader(export_response.text.splitlines()))
|
||||
matching_rows = [row for row in export_rows if row["license_key"] == license_key]
|
||||
assert matching_rows, "Created license key not found in export."
|
||||
assert matching_rows[0]["is_active"] == "true"
|
||||
exported_row = matching_rows[0]
|
||||
assert exported_row["is_active"] == "true"
|
||||
assert exported_row["info"] == info_text
|
||||
|
||||
history_response = client.export_history(token=license_key)
|
||||
assert history_response.status_code == 200
|
||||
|
||||
Reference in New Issue
Block a user