Skip to content

Commit 402a6f9

Browse files
committed
feat(sealing): new sealing using interface to separate from active fraction implementation
1 parent abd6bfb commit 402a6f9

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

49 files changed, with 2058 additions and 1471 deletions.

bytespool/writer.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ func AcquireWriterSize(out io.Writer, size int) *Writer {
2929
}
3030
}
3131

32-
func FlushReleaseWriter(w *Writer) error {
33-
err := w.Flush()
34-
if err != nil {
35-
return err
36-
}
32+
func ReleaseWriter(w *Writer) {
3733
Release(w.Buf)
3834
w.Buf = nil
3935
w.out = nil
4036
writerPool.Put(w)
37+
}
38+
39+
func FlushReleaseWriter(w *Writer) error {
40+
if err := w.Flush(); err != nil {
41+
return err
42+
}
43+
ReleaseWriter(w)
4144
return nil
4245
}
4346

cmd/distribution/main.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import (
1111

1212
"github.com/ozontech/seq-db/cache"
1313
"github.com/ozontech/seq-db/consts"
14-
"github.com/ozontech/seq-db/frac"
14+
"github.com/ozontech/seq-db/frac/common"
15+
"github.com/ozontech/seq-db/frac/sealed"
1516
"github.com/ozontech/seq-db/fracmanager"
1617
"github.com/ozontech/seq-db/logger"
1718
"github.com/ozontech/seq-db/seq"
@@ -58,8 +59,10 @@ func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) {
5859
return data, nil
5960
}
6061

61-
func loadInfo(path string) *frac.Info {
62+
func loadInfo(path string) *common.Info {
6263
indexReader, f := getReader(path)
64+
defer f.Close()
65+
6366
result, err := readBlock(indexReader, 0)
6467
if err != nil {
6568
logger.Fatal("error reading block", zap.String("file", path), zap.Error(err))
@@ -69,7 +72,7 @@ func loadInfo(path string) *frac.Info {
6972
logger.Fatal("seq-db index file header corrupted", zap.String("file", path))
7073
}
7174

72-
b := frac.BlockInfo{}
75+
b := sealed.BlockInfo{}
7376
err = b.Unpack(result)
7477
if err != nil {
7578
logger.Fatal("can't unpack info bloc of index file", zap.String("file", path), zap.Error(err))
@@ -84,8 +87,9 @@ func loadInfo(path string) *frac.Info {
8487
return b.Info
8588
}
8689

87-
func buildDist(dist *seq.MIDsDistribution, path string, _ *frac.Info) {
88-
blocksReader, _ := getReader(path)
90+
func buildDist(dist *seq.MIDsDistribution, path string, _ *common.Info) {
91+
blocksReader, f := getReader(path)
92+
defer f.Close()
8993

9094
// skip tokens
9195
blockIndex := uint32(1)

cmd/index_analyzer/main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/alecthomas/units"
1111
"go.uber.org/zap"
1212

13-
"github.com/ozontech/seq-db/frac"
13+
"github.com/ozontech/seq-db/frac/sealed"
1414
"github.com/ozontech/seq-db/frac/sealed/lids"
1515
"github.com/ozontech/seq-db/frac/sealed/token"
1616
"github.com/ozontech/seq-db/fracmanager"
@@ -91,8 +91,11 @@ func analyzeIndex(
9191
}
9292

9393
// load info
94-
b := frac.BlockInfo{}
95-
_ = b.Unpack(readBlock())
94+
var b sealed.BlockInfo
95+
if err := b.Unpack(readBlock()); err != nil {
96+
logger.Fatal("error unpacking block info", zap.Error(err))
97+
}
98+
9699
docsCount := int(b.Info.DocsTotal)
97100

98101
// load tokens

cmd/seq-db/seq-db.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/ozontech/seq-db/config"
2323
"github.com/ozontech/seq-db/consts"
2424
"github.com/ozontech/seq-db/frac"
25+
"github.com/ozontech/seq-db/frac/common"
2526
"github.com/ozontech/seq-db/fracmanager"
2627
"github.com/ozontech/seq-db/logger"
2728
"github.com/ozontech/seq-db/mappingprovider"
@@ -259,7 +260,7 @@ func startStore(
259260
MaintenanceDelay: 0,
260261
CacheGCDelay: 0,
261262
CacheCleanupDelay: 0,
262-
SealParams: frac.SealParams{
263+
SealParams: common.SealParams{
263264
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
264265
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
265266
TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel,

consts/consts.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ const (
1111
// DummyMID is used in aggregations when we do not need to build time series.
1212
DummyMID = 0
1313

14-
IDsBlockSize = int(4 * units.KiB)
15-
RegularBlockSize = int(16 * units.KiB)
1614
IDsPerBlock = int(4 * units.KiB)
1715
LIDBlockCap = int(64 * units.KiB)
16+
RegularBlockSize = int(16 * units.KiB)
1817

1918
DefaultMaintenanceDelay = time.Second
2019
DefaultCacheGCDelay = 1 * time.Second

frac/active.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ozontech/seq-db/cache"
1717
"github.com/ozontech/seq-db/config"
1818
"github.com/ozontech/seq-db/consts"
19+
"github.com/ozontech/seq-db/frac/common"
1920
"github.com/ozontech/seq-db/logger"
2021
"github.com/ozontech/seq-db/metric"
2122
"github.com/ozontech/seq-db/metric/stopwatch"
@@ -38,7 +39,7 @@ type Active struct {
3839
released bool
3940

4041
infoMu sync.RWMutex
41-
info *Info
42+
info *common.Info
4243

4344
MIDs *UInt64s
4445
RIDs *UInt64s
@@ -103,7 +104,7 @@ func NewActive(
103104
writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync),
104105

105106
BaseFileName: baseFileName,
106-
info: NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
107+
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
107108
Config: cfg,
108109
}
109110

@@ -300,7 +301,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
300301
}
301302
}
302303

303-
func (f *Active) Info() *Info {
304+
func (f *Active) Info() *common.Info {
304305
f.infoMu.RLock()
305306
defer f.infoMu.RUnlock()
306307

frac/active_docs_positions.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,22 @@ import (
77
)
88

99
type DocsPositions struct {
10-
mu sync.RWMutex
11-
positions map[seq.ID]seq.DocPos
10+
mu sync.RWMutex
11+
idToPos map[seq.ID]seq.DocPos
12+
lidToPos []seq.DocPos
1213
}
1314

1415
func NewSyncDocsPositions() *DocsPositions {
15-
return &DocsPositions{
16-
positions: make(map[seq.ID]seq.DocPos),
16+
dp := DocsPositions{
17+
lidToPos: make([]seq.DocPos, 0),
18+
idToPos: make(map[seq.ID]seq.DocPos),
1719
}
20+
dp.lidToPos = append(dp.lidToPos, 0) // systemID
21+
return &dp
1822
}
1923

2024
func (dp *DocsPositions) Get(id seq.ID) seq.DocPos {
21-
if val, ok := dp.positions[id]; ok {
25+
if val, ok := dp.idToPos[id]; ok {
2226
return val
2327
}
2428
return seq.DocPosNotFound
@@ -36,13 +40,22 @@ func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID {
3640
dp.mu.Lock()
3741
defer dp.mu.Unlock()
3842

39-
appended := make([]seq.ID, 0)
43+
appended := make([]seq.ID, 0, len(ids))
4044
for i, id := range ids {
41-
// Positions may be equal in case of nested index.
42-
if savedPos, ok := dp.positions[id]; !ok || savedPos == pos[i] {
43-
dp.positions[id] = pos[i]
44-
appended = append(appended, id)
45+
p, ok := dp.idToPos[id]
46+
47+
if ok {
48+
if p != pos[i] {
49+
// same ID but different position
50+
// this is a duplicate ID, we can't append it
51+
continue
52+
}
53+
} else {
54+
dp.idToPos[id] = pos[i]
4555
}
56+
57+
dp.lidToPos = append(dp.lidToPos, pos[i])
58+
appended = append(appended, id)
4659
}
4760
return appended
4861
}

frac/active_index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package frac
33
import (
44
"context"
55

6+
"github.com/ozontech/seq-db/frac/common"
67
"github.com/ozontech/seq-db/frac/processor"
78
"github.com/ozontech/seq-db/frac/sealed/lids"
89
"github.com/ozontech/seq-db/metric/stopwatch"
@@ -15,7 +16,7 @@ import (
1516
type activeDataProvider struct {
1617
ctx context.Context
1718
config *Config
18-
info *Info
19+
info *common.Info
1920

2021
mids *UInt64s
2122
rids *UInt64s

0 commit comments

Comments
 (0)