343 lines
14 KiB
343 lines
14 KiB
{ self, ... }:
perSystem =
{ pkgs, self', ... }:
checks =
# Domain used for the BIND zone and for every hostname the test script queries.
testDomain = "webnstest.example";
# Path the test script reads with `cat` to check whether an update was recorded.
lastIPPath = "/var/lib/webnsupdate/last-ip.json";
# Initial zone data written to the Nix store: SOA and NS records plus A/AAAA
# records for the zone apex, ns1 and nsupdate.
# NOTE(review): the three A records below show no address in this view —
# confirm the record data against the full file.
zoneFile = pkgs.writeText "${testDomain}.zoneinfo" ''
$TTL 600 ; 10 minutes
$ORIGIN ${testDomain}.
@ IN SOA ns1.${testDomain}. admin.${testDomain}. (
1 ; serial
6h ; refresh
1h ; retry
1w ; expire
1d) ; negative caching TTL
IN NS ns1.${testDomain}.
@ IN A
ns1 IN A
nsupdate IN A
@ IN AAAA ::1
ns1 IN AAAA ::1
nsupdate IN AAAA ::1
# NixOS module fragment: declares the test zone as a master zone that accepts
# key-authenticated updates, and re-seeds its zone file from the Nix store
# whenever bind's preStart script runs.
bindDynamicZone =
{ config, ... }:
bindCfg = config.services.bind;
bindData = bindCfg.directory;
# Sub-directory of bind's data directory that holds the updatable zone files.
dynamicZonesDir = "${bindData}/zones";
services.bind.zones.${testDomain} = {
master = true;
file = "${dynamicZonesDir}/${testDomain}";
# Restrict zone updates to requests that use the rndc-key key.
extraConfig = ''
allow-update { key rndc-key; };
systemd.services.bind.preStart = ''
# shellcheck disable=SC2211,SC1127
rm -f ${dynamicZonesDir}/* # reset dynamic zones
# create a dynamic zones dir
mkdir -m 0755 -p ${dynamicZonesDir}
# copy dynamic zone's file to the dynamic zones dir
cp ${zoneFile} ${dynamicZonesDir}/${testDomain}
# Base test machine: enables bind and the webnsupdate service (package taken
# from this flake's per-system outputs) with verbose logging.
# NOTE(review): the contents of `imports`, `environment.systemPackages` and
# `records` are not visible in this view.
webnsupdate-ipv4-machine =
{ lib, ... }:
imports = [
config = {
environment.systemPackages = [
services = {
bind.enable = true;
webnsupdate = {
enable = true;
package = self'.packages.webnsupdate;
extraArgs = [ "-vvv" ]; # debug messages
settings = {
# mkDefault gives this value low priority; webnsupdate-ipv6-machine sets
# `address` itself (presumably overriding this via its imports — confirm).
address = lib.mkDefault "";
# Same key name that the bind zone's allow-update clause refers to.
key_file = "/etc/bind/rndc.key";
password_file = pkgs.writeText "webnsupdate.pass" "FQoNmuU1BKfg8qsU96F6bK5ykp2b0SLe3ZpB3nbtfZA"; # test:test
# Low-priority default; webnsupdate-nginx-machine sets a different ip_source.
ip_source = lib.mkDefault "ConnectInfo";
records = [
# Test machine variant that makes webnsupdate listen on the IPv6 loopback
# address, port 5353. The imported modules are not visible in this view.
webnsupdate-ipv6-machine = {
imports = [
config.services.webnsupdate.settings.address = "[::1]:5353";
# Test machine variant that enables nginx with the recommended proxy settings
# and defines a proxied "/" location on the `webnsupdate` virtual host.
# NOTE(review): the imports list and the proxyPass target are not visible in
# this view.
webnsupdate-nginx-machine =
{ lib, config, ... }:
imports = [
config.services = {
# Use default IP Source
# NOTE(review): webnsupdate-ipv4-machine sets ip_source to
# `lib.mkDefault "ConnectInfo"`, so "default" here presumably refers to the
# webnsupdate module's own default — confirm.
webnsupdate.settings.ip_source = "RightmostXForwardedFor";
nginx = {
enable = true;
recommendedProxySettings = true;
virtualHosts.webnsupdate.locations."/".proxyPass =
# nginx test machine with webnsupdate's ip_type setting fixed to "Ipv4Only".
webnsupdate-ipv4-only-machine = {
imports = [ webnsupdate-nginx-machine ];
config.services.webnsupdate.settings.ip_type = "Ipv4Only";
# nginx test machine with webnsupdate's ip_type setting fixed to "Ipv6Only".
webnsupdate-ipv6-only-machine = {
imports = [ webnsupdate-nginx-machine ];
config.services.webnsupdate.settings.ip_type = "Ipv6Only";
# "A" for IPv4, "AAAA" for IPv6, "ANY" for any
# Builds the Python test script for one test variant. All flags default to false.
#   ipv4 / ipv6: address families the script updates and checks
#   nginx:       when true, request URLs omit the ":5353" port (see curl_cmd)
#   exclusive:   evaluation throws unless exactly one of ipv4/ipv6 is set
# The flags are interpolated into the script as Python booleans.
testTemplate =
ipv4 ? false,
ipv6 ? false,
nginx ? false,
exclusive ? false,
if exclusive && (ipv4 == ipv6) then
builtins.throw "exclusive means one of ipv4 or ipv6 must be set, but not both"
IPV4: bool = ${if ipv4 then "True" else "False"}
IPV6: bool = ${if ipv6 then "True" else "False"}
NGINX: bool = ${if nginx then "True" else "False"}
EXCLUSIVE: bool = ${if exclusive then "True" else "False"}
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
CURL: str = "curl --fail --no-progress-meter --show-error"
STATIC_DOMAINS: list[str] = ["${testDomain}", "ns1.${testDomain}", "nsupdate.${testDomain}"]
DYNAMIC_DOMAINS: list[str] = ["test1.${testDomain}", "test2.${testDomain}", "test3.${testDomain}"]
def dig_cmd(domain: str, record: str, ip: str | None) -> tuple[str, str]:
match_ip = "" if ip is None else f"\\s\\+600\\s\\+IN\\s\\+{record}\\s\\+{ip}$"
return f"dig @localhost {record} {domain} +noall +answer", f"grep '^{domain}.{match_ip}'"
def curl_cmd(domain: str, identity: str, path: str, query: dict[str, str]) -> str:
    """Build the curl command line for an authenticated GET request.

    identity is passed to curl -u, and query is URL-encoded and appended after
    a question mark when it is non-empty. The URL carries port 5353 unless the
    NGINX flag is set.
    """
    from urllib.parse import urlencode

    query_string = "?" + urlencode(query) if query else ""
    port = "" if NGINX else ":5353"
    url = f"http://{domain}{port}/{path}{query_string}"
    return f"{CURL} -u {identity} -X GET '{url}'"
def domain_available(domain: str, record: str, ip: str | None=None):
    """Require that dig returns a matching answer for the domain.

    The dig command is first run on its own so that its exit code and output
    are printed, then run again piped into grep via machine.succeed.
    """
    lookup, matcher = dig_cmd(domain, record, ip)
    status, answer = machine.execute(lookup)
    print(f"{lookup}[{status}]: {answer}")
    machine.succeed(" | ".join((lookup, matcher)))
def domain_missing(domain: str, record: str, ip: str | None=None):
    """Require that dig returns no matching answer for the domain.

    The dig command is first run on its own so that its exit code and output
    are printed, then run again piped into grep via machine.fail.
    """
    lookup, matcher = dig_cmd(domain, record, ip)
    status, answer = machine.execute(lookup)
    print(f"{lookup}[{status}]: {answer}")
    machine.fail(" | ".join((lookup, matcher)))
def update_records(domain: str="localhost", /, *, path: str="update", **kwargs):
    """Send an update request as test:test and require that it succeeds.

    domain is positional-only, so every extra keyword argument (including one
    named domain) ends up in the request's query string. Afterwards the
    last-IP file must be readable.
    """
    request = curl_cmd(domain, "test:test", path, kwargs)
    machine.succeed(request)
    machine.succeed("cat ${lastIPPath}")
def update_records_fail(domain: str="localhost", /, *, identity: str="test:test", path: str="update", **kwargs):
    """Send an update request and require that it fails.

    Extra keyword arguments become the query string. Afterwards reading the
    last-IP file must fail as well.
    """
    request = curl_cmd(domain, identity, path, kwargs)
    machine.fail(request)
    machine.fail("cat ${lastIPPath}")
def invalid_update(domain: str="localhost"):
    """Require that updates are rejected for a wrong user and for a wrong password."""
    for bad_identity in ("bad_user:test", "test:bad_pass"):
        update_records_fail(domain, identity=bad_identity)
# Tests
# Every subtest starts by printing the active flags so that log output can be
# matched to the test variant that produced it.
with subtest("static DNS records are available"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
for domain in STATIC_DOMAINS:
domain_available(domain, "A", "") # IPv4
domain_available(domain, "AAAA", "::1") # IPv6
with subtest("dynamic DNS records are missing"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
for domain in DYNAMIC_DOMAINS:
domain_missing(domain, "A") # IPv4
domain_missing(domain, "AAAA") # IPv6
with subtest("invalid auth fails to update records"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
# The dynamic records must still be absent at this point.
for domain in DYNAMIC_DOMAINS:
domain_missing(domain, "A") # IPv4
domain_missing(domain, "AAAA") # IPv6
with subtest("exclusive IP version fails to update with invalid version"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
# NOTE(review): no statements are visible under the next two `if` lines in
# this view — confirm against the full file.
if IPV6:
if IPV4:
with subtest("valid auth updates records"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
# NOTE(review): the bodies of the next two `if` lines are not visible here.
if IPV4:
if IPV6:
for domain in DYNAMIC_DOMAINS:
# A record of an enabled family must resolve. With EXCLUSIVE set, the record
# of the other family must be absent; otherwise it is not checked.
if IPV4:
domain_available(domain, "A", "")
elif IPV6 and EXCLUSIVE:
domain_missing(domain, "A")
if IPV6:
domain_available(domain, "AAAA", "::1")
elif IPV4 and EXCLUSIVE:
domain_missing(domain, "AAAA")
with subtest("valid auth fritzbox compatible updates records"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
# update_records takes its host positional-only, so the keyword arguments
# here (including domain="test") are sent as query parameters.
if IPV4 and IPV6:
update_records("", domain="test", ipv4="", ipv6="::1234")
elif IPV4:
update_records("", ipv4="", ipv6="")
elif IPV6:
update_records("[::1]", ipv4="", ipv6="::1234")
for domain in DYNAMIC_DOMAINS:
if IPV4:
domain_available(domain, "A", "")
elif IPV6 and EXCLUSIVE:
domain_missing(domain, "A")
if IPV6:
domain_available(domain, "AAAA", "::1234")
elif IPV4 and EXCLUSIVE:
domain_missing(domain, "AAAA")
with subtest("valid auth replaces records"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
# NOTE(review): the bodies of the next two `if` lines are not visible here.
if IPV4:
if IPV6:
for domain in DYNAMIC_DOMAINS:
if IPV4:
domain_available(domain, "A", "")
elif IPV6 and EXCLUSIVE:
domain_missing(domain, "A")
if IPV6:
domain_available(domain, "AAAA", "::1")
elif IPV4 and EXCLUSIVE:
domain_missing(domain, "AAAA")
# Reading the last-IP file must succeed both times.
machine.succeed("cat ${lastIPPath}")
machine.succeed("cat ${lastIPPath}")
with subtest("static DNS records are available after reboot"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
for domain in STATIC_DOMAINS:
domain_available(domain, "A", "") # IPv4
domain_available(domain, "AAAA", "::1") # IPv6
with subtest("dynamic DNS records are available after reboot"):
print(f"{IPV4=} {IPV6=} {EXCLUSIVE=}")
for domain in DYNAMIC_DOMAINS:
if IPV4:
domain_available(domain, "A", "")
elif IPV6 and EXCLUSIVE:
domain_missing(domain, "A")
if IPV6:
domain_available(domain, "AAAA", "::1")
elif IPV4 and EXCLUSIVE:
domain_missing(domain, "AAAA")
# VM test on webnsupdate-ipv4-machine; runs the shared script with only ipv4 set.
module-ipv4-test = pkgs.testers.nixosTest {
name = "webnsupdate-ipv4-module";
nodes.machine = webnsupdate-ipv4-machine;
testScript = testTemplate { ipv4 = true; };
# VM test on webnsupdate-ipv6-machine; runs the shared script with only ipv6 set.
module-ipv6-test = pkgs.testers.nixosTest {
name = "webnsupdate-ipv6-module";
nodes.machine = webnsupdate-ipv6-machine;
testScript = testTemplate { ipv6 = true; };
# VM test on webnsupdate-nginx-machine; runs the shared script with ipv4, ipv6
# and nginx all set.
module-nginx-test = pkgs.testers.nixosTest {
name = "webnsupdate-nginx-module";
nodes.machine = webnsupdate-nginx-machine;
testScript = testTemplate {
ipv4 = true;
ipv6 = true;
nginx = true;
# VM test on webnsupdate-ipv4-only-machine; runs the shared script in exclusive
# mode with ipv4 and nginx set.
module-ipv4-only-test = pkgs.testers.nixosTest {
name = "webnsupdate-ipv4-only-module";
nodes.machine = webnsupdate-ipv4-only-machine;
testScript = testTemplate {
ipv4 = true;
nginx = true;
exclusive = true;
# VM test on webnsupdate-ipv6-only-machine; runs the shared script in exclusive
# mode with ipv6 and nginx set.
module-ipv6-only-test = pkgs.testers.nixosTest {
name = "webnsupdate-ipv6-only-module";
nodes.machine = webnsupdate-ipv6-only-machine;
testScript = testTemplate {
ipv6 = true;
nginx = true;
exclusive = true;