Files
certbot-dns-yeil/certbot_dns_yeil/dns_yeil.py
eskimo 456f034efb v2: authenticate via app password + use dns.yeil.app public API
Replaces direct dns-server RPC calls (admin shared key, NetBird-only
reachability) with calls to the public /api/v1 surface. The plugin
now logs in with an email + app password, caches the returned Bearer
for the run, then findZone/addRecord/deleteRecord through HTTPS.
Any yeil user with an owned DNS zone can use it from anywhere with
internet access — no more shared key, no NetBird requirement.

INI shape:
  dns_yeil_email = you@yourdomain.com
  dns_yeil_app_password = abcd-efgh-ijkl-mnop
  # dns_yeil_base_url = https://dns.yeil.app  (optional override)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 14:46:22 -04:00

215 lines
7.7 KiB
Python

"""DNS-01 authenticator plugin for Certbot using the yeil public API.
Authenticates against dns.yeil.app/api/v1/auth/login with an
email + app password, caches the Bearer token for the run, then
adds/removes TXT records via the public records API. Any yeil user
with an app password and an owned DNS zone can use it.
The certbot host only needs HTTPS reachability to dns.yeil.app; no
NetBird or shared admin key.
"""
import json
import logging
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from certbot import errors, interfaces
from certbot.plugins import dns_common
from zope.interface import implementer
logger = logging.getLogger(__name__)
DEFAULT_BASE_URL = "https://dns.yeil.app"
HTTP_TIMEOUT = 30
@implementer(interfaces.IAuthenticator)
@implementer(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
description = (
"Obtain certificates via DNS-01 using the yeil public DNS API."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.credentials = None
# Bearer token cached for the lifetime of this plugin instance.
# The login route mints a 30-day session; we only need it for
# the duration of one certbot run.
self._token = None
# (domain, validation_name, validation) -> (zone_id, record_id)
self._records = {}
@classmethod
def add_parser_arguments(cls, add):
super(Authenticator, cls).add_parser_arguments(
add, default_propagation_seconds=20
)
add("credentials", help="Path to your yeil credentials INI file.")
def more_info(self):
return (
"Configures Certbot to perform DNS-01 challenges by adding TXT "
"records via the yeil public DNS API at dns.yeil.app."
)
def _setup_credentials(self):
self.credentials = self._configure_credentials(
"credentials",
"yeil API credentials INI file",
{
"email": "yeil account email (e.g. you@yourdomain.com)",
"app_password": "yeil app password (create one in account.yeil.app/security)",
},
)
# ── HTTP ───────────────────────────────────────────────────────────
def _base_url(self):
url = self.credentials.conf("base_url") or DEFAULT_BASE_URL
return url.rstrip("/")
def _request(self, method, path, body=None, auth=True):
"""Send a JSON request and return the parsed JSON response.
Raises PluginError on transport or non-2xx HTTP responses.
Returns None for 204 No Content.
"""
url = f"{self._base_url()}{path}"
data = json.dumps(body).encode("utf-8") if body is not None else None
req = Request(url, data=data, method=method)
req.add_header("Accept", "application/json")
if data is not None:
req.add_header("Content-Type", "application/json")
req.add_header("Content-Length", str(len(data)))
if auth:
if not self._token:
self._login()
req.add_header("Authorization", f"Bearer {self._token}")
try:
with urlopen(req, timeout=HTTP_TIMEOUT) as resp:
if resp.status == 204:
return None
payload = resp.read().decode("utf-8")
return json.loads(payload) if payload else None
except HTTPError as e:
self._raise_http_error(e, method, path)
except URLError as e:
raise errors.PluginError(
f"yeil dns API unreachable ({method} {path}): {e}"
)
@staticmethod
def _raise_http_error(e, method, path):
try:
body = e.read().decode("utf-8")
parsed = json.loads(body)
msg = parsed.get("message") or parsed.get("error") or e.reason
except Exception:
msg = e.reason
raise errors.PluginError(
f"yeil dns API error ({method} {path}, {e.code}): {msg}"
)
def _login(self):
email = self.credentials.conf("email")
password = self.credentials.conf("app_password")
result = self._request(
"POST",
"/api/v1/auth/login",
body={"email": email, "password": password},
auth=False,
)
if not isinstance(result, dict) or "token" not in result:
raise errors.PluginError(
"yeil dns API login returned no token"
)
self._token = result["token"]
# ── Zone resolution ────────────────────────────────────────────────
def _find_zone(self, fqdn):
"""Resolve the longest-suffix zone owned by the caller.
Returns (zone_id, zone_name). Raises PluginError if none.
"""
from urllib.parse import quote
result = self._request(
"GET",
f"/api/v1/zones?suffix_of={quote(fqdn, safe='')}",
)
if not isinstance(result, dict) or "id" not in result:
raise errors.PluginError(
f"yeil dns API: no owned zone covers {fqdn}"
)
return result["id"], result["zoneName"]
@staticmethod
def _relative_name(validation_name, zone_name):
"""`_acme-challenge.smtp.yeil.org` in zone `yeil.org` -> `_acme-challenge.smtp`.
If validation_name equals the zone, returns "@" (the apex).
"""
v = validation_name.rstrip(".")
z = zone_name.rstrip(".")
if v == z:
return "@"
suffix = "." + z
if v.endswith(suffix):
return v[: -len(suffix)]
raise errors.PluginError(
f"validation_name {v} is not within zone {z}"
)
# ── certbot hooks ──────────────────────────────────────────────────
def _perform(self, domain, validation_name, validation):
zone_id, zone_name = self._find_zone(validation_name)
rel_name = self._relative_name(validation_name, zone_name)
result = self._request(
"POST",
f"/api/v1/zones/{zone_id}/records",
body={
"name": rel_name,
"type": "TXT",
"content": validation,
"ttl": 60,
},
)
record_id = (
result.get("id") if isinstance(result, dict) else None
)
if not record_id:
raise errors.PluginError(
"yeil dns API: addrecord did not return a record id"
)
self._records[(domain, validation_name, validation)] = (
zone_id,
record_id,
)
def _cleanup(self, domain, validation_name, validation):
entry = self._records.pop((domain, validation_name, validation), None)
if not entry:
logger.warning(
"No stored record for %s; skipping cleanup", validation_name
)
return
zone_id, record_id = entry
try:
self._request(
"DELETE",
f"/api/v1/zones/{zone_id}/records/{record_id}",
)
except errors.PluginError as e:
# Don't fail the renewal because of a stale TXT we couldn't
# delete; log and move on. Operator can prune by hand.
logger.warning(
"Failed to clean up TXT record %s (zone=%s record=%s): %s",
validation_name, zone_id, record_id, e,
)