Skip to content

Commit 23134d3

Browse files
committed
feat(fracmanager): improve fracmanager consistency
1 parent 489a401 commit 23134d3

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+2214
-1671
lines changed

cmd/distribution/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func main() {
161161
}
162162
}
163163

164-
fc := fracmanager.NewSealedFracCache(filePathDist)
164+
fc := fracmanager.NewFracInfoCache(filePathDist)
165165

166166
lastSavedTime := time.Now()
167167
for _, path := range getAllFracs(dataDir) {
@@ -170,7 +170,7 @@ func main() {
170170

171171
logger.Info("start process", zap.String("name", key))
172172

173-
info, ok := fc.GetFracInfo(key)
173+
info, ok := fc.Get(key)
174174
if ok {
175175
logger.Info("found in frac-cache", zap.String("key", key))
176176
} else {
@@ -198,7 +198,7 @@ func main() {
198198
}
199199

200200
buildDist(info.Distribution, path, info)
201-
fc.AddFraction(key, info)
201+
fc.Add(info)
202202
logger.Info("built distribution", zap.Int("affected_minutes", len(info.Distribution.GetDist())))
203203
printDistribution(info.Distribution)
204204

cmd/index_analyzer/main.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"hash/fnv"
77
"os"
8+
"sync"
89
"time"
910

1011
"github.com/alecthomas/units"
@@ -57,7 +58,15 @@ func main() {
5758
func getCacheMaintainer() (*fracmanager.CacheMaintainer, func()) {
5859
done := make(chan struct{})
5960
cm := fracmanager.NewCacheMaintainer(uint64(units.GiB), uint64(units.MiB*64), nil)
60-
wg := cm.RunCleanLoop(done, time.Second, time.Second)
61+
62+
wg := sync.WaitGroup{}
63+
64+
wg.Add(1)
65+
go func() {
66+
defer wg.Done()
67+
cm.RunCleanLoop(done, time.Second, time.Second)
68+
}()
69+
6170
return cm, func() {
6271
close(done)
6372
wg.Wait()

cmd/seq-db/seq-db.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ func startStore(
251251
CacheSize: uint64(cfg.Resources.CacheSize),
252252
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
253253
FracLoadLimit: 0,
254-
ShouldReplay: true,
255254
MaintenanceDelay: 0,
256255
CacheGCDelay: 0,
257256
CacheCleanupDelay: 0,

consts/consts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const (
4242

4343
MaxTextFieldValueLength = 32 * 1024
4444

45-
SealOnExitFracSizePercent = 20 // Percent of the max frac size, above which the fraction is sealed on exit
45+
MinSealPercent = 20 // Percent of the max frac size, above which the fraction is sealed on exit
4646

4747
IngestorMaxInflightBulks = 32
4848

frac/active.go

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/ozontech/seq-db/config"
1818
"github.com/ozontech/seq-db/consts"
1919
"github.com/ozontech/seq-db/frac/common"
20+
"github.com/ozontech/seq-db/frac/processor"
2021
"github.com/ozontech/seq-db/logger"
2122
"github.com/ozontech/seq-db/metric"
2223
"github.com/ozontech/seq-db/metric/stopwatch"
@@ -34,10 +35,6 @@ type Active struct {
3435

3536
BaseFileName string
3637

37-
useMu sync.RWMutex
38-
suicided bool
39-
released bool
40-
4138
infoMu sync.RWMutex
4239
info *common.Info
4340

@@ -266,23 +263,19 @@ func (f *Active) String() string {
266263
return fracToString(f, "active")
267264
}
268265

269-
func (f *Active) DataProvider(ctx context.Context) (DataProvider, func()) {
270-
f.useMu.RLock()
271-
272-
if f.suicided || f.released || f.Info().DocsTotal == 0 { // it is empty active fraction state
273-
if f.suicided {
274-
metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
275-
}
276-
f.useMu.RUnlock()
277-
return EmptyDataProvider{}, func() {}
266+
func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
267+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
268+
return nil, nil
278269
}
270+
return f.createDataProvider(ctx).Fetch(ids)
271+
}
279272

280-
// it is ordinary active fraction state
281-
dp := f.createDataProvider(ctx)
282-
return dp, func() {
283-
dp.release()
284-
f.useMu.RUnlock()
273+
func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
274+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
275+
metric.CountersTotal.WithLabelValues("empty_data_provider").Inc()
276+
return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil
285277
}
278+
return f.createDataProvider(ctx).Search(params)
286279
}
287280

288281
func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
@@ -318,10 +311,6 @@ func (f *Active) IsIntersecting(from, to seq.MID) bool {
318311
}
319312

320313
func (f *Active) Release() {
321-
f.useMu.Lock()
322-
f.released = true
323-
f.useMu.Unlock()
324-
325314
f.releaseMem()
326315

327316
if !f.Config.KeepMetaFile {
@@ -334,35 +323,6 @@ func (f *Active) Release() {
334323
}
335324
}
336325

337-
// Offload for [Active] fraction is no-op.
338-
//
339-
// Since search within [Active] fraction is too costly (we have to replay the whole index in memory),
340-
// we decided to support offloading only for [Sealed] fractions.
341-
func (f *Active) Offload(context.Context, storage.Uploader) (bool, error) {
342-
return false, nil
343-
}
344-
345-
func (f *Active) Suicide() {
346-
f.useMu.Lock()
347-
released := f.released
348-
f.suicided = true
349-
f.released = true
350-
f.useMu.Unlock()
351-
352-
if released { // fraction can be suicided after release
353-
if f.Config.KeepMetaFile {
354-
f.removeMetaFile() // meta was not removed while release
355-
}
356-
if f.Config.SkipSortDocs {
357-
f.removeDocsFiles() // docs was not removed while release
358-
}
359-
} else { // was not release
360-
f.releaseMem()
361-
f.removeMetaFile()
362-
f.removeDocsFiles()
363-
}
364-
}
365-
366326
func (f *Active) releaseMem() {
367327
f.writer.Stop()
368328
f.TokenList.Stop()

frac/active_indexer.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ type ActiveIndexer struct {
1717
ch chan *indexTask
1818
chMerge chan *mergeTask
1919
workerCount int
20-
21-
stopFn func()
2220
}
2321

2422
type indexTask struct {
@@ -33,12 +31,14 @@ type mergeTask struct {
3331
tokenLIDs *TokenLIDs
3432
}
3533

36-
func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer {
37-
return &ActiveIndexer{
34+
func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
35+
idx := ActiveIndexer{
3836
ch: make(chan *indexTask, chLen),
3937
chMerge: make(chan *mergeTask, chLen),
4038
workerCount: workerCount,
4139
}
40+
stopIdx := idx.start()
41+
return &idx, stopIdx
4242
}
4343

4444
func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
@@ -52,7 +52,7 @@ func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, s
5252
m.Stop()
5353
}
5454

55-
func (ai *ActiveIndexer) Start() {
55+
func (ai *ActiveIndexer) start() func() {
5656
wg := sync.WaitGroup{}
5757
wg.Add(ai.workerCount)
5858

@@ -71,13 +71,10 @@ func (ai *ActiveIndexer) Start() {
7171
}()
7272
}
7373

74-
ai.stopFn = func() {
74+
return func() {
7575
close(ai.ch)
7676
close(ai.chMerge)
77-
7877
wg.Wait()
79-
80-
ai.stopFn = nil
8178
}
8279
}
8380

@@ -87,12 +84,6 @@ func (ai *ActiveIndexer) mergeWorker() {
8784
}
8885
}
8986

90-
func (ai *ActiveIndexer) Stop() {
91-
if ai.stopFn != nil {
92-
ai.stopFn()
93-
}
94-
}
95-
9687
var metaDataPool = sync.Pool{
9788
New: func() any {
9889
return new(MetaData)

frac/fraction.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,14 @@ import (
99
"github.com/ozontech/seq-db/frac/common"
1010
"github.com/ozontech/seq-db/frac/processor"
1111
"github.com/ozontech/seq-db/seq"
12-
"github.com/ozontech/seq-db/storage"
1312
)
1413

15-
type DataProvider interface {
16-
Fetch([]seq.ID) ([][]byte, error)
17-
Search(processor.SearchParams) (*seq.QPR, error)
18-
}
19-
2014
type Fraction interface {
2115
Info() *common.Info
2216
IsIntersecting(from seq.MID, to seq.MID) bool
2317
Contains(mid seq.MID) bool
24-
DataProvider(context.Context) (DataProvider, func())
25-
Offload(ctx context.Context, u storage.Uploader) (bool, error)
26-
Suicide()
18+
Fetch(context.Context, []seq.ID) ([][]byte, error)
19+
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
2720
}
2821

2922
func fracToString(f Fraction, fracType string) string {

frac/remote.go

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import (
1111
"github.com/ozontech/seq-db/cache"
1212
"github.com/ozontech/seq-db/consts"
1313
"github.com/ozontech/seq-db/frac/common"
14+
"github.com/ozontech/seq-db/frac/processor"
1415
"github.com/ozontech/seq-db/frac/sealed"
1516
"github.com/ozontech/seq-db/frac/sealed/lids"
1617
"github.com/ozontech/seq-db/frac/sealed/seqids"
1718
"github.com/ozontech/seq-db/frac/sealed/token"
1819
"github.com/ozontech/seq-db/logger"
19-
"github.com/ozontech/seq-db/metric"
2020
"github.com/ozontech/seq-db/seq"
2121
"github.com/ozontech/seq-db/storage"
2222
"github.com/ozontech/seq-db/storage/s3"
@@ -41,9 +41,6 @@ type Remote struct {
4141

4242
info *common.Info
4343

44-
useMu sync.RWMutex
45-
suicided bool
46-
4744
docsFile storage.ImmutableFile
4845
docsCache *cache.Cache[[]byte]
4946
docsReader storage.DocsReader
@@ -114,37 +111,20 @@ func (f *Remote) Contains(mid seq.MID) bool {
114111
return f.info.IsIntersecting(mid, mid)
115112
}
116113

117-
func (f *Remote) DataProvider(ctx context.Context) (DataProvider, func()) {
118-
f.useMu.RLock()
119-
120-
if f.suicided {
121-
metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
122-
f.useMu.RUnlock()
123-
return EmptyDataProvider{}, func() {}
124-
}
125-
126-
defer func() {
127-
if panicData := recover(); panicData != nil {
128-
f.useMu.RUnlock()
129-
panic(panicData)
130-
}
131-
}()
132-
133-
if err := f.load(); err != nil {
134-
logger.Error(
135-
"will create empty data provider: cannot load remote fraction",
136-
zap.String("fraction", f.Info().Name()),
137-
zap.Error(err),
138-
)
139-
f.useMu.RUnlock()
140-
return EmptyDataProvider{}, func() {}
114+
func (f *Remote) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
115+
dp, err := f.createDataProvider(ctx)
116+
if err != nil {
117+
return nil, err
141118
}
119+
return dp.Fetch(ids)
120+
}
142121

143-
dp := f.createDataProvider(ctx)
144-
return dp, func() {
145-
dp.release()
146-
f.useMu.RUnlock()
122+
func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
123+
dp, err := f.createDataProvider(ctx)
124+
if err != nil {
125+
return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, err
147126
}
127+
return dp.Search(params)
148128
}
149129

150130
func (f *Remote) Info() *common.Info {
@@ -155,15 +135,7 @@ func (f *Remote) IsIntersecting(from, to seq.MID) bool {
155135
return f.info.IsIntersecting(from, to)
156136
}
157137

158-
func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
159-
panic("BUG: remote fraction cannot be offloaded")
160-
}
161-
162138
func (f *Remote) Suicide() {
163-
f.useMu.Lock()
164-
f.suicided = true
165-
f.useMu.Unlock()
166-
167139
util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionSuffix)
168140

169141
f.docsCache.Release()
@@ -189,7 +161,15 @@ func (f *Remote) String() string {
189161
return fracToString(f, "remote")
190162
}
191163

192-
func (f *Remote) createDataProvider(ctx context.Context) *sealedDataProvider {
164+
func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) {
165+
if err := f.load(); err != nil {
166+
logger.Error(
167+
"will create empty data provider: cannot load remote fraction",
168+
zap.String("fraction", f.Info().Name()),
169+
zap.Error(err),
170+
)
171+
return nil, err
172+
}
193173
return &sealedDataProvider{
194174
ctx: ctx,
195175
info: f.info,
@@ -210,7 +190,7 @@ func (f *Remote) createDataProvider(ctx context.Context) *sealedDataProvider {
210190
&f.blocksData.IDsTable,
211191
f.info.BinaryDataVer,
212192
),
213-
}
193+
}, nil
214194
}
215195

216196
func (f *Remote) load() error {

0 commit comments

Comments
 (0)