"""DNS-01 authenticator plugin for Certbot using the yeil public API. Authenticates against dns.yeil.app/api/v1/auth/login with an email + app password, caches the Bearer token for the run, then adds/removes TXT records via the public records API. Any yeil user with an app password and an owned DNS zone can use it. The certbot host only needs HTTPS reachability to dns.yeil.app; no NetBird or shared admin key. """ import json import logging from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from certbot import errors, interfaces from certbot.plugins import dns_common from zope.interface import implementer logger = logging.getLogger(__name__) DEFAULT_BASE_URL = "https://dns.yeil.app" HTTP_TIMEOUT = 30 @implementer(interfaces.IAuthenticator) @implementer(interfaces.IPluginFactory) class Authenticator(dns_common.DNSAuthenticator): description = ( "Obtain certificates via DNS-01 using the yeil public DNS API." ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.credentials = None # Bearer token cached for the lifetime of this plugin instance. # The login route mints a 30-day session; we only need it for # the duration of one certbot run. self._token = None # (domain, validation_name, validation) -> (zone_id, record_id) self._records = {} @classmethod def add_parser_arguments(cls, add): super(Authenticator, cls).add_parser_arguments( add, default_propagation_seconds=20 ) add("credentials", help="Path to your yeil credentials INI file.") def more_info(self): return ( "Configures Certbot to perform DNS-01 challenges by adding TXT " "records via the yeil public DNS API at dns.yeil.app." ) def _setup_credentials(self): self.credentials = self._configure_credentials( "credentials", "yeil API credentials INI file", { "email": "yeil account email (e.g. you@yourdomain.com)", "app_password": "yeil app password (create one in account.yeil.app/security)", }, ) # ── HTTP ─────────────────────────────────────────────────────────── def _base_url(self): url = self.credentials.conf("base_url") or DEFAULT_BASE_URL return url.rstrip("/") def _request(self, method, path, body=None, auth=True): """Send a JSON request and return the parsed JSON response. Raises PluginError on transport or non-2xx HTTP responses. Returns None for 204 No Content. """ url = f"{self._base_url()}{path}" data = json.dumps(body).encode("utf-8") if body is not None else None req = Request(url, data=data, method=method) req.add_header("Accept", "application/json") if data is not None: req.add_header("Content-Type", "application/json") req.add_header("Content-Length", str(len(data))) if auth: if not self._token: self._login() req.add_header("Authorization", f"Bearer {self._token}") try: with urlopen(req, timeout=HTTP_TIMEOUT) as resp: if resp.status == 204: return None payload = resp.read().decode("utf-8") return json.loads(payload) if payload else None except HTTPError as e: self._raise_http_error(e, method, path) except URLError as e: raise errors.PluginError( f"yeil dns API unreachable ({method} {path}): {e}" ) @staticmethod def _raise_http_error(e, method, path): try: body = e.read().decode("utf-8") parsed = json.loads(body) msg = parsed.get("message") or parsed.get("error") or e.reason except Exception: msg = e.reason raise errors.PluginError( f"yeil dns API error ({method} {path}, {e.code}): {msg}" ) def _login(self): email = self.credentials.conf("email") password = self.credentials.conf("app_password") result = self._request( "POST", "/api/v1/auth/login", body={"email": email, "password": password}, auth=False, ) if not isinstance(result, dict) or "token" not in result: raise errors.PluginError( "yeil dns API login returned no token" ) self._token = result["token"] # ── Zone resolution ──────────────────────────────────────────────── def _find_zone(self, fqdn): """Resolve the longest-suffix zone owned by the caller. Returns (zone_id, zone_name). Raises PluginError if none. """ from urllib.parse import quote result = self._request( "GET", f"/api/v1/zones?suffix_of={quote(fqdn, safe='')}", ) if not isinstance(result, dict) or "id" not in result: raise errors.PluginError( f"yeil dns API: no owned zone covers {fqdn}" ) return result["id"], result["zoneName"] @staticmethod def _relative_name(validation_name, zone_name): """`_acme-challenge.smtp.yeil.org` in zone `yeil.org` -> `_acme-challenge.smtp`. If validation_name equals the zone, returns "@" (the apex). """ v = validation_name.rstrip(".") z = zone_name.rstrip(".") if v == z: return "@" suffix = "." + z if v.endswith(suffix): return v[: -len(suffix)] raise errors.PluginError( f"validation_name {v} is not within zone {z}" ) # ── certbot hooks ────────────────────────────────────────────────── def _perform(self, domain, validation_name, validation): zone_id, zone_name = self._find_zone(validation_name) rel_name = self._relative_name(validation_name, zone_name) result = self._request( "POST", f"/api/v1/zones/{zone_id}/records", body={ "name": rel_name, "type": "TXT", "content": validation, "ttl": 60, }, ) record_id = ( result.get("id") if isinstance(result, dict) else None ) if not record_id: raise errors.PluginError( "yeil dns API: addrecord did not return a record id" ) self._records[(domain, validation_name, validation)] = ( zone_id, record_id, ) def _cleanup(self, domain, validation_name, validation): entry = self._records.pop((domain, validation_name, validation), None) if not entry: logger.warning( "No stored record for %s; skipping cleanup", validation_name ) return zone_id, record_id = entry try: self._request( "DELETE", f"/api/v1/zones/{zone_id}/records/{record_id}", ) except errors.PluginError as e: # Don't fail the renewal because of a stale TXT we couldn't # delete; log and move on. Operator can prune by hand. logger.warning( "Failed to clean up TXT record %s (zone=%s record=%s): %s", validation_name, zone_id, record_id, e, )