"""DNS-01 authenticator plugin for Certbot using the yeil dns-server RPC. Talks to the dns-server (ShakeStation fork) over HTTP Basic auth. The RPC shape is `{ method, params }` POST, returning `{ result } | { result: null, error: { message } }`. Same protocol the yeil DNS web app uses; see yeil/dns/src/lib/rpc.ts. """ import base64 import json import logging from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from certbot import errors, interfaces from certbot.plugins import dns_common from zope.interface import implementer logger = logging.getLogger(__name__) @implementer(interfaces.IAuthenticator) @implementer(interfaces.IPluginFactory) class Authenticator(dns_common.DNSAuthenticator): description = "Obtain certificates via DNS-01 using the yeil dns-server RPC." def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.credentials = None # (domain, validation_name, validation) -> (zone_id, record_id) self._records = {} @classmethod def add_parser_arguments(cls, add): super(Authenticator, cls).add_parser_arguments( add, default_propagation_seconds=20 ) add("credentials", help="Path to your yeil credentials INI file.") def more_info(self): return ( "Configures Certbot to perform DNS-01 challenges by adding TXT " "records via the yeil dns-server RPC." ) def _setup_credentials(self): self.credentials = self._configure_credentials( "credentials", "yeil dns-server credentials INI file", { "rpc_url": "Base URL of the dns-server RPC (e.g. http://100.123.x.x:6969)", "rpc_key": "Shared RPC key (config.rpc.key on the dns-server)", }, ) # ── RPC ──────────────────────────────────────────────────────────── def _rpc(self, method, params=None): """POST {method, params} to the dns-server RPC and return result. Raises errors.PluginError on transport or RPC error. """ url = self.credentials.conf("rpc_url") key = self.credentials.conf("rpc_key") token = base64.b64encode(f"x:{key}".encode("utf-8")).decode("ascii") body = json.dumps({"method": method, "params": params or {}}).encode("utf-8") req = Request(url, data=body, method="POST") req.add_header("Content-Type", "application/json") req.add_header("Content-Length", str(len(body))) req.add_header("Authorization", f"Basic {token}") try: with urlopen(req, timeout=30) as resp: data = json.loads(resp.read().decode("utf-8")) except HTTPError as e: try: err_body = json.loads(e.read().decode("utf-8")) msg = err_body.get("error", {}).get("message") or e.reason except Exception: msg = e.reason raise errors.PluginError(f"yeil dns-server error ({e.code}): {msg}") except URLError as e: raise errors.PluginError(f"yeil dns-server unreachable: {e}") if isinstance(data, dict) and data.get("error"): err = data["error"] msg = err.get("message") if isinstance(err, dict) else str(err) raise errors.PluginError(f"yeil dns-server error: {msg}") return data.get("result") if isinstance(data, dict) else None # ── Zone resolution ──────────────────────────────────────────────── def _find_zone_id(self, fqdn): """Walk up the labels of fqdn, calling findzone, until one matches. Returns (zone_id, zone_name). Raises PluginError if nothing found. """ labels = fqdn.rstrip(".").split(".") # Need at least a domain.tld pair to be a candidate zone. while len(labels) >= 2: candidate = ".".join(labels) try: result = self._rpc("findzone", {"domain": candidate}) except errors.PluginError as e: # A "zone not found" response surfaces as a PluginError; that's # the signal to walk up. Anything else (auth, network) we'd # rather re-raise after exhausting candidates, so swallow here. logger.debug("findzone(%s) miss: %s", candidate, e) result = None if isinstance(result, dict) and result.get("id"): return result["id"], candidate labels = labels[1:] raise errors.PluginError( f"No registered zone found in dns-server for any suffix of {fqdn}" ) @staticmethod def _relative_name(validation_name, zone_name): """`_acme-challenge.smtp.yeil.org` in zone `yeil.org` -> `_acme-challenge.smtp`. If validation_name equals the zone, returns "@" (the apex). """ v = validation_name.rstrip(".") z = zone_name.rstrip(".") if v == z: return "@" suffix = "." + z if v.endswith(suffix): return v[: -len(suffix)] # Shouldn't happen if _find_zone_id picked the zone from this fqdn. raise errors.PluginError( f"validation_name {v} is not within zone {z}" ) # ── certbot hooks ────────────────────────────────────────────────── def _perform(self, domain, validation_name, validation): zone_id, zone_name = self._find_zone_id(validation_name) rel_name = self._relative_name(validation_name, zone_name) result = self._rpc( "addrecord", { "zone": zone_id, "name": rel_name, "type": "TXT", "content": validation, "ttl": 60, }, ) record_id = ( result.get("id") if isinstance(result, dict) else None ) if not record_id: raise errors.PluginError( "dns-server addrecord did not return a record id" ) self._records[(domain, validation_name, validation)] = (zone_id, record_id) def _cleanup(self, domain, validation_name, validation): entry = self._records.pop((domain, validation_name, validation), None) if not entry: logger.warning( "No stored record for %s; skipping cleanup", validation_name ) return zone_id, record_id = entry try: self._rpc( "deleterecord", {"zone": zone_id, "record": record_id}, ) except errors.PluginError as e: # Don't fail the renewal because of a stale TXT we couldn't # delete; log and move on. Operator can prune by hand. logger.warning( "Failed to clean up TXT record %s (zone=%s record=%s): %s", validation_name, zone_id, record_id, e, )