Skip to content

Commit

Permalink
Fix: save a no-data session when volume shrinks and no other deltas a…
Browse files Browse the repository at this point in the history
…re detected

Issue #168
  • Loading branch information
tasket committed Jul 11, 2023
1 parent 75aea90 commit b5d236d
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions wyng
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ class ArchiveSet:

def last_chunk_addr(self, vsize=None):
if vsize is None: vsize = self.volsize()
return (vsize-1) - ((vsize-1) % self.archive.chunksize)
chdigits = max_address.bit_length() // 4
lchunk_addr = (vsize-1) - ((vsize-1) % self.archive.chunksize)
return lchunk_addr, ("x%0"+str(chdigits)+"x") % lchunk_addr

# Based on last session size unless volume_size is specified.
def mapsize(self, volume_size=None):
Expand Down Expand Up @@ -1389,7 +1391,8 @@ def update_delta_digest(datavol):

# Check for volume size increase;
# Chunks from 'markall_pos' onward will be marked for backup.
next_chunk_addr = vol.last_chunk_addr() + chunksize
changed = snap2size != vol.volsize()
next_chunk_addr = vol.last_chunk_addr()[0] + chunksize
markall_pos = (next_chunk_addr//chunksize//8) if snap2size-1 >= next_chunk_addr else None

# Setup access to deltamap as an mmap object.
Expand Down Expand Up @@ -1431,16 +1434,16 @@ def update_delta_digest(datavol):
if dnewchunks+dfreedblocks: bmapf.flush() ; os.fsync(bmapf.fileno())

catch_signals()
map_updated = dnewchunks+dfreedblocks+anynew+vol.map_used(ext="-tmp") > 0
changed = changed or dnewchunks+dfreedblocks+anynew+vol.map_used(ext="-tmp") > 0
if monitor_only:
rotate_snapshots(vol, rotate=map_updated)
rotate_snapshots(vol, rotate=changed)
os.rename(vol.mapfile+"-tmp", vol.mapfile)
print(("\r %d ch, %d dis" % (dnewchunks, dfreedblocks//chunksize))
if map_updated else "\r No changes ")
if changed else "\r No changes ")

if dnewchunks: vol.changed_bytes_add(dnewchunks*aset.chunksize, save=monitor_only)
catch_signals(None)
return map_updated
return changed


# Reads addresses from manifest and marks corresponding chunks in a volume's deltamap.
Expand Down Expand Up @@ -1478,7 +1481,7 @@ def send_volume(datavol, localtime, ses_tags, send_all):
if len(vol.sessions):
# Our chunks are usually smaller than LVM's, so generate a full manifest to detect
# significant amount of unchanged chunks that are flagged in the delta bmap.
fullmanifest = open(merge_manifests(datavol), "r")
fullmanifest = open(merge_manifests(datavol, vsize=prior_size), "r")
fullmanifest_readline = fullmanifest.readline
else:
fullmanifest = None
Expand Down Expand Up @@ -1507,17 +1510,24 @@ def send_volume(datavol, localtime, ses_tags, send_all):
zeros = bytes(chunksize)
bcount = freed = ddbytes = 0
addrsplit = -address_split[1]
lchunk_addr = vol.last_chunk_addr(snap2size)
lchunk_addr = vol.last_chunk_addr(snap2size)[0]

compresslevel = int(aset.compr_level) ; compress = compressors[aset.compression][2]

# Use tar to stream files to destination
stream_started = False
untar_cmd = [destcd + " && mkdir -p ."+bkdir
+ " && exec >>"+desttmpdir+"/send.log 2>&1"
+ " && "+destcd+bkdir + " && echo "+datavol
+ " && python3 "+desttmpdir+"/dest_helper.py send"
+ (" --sync" if options.maxsync else "")]
untar = subprocess.Popen(dest_run_args(desttype, untar_cmd),
stdin =subprocess.PIPE, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
tarf = tarfile.open(mode="w|", fileobj=untar.stdin)
tarf_addfile = tarf.addfile ; TarInfo = tarfile.TarInfo
stream_started = True ; LNKTYPE = tarfile.LNKTYPE
tar_info = TarInfo(sdir+"-tmp") ; tar_info.type = tarfile.DIRTYPE
tarf_addfile(tarinfo=tar_info)

# Open source volume and its delta bitmap as r, session manifest as w.
with open(pjoin("/dev",aset.vgname,snap2vol),"rb", buffering=chunksize) as vf, \
Expand Down Expand Up @@ -1588,17 +1598,6 @@ def send_volume(datavol, localtime, ses_tags, send_all):
# Skip when current and prior chunks are the same
if compare_digest(fman_hash, hexhash): continue

# Start tar stream
if not stream_started:
untar = subprocess.Popen(dest_run_args(desttype, untar_cmd),
stdin =subprocess.PIPE, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
tarf = tarfile.open(mode="w|", fileobj=untar.stdin)
tarf_addfile = tarf.addfile ; TarInfo = tarfile.TarInfo
stream_started = True ; LNKTYPE = tarfile.LNKTYPE
tar_info = TarInfo(sdir+"-tmp") ; tar_info.type = tarfile.DIRTYPE
tarf_addfile(tarinfo=tar_info)

# Add entry to new manifest
print(hexhash, destfile, file=hashf)
if hexhash == "0": continue
Expand Down Expand Up @@ -2072,7 +2071,7 @@ def autoprune(datavol, needed_space=0, apmode="off", include=set()):
# Accepts a list of session names in ascending order (or else uses all sessions in the volume)
# and merges the manifests. Setting 'addcol' will add a colunm showing the session dir name.

def merge_manifests(datavol, msessions=None, mtarget=None, addcol=False):
def merge_manifests(datavol, msessions=None, mtarget=None, vsize=None, addcol=False):
volume = aset.vols[datavol]
msessions = volume.sesnames if not msessions else msessions
sespaths = [ os.path.basename(volume.sessions[x].path) for x in msessions ]
Expand All @@ -2096,8 +2095,12 @@ def merge_manifests(datavol, msessions=None, mtarget=None, addcol=False):
else:
cdir = volume.path ; slsort = slist[0]

do_exec([[CP.sort, "-umd", "-k2,2", "--batch-size=64", "--files0-from="+slsort]],
out=outfile, cwd=cdir)
cmds = [[CP.sort, "-umsd", "-k2,2", "--batch-size=16", "--files0-from="+slsort]]
if vsize is not None and vsize < max(volume.sessions[x].volsize for x in msessions):
# session vol size shrank, so filter result by requested vsize
cmds.append([CP.awk, r'$2<="'+volume.last_chunk_addr(vsize)[1]+'"'])

do_exec(cmds, out=outfile, cwd=cdir)
if addcol: shutil.rmtree(tmp+"/m")

return outfile
Expand All @@ -2112,8 +2115,6 @@ def merge_manifests(datavol, msessions=None, mtarget=None, addcol=False):
def merge_sessions(datavol, sources, target, clear_sources=False):

volume = aset.vols[datavol] ; resume = aset.in_process is not None
chdigits = max_address.bit_length() // 4 # 4bits per digit
chformat = "x%0"+str(chdigits)+"x"
m_tmp = tmpdir if volume.volsize() < 128000000000 else big_tmpdir

# Prepare manifests for efficient merge using fs mv/replace. The target is
Expand All @@ -2131,8 +2132,7 @@ def merge_sessions(datavol, sources, target, clear_sources=False):
volsize = volume.sessions[target].volsize
vol_shrank = volsize < max(x.volsize for x in volume.sessions.values()
if x.name in sources)
last_chunk = chformat % volume.last_chunk_addr(volsize)
lc_filter = '"'+last_chunk+'"'
lc_filter = '"'+volume.last_chunk_addr(volsize)[1]+'"'

with open("merge.lst", "wt") as lstf:
print(merge_target, target, file=lstf)
Expand Down Expand Up @@ -2348,8 +2348,8 @@ def receive_volume(datavol, select_ses="", save_path="", diff=False):
x_it(0,"")

chdigits = max_address.bit_length() // 4 ; chformat = "x%0"+str(chdigits)+"x"
lchunk_addr = vol.last_chunk_addr(volsize) ; last_chunkx = chformat % lchunk_addr
addrsplit = -address_split[1] ; rc = None
lchunk_addr, last_chunkx = vol.last_chunk_addr(volsize)

# Collect session manifests
include = False ; incl_ses = []
Expand Down Expand Up @@ -2791,7 +2791,7 @@ def cleanup():

# Constants / Globals
prog_name = "wyng"
prog_version = "0.3.14" ; prog_date = "20230622"
prog_version = "0.3.15" ; prog_date = "20230708"
format_version = 2 ; debug = False ; tmpdir = None


Expand Down

0 comments on commit b5d236d

Please sign in to comment.