Skip to content

Commit

Permalink
Merge pull request #8813 from dolthub/aaron/nbs-manifest-update-dedup
Browse files Browse the repository at this point in the history
[no-release-notes] go: store/nbs: Unify logic from UpdateManifest and UpdateManifestWithAppendix
  • Loading branch information
reltuk authored Feb 5, 2025
2 parents cb155c5 + 928ae32 commit 973ca47
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 97 deletions.
140 changes: 46 additions & 94 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,20 @@ func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error)
}

func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
}
return nbs.updateManifestAddFiles(ctx, updates, nil)
}

func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates map[hash.Hash]uint32, appendixOption *ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()

nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()

if err == nil {
err = unlockErr
}
err = errors.Join(err, nbs.mm.UnlockForUpdate())
}()

_, err = nbs.conjoinIfRequired(ctx)
Expand All @@ -303,30 +303,56 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.

originalLock := contents.lock

// Behavior:
// If appendix == nil, we are appending to currSpecs and keeping the current appendix.
// If *appendix == ManifestAppendixOption_Set, we are setting appendix to updates.
// If *appendix == ManifestAppendixOption_Append, we are appending updates to appendix.

currSpecs := contents.getSpecSet()
currAppendixSpecs := contents.getAppendixSet()
appendixSpecs := make([]tableSpec, 0)

var addCount int
hasWork := (appendixOption != nil && *appendixOption == ManifestAppendixOption_Set)
for h, count := range updates {
if _, ok := currSpecs[h]; !ok {
addCount++
contents.specs = append(contents.specs, tableSpec{h, count})
if appendixOption == nil {
if _, ok := currSpecs[h]; !ok {
hasWork = true
contents.specs = append(contents.specs, tableSpec{h, count})
}
} else if *appendixOption == ManifestAppendixOption_Set {
hasWork = true
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
} else if *appendixOption == ManifestAppendixOption_Append {
if _, ok := currAppendixSpecs[h]; !ok {
hasWork = true
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
}
} else {
return manifestContents{}, ErrUnsupportedManifestAppendixOption
}
}

if addCount == 0 {
if !hasWork {
return contents, nil
}

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

// ensure we don't drop existing appendices
if contents.appendix != nil && len(contents.appendix) > 0 {
contents, err = fromManifestAppendixOptionNewContents(contents, contents.appendix, ManifestAppendixOption_Set)
if appendixOption == nil {
// ensure we don't drop existing appendices
if contents.appendix != nil && len(contents.appendix) > 0 {
contents, err = fromManifestAppendixOptionNewContents(contents, contents.appendix, ManifestAppendixOption_Set)
if err != nil {
return manifestContents{}, err
}
}
} else {
contents, err = fromManifestAppendixOptionNewContents(contents, appendixSpecs, *appendixOption)
if err != nil {
return manifestContents{}, err
}
}

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
if err != nil {
return manifestContents{}, err
Expand All @@ -337,7 +363,8 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}
}

newTables, err := nbs.tables.rebase(ctx, updatedContents.specs, nbs.stats)
var newTables tableSet
newTables, err = nbs.tables.rebase(ctx, updatedContents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}
Expand All @@ -354,86 +381,11 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}

func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
}

nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()

if err == nil {
err = unlockErr
}
}()

_, err = nbs.conjoinIfRequired(ctx)
if err != nil {
return manifestContents{}, err
}

var updatedContents manifestContents
for {
ok, contents, _, ferr := nbs.mm.Fetch(ctx, nbs.stats)

if ferr != nil {
return manifestContents{}, ferr
} else if !ok {
contents = manifestContents{nbfVers: nbs.upstream.nbfVers}
}

originalLock := contents.lock

currAppendixSpecs := contents.getAppendixSet()

appendixSpecs := make([]tableSpec, 0)
var addCount int
for h, count := range updates {
if option == ManifestAppendixOption_Set {
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
} else {
if _, ok := currAppendixSpecs[h]; !ok {
addCount++
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
}
}
}

if addCount == 0 && option != ManifestAppendixOption_Set {
return contents, nil
}

contents, err = fromManifestAppendixOptionNewContents(contents, appendixSpecs, option)
if err != nil {
return manifestContents{}, err
}

updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
if err != nil {
return manifestContents{}, err
}

if updatedContents.lock == contents.lock {
break
}
}

newTables, err := nbs.tables.rebase(ctx, updatedContents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}

nbs.upstream = updatedContents
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.close()
if err != nil {
return manifestContents{}, err
}
return updatedContents, nil
return nbs.updateManifestAddFiles(ctx, updates, &option)
}

func (nbs *NomsBlockStore) checkAllManifestUpdatesExist(ctx context.Context, updates map[hash.Hash]uint32) error {
Expand Down
6 changes: 3 additions & 3 deletions go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,9 @@ func prepStore(ctx context.Context, t *testing.T, assert *assert.Assertions) (*f
}

func TestNBSUpdateManifestWithAppendixOptions(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()

_, p, q, store, _, _ := prepStore(ctx, t, assert)
_, p, q, store, _, _ := prepStore(ctx, t, assert.New(t))
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
Expand Down Expand Up @@ -469,6 +468,7 @@ func TestNBSUpdateManifestWithAppendixOptions(t *testing.T) {

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
assert := assert.New(t)
updates := make(map[hash.Hash]uint32)
for _, id := range test.appendixSpecIds {
updates[id] = appendixUpdates[id]
Expand All @@ -481,7 +481,7 @@ func TestNBSUpdateManifestWithAppendixOptions(t *testing.T) {
assert.Equal(test.expectedNumberOfAppendixSpecs, info.NumAppendixSpecs())
} else {
_, err := store.UpdateManifestWithAppendix(ctx, updates, test.option)
assert.Equal(test.expectedError, err)
assert.ErrorIs(err, test.expectedError)
}
})
}
Expand Down

0 comments on commit 973ca47

Please sign in to comment.