nginx-configurator/ncc/main.py
2024-11-04 22:25:46 +01:00

272 lines
7.5 KiB
Python

import os
import tempfile
from collections import defaultdict
from pathlib import Path
import click
import sysrsync
import yaml
from . import certs, sysaction, templating
from .error import NccException
CONFIG = {
"main_config": "nginx.conf",
"conf_dir": ".",
"targets": [],
"dehydrated_dir": "dehydrated",
"sites_dir": "sites",
}
class Step:
"""Fancy printing of steps with option to hide the details after it finishes"""
def __init__(self, text, hide=False):
self.text = text
self.lines = 1
self.on_start = True
self.hide = hide
def echo(self, message, *args, **kwargs) -> None:
nl = int(kwargs.get("nl", 1))
self.lines += message.count("\n") + nl
if self.on_start:
click.echo(" => ", nl=False)
self.on_start = nl or (bool(message) and message[-1] == "\n")
click.echo("\n => ".join(message.splitlines()), *args, **kwargs)
def __enter__(self) -> "Step":
click.secho("[ ] " + self.text)
return self
def __exit__(self, exc, _exc2, _exc3) -> None:
click.echo(
f'\r\x1b[{self.lines}F[{click.style("K" if not exc else "E", fg="green" if not exc else "red")}'
)
if self.hide and not exc:
click.echo("\x1b[0J", nl=False)
elif self.lines > 1:
click.echo(f"\x1b[{self.lines-1}E", nl=False)
def load_config():
cwd = Path(os.getcwd())
conf_file = None
root = Path(cwd.root)
while cwd != root:
conf_file = cwd / "ncc.yml"
if conf_file.exists():
os.chdir(cwd)
break
else:
conf_file = None
cwd = cwd.parent
if not conf_file:
click.echo(
"Failed to find ncc configuration (searched all parent folders)", err=True
)
exit(1)
with conf_file.open() as f:
if yml := yaml.safe_load(f):
CONFIG.update(yml)
os.chdir(CONFIG["conf_dir"])
@click.group()
def cli():
"""Update the nginx cluster configuration
MUST BE RAN ON MASTER (will detect automatically)
"""
load_config()
@cli.command()
@click.option(
"--certs-only",
type=bool,
is_flag=True,
help="Only fetches and deploys new certificates",
)
@click.option("--skip-master-check", type=bool, is_flag=True)
def up(certs_only: bool, skip_master_check: bool):
"""Deploy the configuration to the cluster
Does the following:
1. generates ssl config in genssl folder
2. runs dehydrated
3. checks the configuration
4. deploys to the cluster
"""
if not skip_master_check and not sysaction.is_keepalived_master():
click.echo("Refusing to start. Not running on master.", err=True)
exit(1)
with Step("Preparing configuration..."):
dehydrated_dir = Path(CONFIG["dehydrated_dir"])
if not certs_only:
certs.generate_ssl(
Path(CONFIG["main_config"]),
Path(CONFIG["dehydrated_dir"]) / "domains.txt",
)
ec, stdout = sysaction.run_shell(
(str(dehydrated_dir / "dehydrated.sh"), "-c"), window_height=5
)
if ec != 0:
log = Path(tempfile.mktemp())
log.write_text(stdout)
raise NccException(
f"dehydrated returned {ec} (log: {click.format_filename(log)})"
)
if not certs_only:
ec, stdout = sysaction.run_shell(
("nginx", "-t", "-p", ".", "-c", CONFIG["main_config"]), window_height=5
)
if ec != 0:
raise NccException("configuration did not pass nginx test", log=stdout)
with Step("Deploying to cluster...") as step:
for target in CONFIG["targets"]:
step.echo('deploying to "' + target + '"', nl=False)
try:
if certs_only:
sysrsync.run(
source=os.getcwd() + "/" + str(dehydrated_dir / "certs"),
destination=target + "/" + str(dehydrated_dir / "certs"),
options=["-a", "--delete"],
)
else:
sysrsync.run(
source=os.getcwd(),
destination=target,
exclusions=[str(dehydrated_dir / "archive/**")],
options=["-a", "--delete"],
)
except Exception as e:
step.echo(click.style(" E", fg="red"))
raise NccException("failed to rsync configuration") from e
# technically we could be misinterpreting a path here...
if (first_part := target.split("/")[0]) and (
remote := first_part.split(":")[0]
):
ec, stdout = sysaction.run_shell(("ssh", "-T", remote, "nginx -s reload"))
else:
ec, stdout = sysaction.run_shell(("nginx", "-s", "reload"))
if ec != 0:
step.echo(click.style(" E", fg="red"))
raise NccException("failed to reload nginx", log=stdout)
step.echo(click.style(" K", fg="green"))
@cli.command()
def test():
"""Run nginx -t on the configuration"""
ec, stdout = sysaction.run_shell(
("nginx", "-t", "-p", ".", "-c", CONFIG["main_config"])
)
click.echo(stdout, nl=False)
exit(ec)
def shell_complete_service(_ctx, _param, incomplete):
load_config()
return [
site
for _, site in certs.get_sites(Path(CONFIG["main_config"]))
if incomplete in site
]
@cli.command()
@click.argument("site", shell_complete=shell_complete_service)
def edit(site: str):
"""Edit a site"""
file = next(
(f for f, s in certs.get_sites(Path(CONFIG["main_config"])) if site == s), None
)
if not file or not file.exists():
raise NccException("could not find site")
click.edit(filename=str(file))
@cli.command("list")
def list_():
"""List all sites and the files they are located in"""
file_sites = defaultdict(list)
for f, s in certs.get_sites(Path(CONFIG["main_config"])):
file_sites[f].append(s)
for file, sites in file_sites.items():
click.echo(str(file) + ": " + " ".join(sites))
@cli.command()
def new():
"""Create a new site"""
sites_dir = Path(CONFIG["sites_dir"])
if not sites_dir.exists():
raise NccException("sites_dir does not exist")
templates = [f.name for f in Path("templates").glob("*.conf")]
templates.sort()
printed = len(templates)
for i, t in enumerate(templates):
click.echo(f" {i+1}) {t}")
while True:
try:
printed += 1
choice = int(input("Template number: ")) - 1
except ValueError:
continue
if 0 <= choice < len(templates):
break
click.echo(f"\r\x1b[{printed}F\x1b[0J", nl=False)
click.secho(f"=== {templates[choice]}", bold=True)
name, filled = templating.fill_template(
Path(f"templates/{templates[choice]}").read_text()
)
if not name:
name = input("File name (without .conf): ").removesuffix(".conf")
while not name or (file := sites_dir / (name + ".conf")).exists():
if name and click.confirm(
f"File {click.format_filename(file)} already exists. Overwrite?"
):
break
name = input("File name (without .conf): ").removesuffix(".conf")
file.write_text(filled)
click.echo(f"Site written to {click.format_filename(file)}")
if click.confirm("Continue editing?"):
click.edit(filename=str(file))