-
-
Notifications
You must be signed in to change notification settings - Fork 32.5k
/
Copy pathbackup_restore.py
126 lines (104 loc) · 4 KB
/
backup_restore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Home Assistant module to handle restoring backups."""
from dataclasses import dataclass
import json
import logging
from pathlib import Path
import shutil
import sys
from tempfile import TemporaryDirectory
from awesomeversion import AwesomeVersion
import securetar
from .const import __version__ as HA_VERSION
RESTORE_BACKUP_FILE = ".HA_RESTORE"
KEEP_PATHS = ("backups",)
_LOGGER = logging.getLogger(__name__)
@dataclass
class RestoreBackupFileContent:
"""Definition for restore backup file content."""
backup_file_path: Path
def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent | None:
"""Return the contents of the restore backup file."""
instruction_path = config_dir.joinpath(RESTORE_BACKUP_FILE)
try:
instruction_content = json.loads(instruction_path.read_text(encoding="utf-8"))
return RestoreBackupFileContent(
backup_file_path=Path(instruction_content["path"])
)
except (FileNotFoundError, json.JSONDecodeError):
return None
def _clear_configuration_directory(config_dir: Path) -> None:
"""Delete all files and directories in the config directory except for the backups directory."""
keep_paths = [config_dir.joinpath(path) for path in KEEP_PATHS]
config_contents = sorted(
[entry for entry in config_dir.iterdir() if entry not in keep_paths]
)
for entry in config_contents:
entrypath = config_dir.joinpath(entry)
if entrypath.is_file():
entrypath.unlink()
elif entrypath.is_dir():
shutil.rmtree(entrypath)
def _extract_backup(config_dir: Path, backup_file_path: Path) -> None:
"""Extract the backup file to the config directory."""
with (
TemporaryDirectory() as tempdir,
securetar.SecureTarFile(
backup_file_path,
gzip=False,
mode="r",
) as ostf,
):
ostf.extractall(
path=Path(tempdir, "extracted"),
members=securetar.secure_path(ostf),
filter="fully_trusted",
)
backup_meta_file = Path(tempdir, "extracted", "backup.json")
backup_meta = json.loads(backup_meta_file.read_text(encoding="utf8"))
if (
backup_meta_version := AwesomeVersion(
backup_meta["homeassistant"]["version"]
)
) > HA_VERSION:
raise ValueError(
f"You need at least Home Assistant version {backup_meta_version} to restore this backup"
)
with securetar.SecureTarFile(
Path(
tempdir,
"extracted",
f"homeassistant.tar{'.gz' if backup_meta["compressed"] else ''}",
),
gzip=backup_meta["compressed"],
mode="r",
) as istf:
for member in istf.getmembers():
if member.name == "data":
continue
member.name = member.name.replace("data/", "")
_clear_configuration_directory(config_dir)
istf.extractall(
path=config_dir,
members=[
member
for member in securetar.secure_path(istf)
if member.name != "data"
],
filter="fully_trusted",
)
def restore_backup(config_dir_path: str) -> bool:
"""Restore the backup file if any.
Returns True if a restore backup file was found and restored, False otherwise.
"""
config_dir = Path(config_dir_path)
if not (restore_content := restore_backup_file_content(config_dir)):
return False
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
backup_file_path = restore_content.backup_file_path
_LOGGER.info("Restoring %s", backup_file_path)
try:
_extract_backup(config_dir, backup_file_path)
except FileNotFoundError as err:
raise ValueError(f"Backup file {backup_file_path} does not exist") from err
_LOGGER.info("Restore complete, restarting")
return True