v2: authenticate via app password + use dns.yeil.app public API

Replaces direct dns-server RPC calls (admin shared key, NetBird-only
reachability) with calls to the public /api/v1 surface. The plugin
now logs in with an email + app password, caches the returned Bearer
for the run, then findZone/addRecord/deleteRecord through HTTPS.
Any yeil user with an owned DNS zone can use it from anywhere with
internet access — no more shared key, no NetBird requirement.

INI shape:
  dns_yeil_email = you@yourdomain.com
  dns_yeil_app_password = abcd-efgh-ijkl-mnop
  # dns_yeil_base_url = https://dns.yeil.app  (optional override)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
eskimo
2026-05-11 14:46:22 -04:00
parent 2ccd6d9f14
commit 456f034efb
3 changed files with 129 additions and 85 deletions

View File

@@ -1,12 +1,14 @@
"""DNS-01 authenticator plugin for Certbot using the yeil dns-server RPC.
"""DNS-01 authenticator plugin for Certbot using the yeil public API.
Talks to the dns-server (ShakeStation fork) over HTTP Basic auth. The RPC
shape is `{ method, params }` POST, returning `{ result } | { result: null,
error: { message } }`. Same protocol the yeil DNS web app uses; see
yeil/dns/src/lib/rpc.ts.
Authenticates against dns.yeil.app/api/v1/auth/login with an
email + app password, caches the Bearer token for the run, then
adds/removes TXT records via the public records API. Any yeil user
with an app password and an owned DNS zone can use it.
The certbot host only needs HTTPS reachability to dns.yeil.app; no
NetBird or shared admin key.
"""
import base64
import json
import logging
from urllib.error import HTTPError, URLError
@@ -18,15 +20,24 @@ from zope.interface import implementer
logger = logging.getLogger(__name__)
DEFAULT_BASE_URL = "https://dns.yeil.app"
HTTP_TIMEOUT = 30
@implementer(interfaces.IAuthenticator)
@implementer(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
description = "Obtain certificates via DNS-01 using the yeil dns-server RPC."
description = (
"Obtain certificates via DNS-01 using the yeil public DNS API."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.credentials = None
# Bearer token cached for the lifetime of this plugin instance.
# The login route mints a 30-day session; we only need it for
# the duration of one certbot run.
self._token = None
# (domain, validation_name, validation) -> (zone_id, record_id)
self._records = {}
@@ -40,80 +51,102 @@ class Authenticator(dns_common.DNSAuthenticator):
def more_info(self):
return (
"Configures Certbot to perform DNS-01 challenges by adding TXT "
"records via the yeil dns-server RPC."
"records via the yeil public DNS API at dns.yeil.app."
)
def _setup_credentials(self):
self.credentials = self._configure_credentials(
"credentials",
"yeil dns-server credentials INI file",
"yeil API credentials INI file",
{
"rpc_url": "Base URL of the dns-server RPC (e.g. http://100.123.x.x:6969)",
"rpc_key": "Shared RPC key (config.rpc.key on the dns-server)",
"email": "yeil account email (e.g. you@yourdomain.com)",
"app_password": "yeil app password (create one in account.yeil.app/security)",
},
)
# ── RPC ────────────────────────────────────────────────────────────
# ── HTTP ───────────────────────────────────────────────────────────
def _rpc(self, method, params=None):
"""POST {method, params} to the dns-server RPC and return result.
def _base_url(self):
url = self.credentials.conf("base_url") or DEFAULT_BASE_URL
return url.rstrip("/")
Raises errors.PluginError on transport or RPC error.
def _request(self, method, path, body=None, auth=True):
"""Send a JSON request and return the parsed JSON response.
Raises PluginError on transport or non-2xx HTTP responses.
Returns None for 204 No Content.
"""
url = self.credentials.conf("rpc_url")
key = self.credentials.conf("rpc_key")
token = base64.b64encode(f"x:{key}".encode("utf-8")).decode("ascii")
body = json.dumps({"method": method, "params": params or {}}).encode("utf-8")
url = f"{self._base_url()}{path}"
data = json.dumps(body).encode("utf-8") if body is not None else None
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Content-Length", str(len(body)))
req.add_header("Authorization", f"Basic {token}")
req = Request(url, data=data, method=method)
req.add_header("Accept", "application/json")
if data is not None:
req.add_header("Content-Type", "application/json")
req.add_header("Content-Length", str(len(data)))
if auth:
if not self._token:
self._login()
req.add_header("Authorization", f"Bearer {self._token}")
try:
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode("utf-8"))
with urlopen(req, timeout=HTTP_TIMEOUT) as resp:
if resp.status == 204:
return None
payload = resp.read().decode("utf-8")
return json.loads(payload) if payload else None
except HTTPError as e:
try:
err_body = json.loads(e.read().decode("utf-8"))
msg = err_body.get("error", {}).get("message") or e.reason
except Exception:
msg = e.reason
raise errors.PluginError(f"yeil dns-server error ({e.code}): {msg}")
self._raise_http_error(e, method, path)
except URLError as e:
raise errors.PluginError(f"yeil dns-server unreachable: {e}")
raise errors.PluginError(
f"yeil dns API unreachable ({method} {path}): {e}"
)
if isinstance(data, dict) and data.get("error"):
err = data["error"]
msg = err.get("message") if isinstance(err, dict) else str(err)
raise errors.PluginError(f"yeil dns-server error: {msg}")
return data.get("result") if isinstance(data, dict) else None
@staticmethod
def _raise_http_error(e, method, path):
try:
body = e.read().decode("utf-8")
parsed = json.loads(body)
msg = parsed.get("message") or parsed.get("error") or e.reason
except Exception:
msg = e.reason
raise errors.PluginError(
f"yeil dns API error ({method} {path}, {e.code}): {msg}"
)
def _login(self):
email = self.credentials.conf("email")
password = self.credentials.conf("app_password")
result = self._request(
"POST",
"/api/v1/auth/login",
body={"email": email, "password": password},
auth=False,
)
if not isinstance(result, dict) or "token" not in result:
raise errors.PluginError(
"yeil dns API login returned no token"
)
self._token = result["token"]
# ── Zone resolution ────────────────────────────────────────────────
def _find_zone_id(self, fqdn):
"""Walk up the labels of fqdn, calling findzone, until one matches.
def _find_zone(self, fqdn):
"""Resolve the longest-suffix zone owned by the caller.
Returns (zone_id, zone_name). Raises PluginError if nothing found.
Returns (zone_id, zone_name). Raises PluginError if none.
"""
labels = fqdn.rstrip(".").split(".")
# Need at least a domain.tld pair to be a candidate zone.
while len(labels) >= 2:
candidate = ".".join(labels)
try:
result = self._rpc("findzone", {"domain": candidate})
except errors.PluginError as e:
# A "zone not found" response surfaces as a PluginError; that's
# the signal to walk up. Anything else (auth, network) we'd
# rather re-raise after exhausting candidates, so swallow here.
logger.debug("findzone(%s) miss: %s", candidate, e)
result = None
if isinstance(result, dict) and result.get("id"):
return result["id"], candidate
labels = labels[1:]
raise errors.PluginError(
f"No registered zone found in dns-server for any suffix of {fqdn}"
from urllib.parse import quote
result = self._request(
"GET",
f"/api/v1/zones?suffix_of={quote(fqdn, safe='')}",
)
if not isinstance(result, dict) or "id" not in result:
raise errors.PluginError(
f"yeil dns API: no owned zone covers {fqdn}"
)
return result["id"], result["zoneName"]
@staticmethod
def _relative_name(validation_name, zone_name):
@@ -128,7 +161,6 @@ class Authenticator(dns_common.DNSAuthenticator):
suffix = "." + z
if v.endswith(suffix):
return v[: -len(suffix)]
# Shouldn't happen if _find_zone_id picked the zone from this fqdn.
raise errors.PluginError(
f"validation_name {v} is not within zone {z}"
)
@@ -136,12 +168,12 @@ class Authenticator(dns_common.DNSAuthenticator):
# ── certbot hooks ──────────────────────────────────────────────────
def _perform(self, domain, validation_name, validation):
zone_id, zone_name = self._find_zone_id(validation_name)
zone_id, zone_name = self._find_zone(validation_name)
rel_name = self._relative_name(validation_name, zone_name)
result = self._rpc(
"addrecord",
{
"zone": zone_id,
result = self._request(
"POST",
f"/api/v1/zones/{zone_id}/records",
body={
"name": rel_name,
"type": "TXT",
"content": validation,
@@ -153,9 +185,12 @@ class Authenticator(dns_common.DNSAuthenticator):
)
if not record_id:
raise errors.PluginError(
"dns-server addrecord did not return a record id"
"yeil dns API: addrecord did not return a record id"
)
self._records[(domain, validation_name, validation)] = (zone_id, record_id)
self._records[(domain, validation_name, validation)] = (
zone_id,
record_id,
)
def _cleanup(self, domain, validation_name, validation):
entry = self._records.pop((domain, validation_name, validation), None)
@@ -166,9 +201,9 @@ class Authenticator(dns_common.DNSAuthenticator):
return
zone_id, record_id = entry
try:
self._rpc(
"deleterecord",
{"zone": zone_id, "record": record_id},
self._request(
"DELETE",
f"/api/v1/zones/{zone_id}/records/{record_id}",
)
except errors.PluginError as e:
# Don't fail the renewal because of a stale TXT we couldn't