Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ src/certapi.egg-info
!/.vscode/launch.json
.venv
.env
test_db_temp/
test_db_temp/
*.key
1 change: 1 addition & 0 deletions src/certapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
)
from .issuers import CertIssuer, SelfCertIssuer, AcmeCertIssuer
from .client import CertManagerClient
from .domain_batching import create_safe_domain_batches
68 changes: 57 additions & 11 deletions src/certapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _ensure_port_80_available() -> None:
sys.exit(1)


def obtain_certificate(domains: List[str], api_key: Optional[str] = None):
def obtain_certificate(domains: List[str], api_key: Optional[str] = None, keystore_path: str = "/etc/ssl"):
challenge_solver = None
server = None

Expand All @@ -124,7 +124,6 @@ def obtain_certificate(domains: List[str], api_key: Optional[str] = None):
print("Starting HTTP challenge server on port 80...")
server, _ = _start_http_challenge_server(challenge_solver, port=80)

keystore_path = "/etc/ssl"
key_store = FileSystemKeyStore(keystore_path)
cert_issuer = AcmeCertIssuer.with_keystore(
key_store,
Expand All @@ -140,7 +139,8 @@ def obtain_certificate(domains: List[str], api_key: Optional[str] = None):

cert_chain = certs_from_pem(cert.encode("utf-8"))
leaf_cert = cert_chain[0] if cert_chain else None
expiry = leaf_cert.not_valid_after.isoformat() if leaf_cert else "unknown"
expiry_dt = leaf_cert.not_valid_after_utc if leaf_cert else None
expiry = expiry_dt.isoformat() if expiry_dt else "unknown"

key_path = os.path.join(key_store.keys_dir, f"{key_name}.key")
cert_path = os.path.join(key_store.certs_dir, f"{key_name}.crt")
Expand Down Expand Up @@ -219,14 +219,60 @@ def verify_environment(domains: List[str], api_key: Optional[str] = None) -> Non


def main():
parser = argparse.ArgumentParser(prog="certapi")
subparsers = parser.add_subparsers(dest="command")

verify_parser = subparsers.add_parser("verify", help="Verify certapi installation and environment.")
verify_parser.add_argument("domains", nargs="*", help="Optional domain(s) to verify.")
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
pass

parser = argparse.ArgumentParser(
prog="certapi",
formatter_class=HelpFormatter,
description=(
"Issue and verify ACME certificates.\n\n"
"Challenge mode selection:\n"
"- If CLOUDFLARE_API_KEY or CLOUDFLARE_API_TOKEN is set, DNS-01 is used.\n"
"- Otherwise HTTP-01 is used (requires binding to port 80)."
),
epilog=(
"Cloudflare credentials are read from environment variables: "
"CLOUDFLARE_API_KEY or CLOUDFLARE_API_TOKEN.\n\n"
"Examples:\n"
" certapi verify example.com\n"
" certapi obtain --path ./certs example.com www.example.com\n"
" CLOUDFLARE_API_TOKEN=<token> certapi obtain --path . example.com"
),
)
subparsers = parser.add_subparsers(
dest="command",
title="commands",
metavar="{verify,obtain}",
)

obtain_parser = subparsers.add_parser("obtain", help="Obtain certificate for domains.")
obtain_parser.add_argument("domains", nargs="+", help="Domain(s) to obtain certificate for.")
verify_parser = subparsers.add_parser(
"verify",
help="Verify certapi installation and challenge readiness.",
description=(
"Verify environment readiness.\n"
"- With Cloudflare credentials: checks DNS zone ownership for provided domains.\n"
"- Without credentials: checks local HTTP challenge serving/routing for domains."
),
formatter_class=HelpFormatter,
)
verify_parser.add_argument("domains", nargs="*", help="Optional domain(s) to validate.")

obtain_parser = subparsers.add_parser(
"obtain",
help="Issue a certificate for one or more domains.",
description=(
"Obtain a certificate and write key/cert files to the selected path.\n"
"When Cloudflare credentials are set, DNS-01 challenge is used."
),
formatter_class=HelpFormatter,
)
obtain_parser.add_argument("domains", nargs="+", help="Domain(s) to include in the certificate.")
obtain_parser.add_argument(
"--path",
default="/etc/ssl",
help="Directory where key/cert files will be written.",
)
Comment on lines +271 to +275
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The CLI obtain command is missing a flag to enable the new batching functionality. While the API and CertManagerClient were updated to support batch_domains, the CLI still performs a single issuance for all domains because it calls AcmeCertIssuer directly instead of using the AcmeCertManager wrapper. To ensure consistency and make the feature accessible to CLI users, consider adding a --batch argument and updating the implementation to use AcmeCertManager.issue_certificate_in_batches.


args = parser.parse_args()

Expand All @@ -236,7 +282,7 @@ def main():
sys.exit(0)
elif args.command == "obtain":
api_key = _resolve_cloudflare_api_key()
obtain_certificate(args.domains, api_key=api_key)
obtain_certificate(args.domains, api_key=api_key, keystore_path=args.path)
else:
parser.print_help()
sys.exit(1)
2 changes: 2 additions & 0 deletions src/certapi/client/cert_manager_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def issue_certificate(
user_id: Optional[str] = None,
renew_threshold_days: Optional[int] = None,
skip_failing: bool = True,
batch_domains: bool = False,
) -> CertificateResponse:
params = {
"hostname": hosts if isinstance(hosts, str) else hosts,
Expand All @@ -68,6 +69,7 @@ def issue_certificate(
if renew_threshold_days is not None:
params["renew_threshold_days"] = renew_threshold_days
params["skip_failing"] = skip_failing
params["batch_domains"] = batch_domains

data = self._get("/api/obtain", params=params)
res = CertificateResponse.from_json(data)
Expand Down
82 changes: 82 additions & 0 deletions src/certapi/domain_batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
def _normalize_domain(domain: str) -> str:
domain = domain.strip().lower().rstrip(".")
if domain.startswith("*."):
domain = domain[2:]
return domain


def _split_labels(domain: str) -> list[str]:
normalized = _normalize_domain(domain)
return [label for label in normalized.split(".") if label]


def would_trigger(labels: list[str], blocked: set[str]) -> bool:
"""
Boulder-style recursive on-demand trigger check used by tests.
"""
if len(labels) < 4:
return False

blocked = {label.lower() for label in blocked}

consecutive_blocked = 0
prev_label: str | None = None
for label in labels:
label = label.lower()
is_blocked = label in blocked
if is_blocked:
consecutive_blocked += 1
if prev_label is not None and prev_label == label:
return True
if consecutive_blocked >= 3:
return True
else:
consecutive_blocked = 0
prev_label = label
return False


def split_domain_to_safe_groups(domain: str, blocked_labels: list[str] | None = None) -> list[list[str]]:
"""
Split labels into contiguous chunks of at most 3 labels.
This mirrors the "3 labels are always safe for this specific LE heuristic"
invariant, but these label groups are not used directly as issuance domains.
"""
labels = _split_labels(domain)
n = len(labels)
if n == 0:
return []
return [labels[i : i + 3] for i in range(0, n, 3)]


def create_safe_domain_batches(domains: list[str], blocked_labels: list[str] | None = None) -> list[list[str]]:
"""
Create compact issuance batches without any blocked-label list assumptions.

Rules:
- Domains with <= 3 labels are grouped together into one compact batch.
- Longer domains are split into safe label groups and emitted as singleton batches.

This avoids guessed label lists and keeps behavior deterministic.
"""
compact_batch: list[str] = []
singleton_batches: list[list[str]] = []
seen: set[str] = set()

for domain in domains:
normalized = _normalize_domain(domain)
if not normalized or normalized in seen:
continue
seen.add(normalized)

labels = [label for label in normalized.split(".") if label]
if len(labels) <= 3:
compact_batch.append(normalized)
continue

for i in range(0, len(labels), 3):
singleton_batches.append([".".join(labels[i : i + 3])])
Comment on lines +72 to +78
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The logic for handling domains with more than 3 labels is fundamentally incorrect for ACME certificate issuance.

  1. Invalid Hostnames: It splits a single FQDN into label fragments (e.g., a.b.c.d.e.f becomes a.b.c and d.e.f). ACME requires the full FQDN for validation; you cannot obtain a certificate for a fragment of a domain to cover the original FQDN.
  2. Loss of Wildcards: The _normalize_domain function removes *. prefixes. If a user requests a wildcard certificate, this logic will attempt to issue a standard certificate for the base domain instead.
  3. Unused Logic: The function ignores the blocked_labels parameter and the would_trigger logic defined earlier in the file, blindly splitting any domain with more than 3 labels.

If the goal is to isolate potentially 'unsafe' domains to avoid triggering the safeguard for the entire order, you should group the full domain strings into separate batches (singleton batches) rather than splitting the labels of a single domain.

Suggested change
labels = [label for label in normalized.split(".") if label]
if len(labels) <= 3:
compact_batch.append(normalized)
continue
for i in range(0, len(labels), 3):
singleton_batches.append([".".join(labels[i : i + 3])])
labels = _split_labels(domain)
# Preserve the original domain (stripped/lowered) for issuance to keep wildcards
issuance_domain = domain.strip().lower().rstrip(".")
if len(labels) <= 3:
compact_batch.append(issuance_domain)
continue
singleton_batches.append([issuance_domain])


if compact_batch:
return [compact_batch, *singleton_batches]
return singleton_batches
97 changes: 77 additions & 20 deletions src/certapi/manager/acme_cert_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import List, Literal, Optional, Tuple, Union, Dict
from typing import Callable, List, Literal, Optional, Tuple, Union, Dict
from datetime import datetime, timezone, timedelta

from certapi import crypto
Expand All @@ -11,6 +11,7 @@
from ..keystore.KeyStore import KeyStore
from cryptography.x509 import Certificate, CertificateSigningRequest
from ..crypto import Key, certs_to_pem, cert_to_pem, get_csr_hostnames
from ..domain_batching import create_safe_domain_batches

DEFAULT_RENEW_THRESHOLD_DAYS = 62

Expand Down Expand Up @@ -75,8 +76,59 @@ def issue_certificate(
user_id: Optional[str] = None,
renew_threshold_days: Optional[int] = None,
) -> CertificateResponse:
return self._issue_certificate_internal(
hosts=hosts,
key_type=key_type,
expiry_days=expiry_days,
country=country,
state=state,
locality=locality,
organization=organization,
user_id=user_id,
renew_threshold_days=renew_threshold_days,
batch_generator=None,
)

def issue_certificate_in_batches(
self,
hosts: Union[str, List[str]],
key_type: Literal["rsa", "ecdsa", "ed25519"] = "ecdsa",
expiry_days: int = 90,
country: Optional[str] = None,
state: Optional[str] = None,
locality: Optional[str] = None,
organization: Optional[str] = None,
user_id: Optional[str] = None,
renew_threshold_days: Optional[int] = None,
batch_generator: Callable[[List[str]], List[List[str]]] = create_safe_domain_batches,
) -> CertificateResponse:
return self._issue_certificate_internal(
hosts=hosts,
key_type=key_type,
expiry_days=expiry_days,
country=country,
state=state,
locality=locality,
organization=organization,
user_id=user_id,
renew_threshold_days=renew_threshold_days,
batch_generator=batch_generator,
)

if type(hosts) == str:
def _issue_certificate_internal(
self,
hosts: Union[str, List[str]],
key_type: Literal["rsa", "ecdsa", "ed25519"] = "ecdsa",
expiry_days: int = 90,
country: Optional[str] = None,
state: Optional[str] = None,
locality: Optional[str] = None,
organization: Optional[str] = None,
user_id: Optional[str] = None,
renew_threshold_days: Optional[int] = None,
batch_generator: Optional[Callable[[List[str]], List[List[str]]]] = None,
) -> CertificateResponse:
if isinstance(hosts, str):
hosts = [hosts]

existing: Dict[str, Tuple[int | str, Key, List[Certificate] | str]] = {}
Expand Down Expand Up @@ -109,25 +161,30 @@ def issue_certificate(
print(f"Warning: No challenge solver found that supports domain: {host}. Skipping.")

for store, domains_to_issue in domains_by_store.items():

private_key, fullchain_cert = self.cert_issuer.generate_key_and_cert_for_domains(
domains_to_issue,
key_type=key_type,
expiry_days=expiry_days,
country=country,
state=state,
locality=locality,
organization=organization,
user_id=user_id,
challenge_solver=store,
issuance_batches = (
batch_generator(domains_to_issue) if batch_generator is not None else [domains_to_issue]
)

if fullchain_cert:
key_id = self.key_store.save_key(private_key, domains_to_issue[0])
self.key_store.save_cert(key_id, fullchain_cert, domains_to_issue)
issued_certs_list.append(IssuedCert(key=private_key, cert=fullchain_cert, domains=domains_to_issue))
else:
print(f"Failed to issue certificate for domains: {domains_to_issue}")
for batch in issuance_batches:
if not batch:
continue
private_key, fullchain_cert = self.cert_issuer.generate_key_and_cert_for_domains(
batch,
key_type=key_type,
expiry_days=expiry_days,
country=country,
state=state,
locality=locality,
organization=organization,
user_id=user_id,
challenge_solver=store,
)

if fullchain_cert:
key_id = self.key_store.save_key(private_key, batch[0])
self.key_store.save_cert(key_id, fullchain_cert, batch)
issued_certs_list.append(IssuedCert(key=private_key, cert=fullchain_cert, domains=batch))
else:
print(f"Failed to issue certificate for domains: {batch}")

# self.cert_issuer.challenge_solver = original_challenge_solver # Restore original
return createExistingResponse(existing, issued_certs_list)
Expand Down
13 changes: 12 additions & 1 deletion src/certapi/server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ def create_api_resources(api_ns, cert_manager: AcmeCertManager, renew_queue_size
default=False,
help="Allow issuance to proceed if some domains fail verification",
)
obtain_parser.add_argument(
"batch_domains",
type=inputs.boolean,
default=False,
help="Issue certificates in separate safe batches instead of one combined order",
)

@api_ns.route("/obtain")
class ObtainCert(Resource):
Expand Down Expand Up @@ -128,7 +134,12 @@ def get(self):
if not hostnames:
return CertificateResponse().to_json()

data = cert_manager.issue_certificate(
issue_fn = (
cert_manager.issue_certificate_in_batches
if args.get("batch_domains")
else cert_manager.issue_certificate
)
data = issue_fn(
hostnames,
key_type=args["key_type"],
expiry_days=args["expiry_days"],
Expand Down
Loading
Loading