backupbot formatting
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Moritz 2025-05-14 12:26:20 +02:00
parent 4cda3c1018
commit 54e32ab422
Signed by: moritz
GPG Key ID: 1020A035E5DD0824
2 changed files with 230 additions and 170 deletions

2
.gitignore vendored
View File

@ -1 +1 @@
/testing
.venv

View File

@ -17,47 +17,53 @@ from pathlib import Path
from shutil import copyfile, rmtree
VOLUME_PATH = "/var/lib/docker/volumes/"
SECRET_PATH = '/secrets/'
SERVICE = 'ALL'
SECRET_PATH = "/secrets/"
SERVICE = "ALL"
logger = logging.getLogger("backupbot")
logging.addLevelName(55, 'SUMMARY')
setattr(logging, 'SUMMARY', 55)
setattr(logger, 'summary', lambda message, *args, **
kwargs: logger.log(55, message, *args, **kwargs))
logging.addLevelName(55, "SUMMARY")
setattr(logging, "SUMMARY", 55)
setattr(
logger,
"summary",
lambda message, *args, **kwargs: logger.log(55, message, *args, **kwargs),
)
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.critical("Uncaught exception", exc_info=(
exc_type, exc_value, exc_traceback))
logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
@click.group()
@click.option('-l', '--log', 'loglevel')
@click.option('-m', '--machine-logs', 'machine_logs', is_flag=True, envvar='MACHINE_LOGS')
@click.option('service', '--host', '-h', envvar='SERVICE')
@click.option('repository', '--repo', '-r', envvar='RESTIC_REPOSITORY')
@click.option("-l", "--log", "loglevel")
@click.option(
"-m", "--machine-logs", "machine_logs", is_flag=True, envvar="MACHINE_LOGS"
)
@click.option("service", "--host", "-h", envvar="SERVICE")
@click.option("repository", "--repo", "-r", envvar="RESTIC_REPOSITORY")
def cli(loglevel, service, repository, machine_logs):
global SERVICE
if service:
SERVICE = service.replace('.', '_')
SERVICE = service.replace(".", "_")
if repository:
os.environ['RESTIC_REPOSITORY'] = repository
os.environ["RESTIC_REPOSITORY"] = repository
if loglevel:
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % loglevel)
raise ValueError("Invalid log level: %s" % loglevel)
logger.setLevel(numeric_level)
logHandler = logging.StreamHandler()
if machine_logs:
formatter = jsonlogger.JsonFormatter(
"%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s", rename_fields={"levelname": "message_type"})
"%(levelname)s %(filename)s %(lineno)s %(process)d %(message)s",
rename_fields={"levelname": "message_type"},
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
@ -66,18 +72,18 @@ def cli(loglevel, service, repository, machine_logs):
def init_repo():
if repo:= os.environ.get('RESTIC_REPOSITORY_FILE'):
if repo := os.environ.get("RESTIC_REPOSITORY_FILE"):
# RESTIC_REPOSITORY_FILE and RESTIC_REPOSITORY are mutually exclusive
del os.environ['RESTIC_REPOSITORY']
del os.environ["RESTIC_REPOSITORY"]
else:
repo = os.environ['RESTIC_REPOSITORY']
repo = os.environ["RESTIC_REPOSITORY"]
restic.repository = repo
logger.debug(f"set restic repository location: {repo}")
restic.password_file = '/var/run/secrets/restic_password'
restic.password_file = "/var/run/secrets/restic_password"
try:
restic.cat.config()
except ResticFailedError as error:
if 'unable to open config file' in str(error):
if "unable to open config file" in str(error):
result = restic.init()
logger.info(f"Initialized restic repo: {result}")
else:
@ -86,19 +92,21 @@ def init_repo():
def export_secrets():
for env in os.environ:
if env.endswith('FILE') and not "COMPOSE_FILE" in env:
if env.endswith("FILE") and not "COMPOSE_FILE" in env:
logger.debug(f"exported secret: {env}")
with open(os.environ[env]) as file:
secret = file.read()
os.environ[env.removesuffix('_FILE')] = secret
os.environ[env.removesuffix("_FILE")] = secret
# logger.debug(f"Read secret value: {secret}")
@cli.command()
@click.option('retries', '--retries', '-r', envvar='RETRIES', default=1)
@click.option("retries", "--retries", "-r", envvar="RETRIES", default=1)
def create(retries):
app_settings = parse_backup_labels()
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings)
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(
app_settings
)
copy_secrets(apps_versions)
backup_paths.append(Path(SECRET_PATH))
run_commands(pre_commands)
@ -107,29 +115,37 @@ def create(retries):
@cli.command()
@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('target', '--target', '-t', envvar='TARGET', default='/')
@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True)
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES', multiple=True)
@click.option('container', '--container', '-c', envvar='CONTAINER', multiple=True)
@click.option('no_commands', '--no-commands', envvar='NO_COMMANDS', is_flag=True)
@click.option("snapshot_id", "--snapshot", "-s", envvar="SNAPSHOT", default="latest")
@click.option("target", "--target", "-t", envvar="TARGET", default="/")
@click.option(
"noninteractive", "--noninteractive", envvar="NONINTERACTIVE", is_flag=True
)
@click.option("volumes", "--volumes", "-v", envvar="VOLUMES", multiple=True)
@click.option("container", "--container", "-c", envvar="CONTAINER", multiple=True)
@click.option("no_commands", "--no-commands", envvar="NO_COMMANDS", is_flag=True)
def restore(snapshot_id, target, noninteractive, volumes, container, no_commands):
app_settings = parse_backup_labels('restore', container)
if SERVICE != 'ALL':
app_settings = parse_backup_labels("restore", container)
if SERVICE != "ALL":
if not app_settings.get(SERVICE):
logger.error(f"The app {SERVICE} is not running, use the restore-path argument to restore paths of undeployed apps")
logger.error(
f"The app {SERVICE} is not running, use the restore-path argument to restore paths of undeployed apps"
)
exit(1)
app_settings = {SERVICE: app_settings.get(SERVICE)}
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(app_settings, volumes)
pre_commands, post_commands, backup_paths, apps_versions = get_backup_details(
app_settings, volumes
)
snapshots = get_snapshots(snapshot_id)
if not snapshots:
logger.error(f"No Snapshots with ID {snapshot_id} for {apps_versions.keys()} found.")
logger.error(
f"No Snapshots with ID {snapshot_id} for {apps_versions.keys()} found."
)
exit(1)
snapshot = snapshots[0]
snapshot_id = snapshot['short_id']
snapshot_id = snapshot["short_id"]
if not noninteractive:
print(f"Snapshot to restore: \t{snapshot_id}")
restore_app_versions = app_versions_from_tags(snapshot.get('tags'))
restore_app_versions = app_versions_from_tags(snapshot.get("tags"))
print("Apps:")
for app, version in apps_versions.items():
restore_version = restore_app_versions.get(app)
@ -138,17 +154,19 @@ def restore(snapshot_id, target, noninteractive, volumes, container, no_commands
print(f"WARNING!!! The running app is deployed with version {version}")
print("The following volume paths will be restored:")
for p in backup_paths:
print(f'\t{p}')
print(f"\t{p}")
if not no_commands:
print("The following commands will be executed:")
for container, cmd in list(pre_commands.items()) + list(post_commands.items()):
for container, cmd in list(pre_commands.items()) + list(
post_commands.items()
):
print(f"\t{container.labels['com.docker.swarm.service.name']}:\t{cmd}")
snapshot_date = datetime.fromisoformat(snapshot['time'])
snapshot_date = datetime.fromisoformat(snapshot["time"])
delta = datetime.now(tz=timezone.utc) - snapshot_date
print(f"This snapshot is {delta} old")
print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITES FILES")
prompt = input("Type YES (uppercase) to continue: ")
if prompt != 'YES':
if prompt != "YES":
logger.error("Restore aborted")
exit(1)
print(f"Restoring Snapshot {snapshot_id} at {target}")
@ -156,20 +174,24 @@ def restore(snapshot_id, target, noninteractive, volumes, container, no_commands
print(f"Run pre commands.")
run_commands(pre_commands)
if backup_paths:
result = restic_restore(snapshot_id=snapshot_id, include=backup_paths, target_dir=target)
result = restic_restore(
snapshot_id=snapshot_id, include=backup_paths, target_dir=target
)
logger.debug(result)
else:
print('No paths to restore.')
print("No paths to restore.")
if not no_commands and post_commands:
print(f"Run post commands.")
run_commands(post_commands)
@cli.command()
@click.option('snapshot_id', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('target', '--target', '-t', envvar='TARGET', default='/')
@click.option('noninteractive', '--noninteractive', envvar='NONINTERACTIVE', is_flag=True)
@click.argument('paths', nargs=-1, required=True, envvar='INCLUDE_PATH')
@click.option("snapshot_id", "--snapshot", "-s", envvar="SNAPSHOT", default="latest")
@click.option("target", "--target", "-t", envvar="TARGET", default="/")
@click.option(
"noninteractive", "--noninteractive", envvar="NONINTERACTIVE", is_flag=True
)
@click.argument("paths", nargs=-1, required=True, envvar="INCLUDE_PATH")
def restore_path(snapshot_id, target, noninteractive, paths):
"""PATHS: list of paths to restore"""
snapshots = get_snapshots(snapshot_id)
@ -177,48 +199,53 @@ def restore_path(snapshot_id, target, noninteractive, paths):
logger.error(f"No Snapshots with ID {snapshot_id} for app {SERVICE} found.")
exit(1)
snapshot = snapshots[0]
snapshot_id = snapshot['short_id']
snapshot_id = snapshot["short_id"]
if not noninteractive:
print(f"Snapshot to restore: \t{snapshot_id}")
restore_app_versions = app_versions_from_tags(snapshot.get('tags'))
restore_app_versions = app_versions_from_tags(snapshot.get("tags"))
print("Apps:")
for app, version in restore_app_versions.items():
if SERVICE == 'ALL' or SERVICE == app:
if SERVICE == "ALL" or SERVICE == app:
print(f"\t{app} \t {version}")
print("The following paths will be restored:")
for p in paths:
print(f'\t{p}')
snapshot_date = datetime.fromisoformat(snapshot['time'])
print(f"\t{p}")
snapshot_date = datetime.fromisoformat(snapshot["time"])
delta = datetime.now(tz=timezone.utc) - snapshot_date
print(f"This snapshot is {delta} old")
print("\nTHIS COMMAND WILL IRREVERSIBLY OVERWRITES FILES")
prompt = input("Type YES (uppercase) to continue: ")
if prompt != 'YES':
if prompt != "YES":
logger.error("Restore aborted")
exit(1)
print(f"Restoring Snapshot {snapshot_id} at {target}")
result = restic_restore(snapshot_id=snapshot_id, include=paths, target_dir=target)
logger.debug(result)
def restic_restore(snapshot_id, include=[], target_dir=None):
cmd = restic.cat.base_command() + ['restore', snapshot_id]
cmd = restic.cat.base_command() + ["restore", snapshot_id]
for path in include:
cmd.extend(['--include', path])
cmd.extend(["--include", path])
if target_dir:
cmd.extend(['--target', target_dir])
cmd.extend(["--target", target_dir])
return restic.internal.command_executor.execute(cmd)
def get_snapshots(snapshot_id=None):
if snapshot_id and snapshot_id != 'latest':
if snapshot_id and snapshot_id != "latest":
snapshots = restic.snapshots(snapshot_id=snapshot_id)
if not SERVICE in app_versions_from_tags(snapshots[0].get('tags')):
logger.error(f'Snapshot with ID {snapshot_id} does not contain {SERVICE}')
if not SERVICE in app_versions_from_tags(snapshots[0].get("tags")):
logger.error(f"Snapshot with ID {snapshot_id} does not contain {SERVICE}")
exit(1)
else:
snapshots = restic.snapshots()
snapshots = list(filter(lambda x: SERVICE in app_versions_from_tags(x.get('tags')), snapshots))
if snapshot_id == 'latest':
snapshots = list(
filter(
lambda x: SERVICE in app_versions_from_tags(x.get("tags")), snapshots
)
)
if snapshot_id == "latest":
return snapshots[-1:]
else:
return snapshots
@ -226,54 +253,63 @@ def get_snapshots(snapshot_id=None):
def app_versions_from_tags(tags):
if tags:
app_versions = map(lambda x: x.split(':'), tags)
app_versions = map(lambda x: x.split(":"), tags)
return {i[0]: i[1] if len(i) > 1 else None for i in app_versions}
else:
return {}
def str2bool(value: str) -> bool:
return value.lower() in ("yes", "true", "t", "1")
def parse_backup_labels(hook_type='backup', selected_container=[]):
def parse_backup_labels(hook_type="backup", selected_container=[]):
client = docker.from_env()
container_by_service = {
c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()}
c.labels.get("com.docker.swarm.service.name"): c
for c in client.containers.list()
}
services = client.services.list()
app_settings = {}
for s in services:
specs = s.attrs['Spec']
labels = specs['Labels']
stack_name = labels['com.docker.stack.namespace']
specs = s.attrs["Spec"]
labels = specs["Labels"]
stack_name = labels["com.docker.stack.namespace"]
container_name = s.name.removeprefix(f"{stack_name}_")
version = labels.get(f'coop-cloud.{stack_name}.version')
version = labels.get(f"coop-cloud.{stack_name}.version")
settings = app_settings[stack_name] = app_settings.get(stack_name) or {}
if (backup := labels.get('backupbot.backup')) and str2bool(backup):
settings['enabled'] = True
if (backup := labels.get("backupbot.backup")) and str2bool(backup):
settings["enabled"] = True
if version:
settings['version'] = version
settings["version"] = version
if selected_container and container_name not in selected_container:
logger.debug(f"Skipping {s.name} because it's not a selected container")
continue
if mounts:= specs['TaskTemplate']['ContainerSpec'].get('Mounts'):
if mounts := specs["TaskTemplate"]["ContainerSpec"].get("Mounts"):
volumes = parse_volumes(stack_name, mounts)
volumes.update(settings.get('volumes') or {})
settings['volumes'] = volumes
volumes.update(settings.get("volumes") or {})
settings["volumes"] = volumes
excluded_volumes, included_volume_paths = parse_excludes_includes(labels)
settings['excluded_volumes'] = excluded_volumes.union(settings.get('excluded_volumes') or set())
settings['included_volume_paths'] = included_volume_paths.union(settings.get('included_volume_paths') or set())
settings["excluded_volumes"] = excluded_volumes.union(
settings.get("excluded_volumes") or set()
)
settings["included_volume_paths"] = included_volume_paths.union(
settings.get("included_volume_paths") or set()
)
if container := container_by_service.get(s.name):
if command := labels.get(f'backupbot.{hook_type}.pre-hook'):
if not (pre_hooks:= settings.get('pre_hooks')):
pre_hooks = settings['pre_hooks'] = {}
if command := labels.get(f"backupbot.{hook_type}.pre-hook"):
if not (pre_hooks := settings.get("pre_hooks")):
pre_hooks = settings["pre_hooks"] = {}
pre_hooks[container] = command
if command := labels.get(f'backupbot.{hook_type}.post-hook'):
if not (post_hooks:= settings.get('post_hooks')):
post_hooks = settings['post_hooks'] = {}
if command := labels.get(f"backupbot.{hook_type}.post-hook"):
if not (post_hooks := settings.get("post_hooks")):
post_hooks = settings["post_hooks"] = {}
post_hooks[container] = command
else:
logger.debug(f"Container {s.name} is not running.")
if labels.get(f'backupbot.{hook_type}.pre-hook') or labels.get(f'backupbot.{hook_type}.post-hook'):
if labels.get(f"backupbot.{hook_type}.pre-hook") or labels.get(
f"backupbot.{hook_type}.post-hook"
):
logger.error(f"Container {s.name} contain hooks but it's not running")
return app_settings
@ -284,44 +320,50 @@ def get_backup_details(app_settings, volumes=[]):
pre_hooks = {}
post_hooks = {}
for app, settings in app_settings.items():
if settings.get('enabled'):
if SERVICE != 'ALL' and SERVICE != app:
if settings.get("enabled"):
if SERVICE != "ALL" and SERVICE != app:
continue
backup_apps_versions[app] = settings.get('version')
backup_apps_versions[app] = settings.get("version")
add_backup_paths(backup_paths, settings, app, volumes)
if hooks:= settings.get('pre_hooks'):
if hooks := settings.get("pre_hooks"):
pre_hooks.update(hooks)
if hooks:= settings.get('post_hooks'):
if hooks := settings.get("post_hooks"):
post_hooks.update(hooks)
return pre_hooks, post_hooks, list(backup_paths), backup_apps_versions
def add_backup_paths(backup_paths, settings, app, selected_volumes):
if (volumes := settings.get('volumes')):
if includes:= settings.get('included_volume_paths'):
if volumes := settings.get("volumes"):
if includes := settings.get("included_volume_paths"):
included_volumes = list(zip(*includes))[0]
for volume, rel_paths in includes:
if not (volume_path := volumes.get(volume)):
logger.error(f'Can not find volume with the name {volume} for {app}')
logger.error(
f"Can not find volume with the name {volume} for {app}"
)
continue
if selected_volumes and volume not in selected_volumes:
logger.debug(f'Skipping {volume}:{rel_paths} because the volume is not selected')
logger.debug(
f"Skipping {volume}:{rel_paths} because the volume is not selected"
)
continue
for p in rel_paths:
absolute_path = Path(f"{volume_path}/{p}")
backup_paths.add(absolute_path)
else:
included_volumes = []
excluded_volumes = settings.get('excluded_volumes') or []
excluded_volumes = settings.get("excluded_volumes") or []
for name, path in volumes.items():
if selected_volumes and name not in selected_volumes:
logger.debug(f'Skipping volume: {name} because the volume is not selected')
logger.debug(
f"Skipping volume: {name} because the volume is not selected"
)
continue
if name in excluded_volumes:
logger.debug(f'Skipping volume: {name} because the volume is excluded')
logger.debug(f"Skipping volume: {name} because the volume is excluded")
continue
if name in included_volumes:
logger.debug(f'Skipping volume: {name} because a path is selected')
logger.debug(f"Skipping volume: {name} because a path is selected")
continue
backup_paths.add(path)
else:
@ -331,10 +373,10 @@ def add_backup_paths(backup_paths, settings, app, selected_volumes):
def parse_volumes(stack_name, mounts):
volumes = {}
for m in mounts:
if m['Type'] != 'volume':
if m["Type"] != "volume":
continue
relative_path = m['Source']
name = relative_path.removeprefix(stack_name + '_')
relative_path = m["Source"]
name = relative_path.removeprefix(stack_name + "_")
absolute_path = Path(f"{VOLUME_PATH}{relative_path}/_data/")
volumes[name] = absolute_path
return volumes
@ -344,10 +386,12 @@ def parse_excludes_includes(labels):
excluded_volumes = set()
included_volume_paths = set()
for label, value in labels.items():
if label.startswith('backupbot.backup.volumes.'):
volume_name = label.removeprefix('backupbot.backup.volumes.').removesuffix('.path')
if label.endswith('path'):
relative_paths = tuple(value.split(','))
if label.startswith("backupbot.backup.volumes."):
volume_name = label.removeprefix("backupbot.backup.volumes.").removesuffix(
".path"
)
if label.endswith("path"):
relative_paths = tuple(value.split(","))
included_volume_paths.add((volume_name, relative_paths))
elif not str2bool(value):
excluded_volumes.add(volume_name)
@ -360,24 +404,29 @@ def copy_secrets(apps):
os.mkdir(SECRET_PATH)
client = docker.from_env()
container_by_service = {
c.labels.get('com.docker.swarm.service.name'): c for c in client.containers.list()}
c.labels.get("com.docker.swarm.service.name"): c
for c in client.containers.list()
}
services = client.services.list()
for s in services:
app_name = s.attrs['Spec']['Labels']['com.docker.stack.namespace']
if (app_name in apps and
(app_secs := s.attrs['Spec']['TaskTemplate']['ContainerSpec'].get('Secrets'))):
app_name = s.attrs["Spec"]["Labels"]["com.docker.stack.namespace"]
if app_name in apps and (
app_secs := s.attrs["Spec"]["TaskTemplate"]["ContainerSpec"].get("Secrets")
):
if not container_by_service.get(s.name):
logger.warning(
f"Container {s.name} is not running, secrets can not be copied.")
f"Container {s.name} is not running, secrets can not be copied."
)
continue
container_id = container_by_service[s.name].id
for sec in app_secs:
src = f'/var/lib/docker/containers/{container_id}/mounts/secrets/{sec["SecretID"]}'
src = f"/var/lib/docker/containers/{container_id}/mounts/secrets/{sec['SecretID']}"
if not Path(src).exists():
logger.error(
f"For the secret {sec['SecretName']} the file {src} does not exist for {s.name}")
f"For the secret {sec['SecretName']} the file {src} does not exist for {s.name}"
)
continue
dst = SECRET_PATH + sec['SecretName']
dst = SECRET_PATH + sec["SecretName"]
logger.debug(f"Copy Secret {sec['SecretName']}")
copyfile(src, dst)
@ -387,9 +436,15 @@ def run_commands(commands):
if not command:
continue
# Remove bash/sh wrapping
command = command.removeprefix('bash -c').removeprefix('sh -c').removeprefix(' ')
command = (
command.removeprefix("bash -c").removeprefix("sh -c").removeprefix(" ")
)
# Remove quotes surrounding the command
if (len(command) >= 2 and command[0] == command[-1] and (command[0] == "'" or command[0] == '"')):
if (
len(command) >= 2
and command[0] == command[-1]
and (command[0] == "'" or command[0] == '"')
):
command = command[1:-1]
# Use bash's pipefail to return exit codes inside a pipe to prevent silent failure
command = f"bash -c 'set -o pipefail;{command}'"
@ -398,7 +453,8 @@ def run_commands(commands):
result = container.exec_run(command)
if result.exit_code:
logger.error(
f"Failed to run command {command} in {container.name}: {result.output.decode()}")
f"Failed to run command {command} in {container.name}: {result.output.decode()}"
)
else:
logger.debug(result.output.decode())
@ -410,15 +466,17 @@ def backup_volumes(backup_paths, apps_versions, retries, dry_run=False):
logger.info("\n".join(map(str, backup_paths)))
backup_paths = list(filter(path_exists, backup_paths))
cmd = restic.cat.base_command()
parent = get_snapshots('latest')
parent = get_snapshots("latest")
if parent:
# https://restic.readthedocs.io/en/stable/040_backup.html#file-change-detection
cmd.extend(['--parent', parent[0]['short_id']])
cmd.extend(["--parent", parent[0]["short_id"]])
tags = [f"{app}:{version}" for app, version in apps_versions.items()]
if SERVICE == 'ALL':
if SERVICE == "ALL":
tags.append(SERVICE)
logger.info("Start volume backup")
result = restic.internal.backup.run(cmd, backup_paths, dry_run=dry_run, tags=tags)
result = restic.internal.backup.run(
cmd, backup_paths, dry_run=dry_run, tags=tags
)
logger.summary("backup finished", extra=result)
return
except ResticFailedError as error:
@ -432,7 +490,7 @@ def backup_volumes(backup_paths, apps_versions, retries, dry_run=False):
def path_exists(path):
if not path.exists():
logger.error(f'{path} does not exist')
logger.error(f"{path} does not exist")
return path.exists()
@ -440,37 +498,39 @@ def path_exists(path):
def snapshots():
snapshots = get_snapshots()
for snap in snapshots:
output = [snap['time'].split('.')[0], snap['short_id']]
if tags:= snap.get('tags'):
output = [snap["time"].split(".")[0], snap["short_id"]]
if tags := snap.get("tags"):
app_versions = app_versions_from_tags(tags)
if version := app_versions.get(SERVICE):
output.append(version)
print(*output)
if not snapshots:
err_msg = "No Snapshots found"
if SERVICE != 'ALL':
service_name = SERVICE.replace('_', '.')
err_msg += f' for app {service_name}'
if SERVICE != "ALL":
service_name = SERVICE.replace("_", ".")
err_msg += f" for app {service_name}"
logger.warning(err_msg)
@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('show_all', '--all', '-a', envvar='SHOW_ALL', is_flag=True)
@click.option('timestamps', '--timestamps', '-t', envvar='TIMESTAMPS', is_flag=True)
@click.argument('path', required=False, default='/var/lib/docker/volumes/', envvar='INCLUDE_PATH')
@click.option("snapshot", "--snapshot", "-s", envvar="SNAPSHOT", default="latest")
@click.option("show_all", "--all", "-a", envvar="SHOW_ALL", is_flag=True)
@click.option("timestamps", "--timestamps", "-t", envvar="TIMESTAMPS", is_flag=True)
@click.argument(
"path", required=False, default="/var/lib/docker/volumes/", envvar="INCLUDE_PATH"
)
def ls(snapshot, show_all, timestamps, path):
if snapshot == 'latest':
latest_snapshot = get_snapshots('latest')
if snapshot == "latest":
latest_snapshot = get_snapshots("latest")
if not latest_snapshot:
logger.error(f"There is no latest snapshot for {SERVICE}")
exit(1)
snapshot = latest_snapshot[0]['short_id']
snapshot = latest_snapshot[0]["short_id"]
if show_all:
path = None
results = list_files(snapshot, path)
for r in results:
if r.get('path'):
if r.get("path"):
if timestamps:
print(f"{r['ctime']}\t{r['path']}")
else:
@ -478,94 +538,93 @@ def ls(snapshot, show_all, timestamps, path):
def list_files(snapshot, path):
cmd = restic.cat.base_command() + ['ls']
cmd = restic.cat.base_command() + ["ls"]
cmd.append(snapshot)
if path:
cmd.append(path)
try:
output = restic.internal.command_executor.execute(cmd)
except ResticFailedError as error:
if 'no snapshot found' in str(error):
if "no snapshot found" in str(error):
err_msg = f'There is no snapshot "{snapshot}"'
if SERVICE != 'ALL':
if SERVICE != "ALL":
err_msg += f' for the app "{SERVICE}"'
logger.error(err_msg)
exit(1)
else:
raise error
output = output.replace('}\n{', '}|{')
results = list(map(json.loads, output.split('|')))
output = output.replace("}\n{", "}|{")
results = list(map(json.loads, output.split("|")))
return results
@cli.command()
@click.option('snapshot', '--snapshot', '-s', envvar='SNAPSHOT', default='latest')
@click.option('path', '--path', '-p', envvar='INCLUDE_PATH')
@click.option('volumes', '--volumes', '-v', envvar='VOLUMES')
@click.option('secrets', '--secrets', '-c', is_flag=True, envvar='SECRETS')
@click.option("snapshot", "--snapshot", "-s", envvar="SNAPSHOT", default="latest")
@click.option("path", "--path", "-p", envvar="INCLUDE_PATH")
@click.option("volumes", "--volumes", "-v", envvar="VOLUMES")
@click.option("secrets", "--secrets", "-c", is_flag=True, envvar="SECRETS")
def download(snapshot, path, volumes, secrets):
file_dumps = []
if snapshot == 'latest':
latest_snapshot = get_snapshots('latest')
if snapshot == "latest":
latest_snapshot = get_snapshots("latest")
if not latest_snapshot:
logger.error(f"There is no latest snapshot for {SERVICE}")
exit(1)
snapshot = latest_snapshot[0]['short_id']
snapshot = latest_snapshot[0]["short_id"]
if not any([path, volumes, secrets]):
volumes = secrets = True
if path:
path = path.removesuffix('/')
path = path.removesuffix("/")
binary_output = dump(snapshot, path)
files = list_files(snapshot, path)
filetype = [f.get('type') for f in files if f.get('path') == path][0]
filetype = [f.get("type") for f in files if f.get("path") == path][0]
filename = Path(path).name
if filetype == 'dir':
if filetype == "dir":
filename = filename + ".tar"
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
if volumes:
if SERVICE == 'ALL':
if SERVICE == "ALL":
logger.error("Please specify '--host' when using '--volumes'")
exit(1)
files = list_files(snapshot, VOLUME_PATH)
for f in files[1:]:
path = f['path']
if Path(path).name.startswith(SERVICE) and f['type'] == 'dir':
path = f["path"]
if Path(path).name.startswith(SERVICE) and f["type"] == "dir":
binary_output = dump(snapshot, path)
filename = f"{Path(path).name}.tar"
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
if secrets:
if SERVICE == 'ALL':
if SERVICE == "ALL":
logger.error("Please specify '--host' when using '--secrets'")
exit(1)
filename = f"{SERVICE}.json"
files = list_files(snapshot, SECRET_PATH)
secrets = {}
for f in files[1:]:
path = f['path']
if Path(path).name.startswith(SERVICE) and f['type'] == 'file':
path = f["path"]
if Path(path).name.startswith(SERVICE) and f["type"] == "file":
secret = dump(snapshot, path).decode()
secret_name = path.removeprefix(f'{SECRET_PATH}{SERVICE}_')
secret_name = path.removeprefix(f"{SECRET_PATH}{SERVICE}_")
secrets[secret_name] = secret
binary_output = json.dumps(secrets).encode()
tarinfo = tarfile.TarInfo(name=filename)
tarinfo.size = len(binary_output)
file_dumps.append((binary_output, tarinfo))
with tarfile.open('/tmp/backup.tar.gz', "w:gz") as tar:
with tarfile.open("/tmp/backup.tar.gz", "w:gz") as tar:
print(f"Writing files to /tmp/backup.tar.gz...")
for binary_output, tarinfo in file_dumps:
tar.addfile(tarinfo, fileobj=io.BytesIO(binary_output))
size = get_formatted_size('/tmp/backup.tar.gz')
print(
f"Backup has been written to /tmp/backup.tar.gz with a size of {size}")
size = get_formatted_size("/tmp/backup.tar.gz")
print(f"Backup has been written to /tmp/backup.tar.gz with a size of {size}")
def get_formatted_size(file_path):
file_size = os.path.getsize(file_path)
units = ['Bytes', 'KB', 'MB', 'GB', 'TB']
units = ["Bytes", "KB", "MB", "GB", "TB"]
for unit in units:
if file_size < 1024:
return f"{round(file_size, 3)} {unit}"
@ -574,16 +633,17 @@ def get_formatted_size(file_path):
def dump(snapshot, path):
cmd = restic.cat.base_command() + ['dump']
cmd = restic.cat.base_command() + ["dump"]
cmd = cmd + [snapshot, path]
print(f"Dumping {path} from snapshot '{snapshot}'")
output = subprocess.run(cmd, capture_output=True)
if output.returncode:
logger.error(
f"error while dumping {path} from snapshot '{snapshot}': {output.stderr}")
f"error while dumping {path} from snapshot '{snapshot}': {output.stderr}"
)
exit(1)
return output.stdout
if __name__ == '__main__':
if __name__ == "__main__":
cli()