Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize startup for filtered consumers when creating pending count. #2075

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

// Check if we have filtered subject that is a wildcard.
if config.FilterSubject != _EMPTY_ && !subjectIsLiteral(config.FilterSubject) {
if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) {
o.filterWC = true
}

Expand Down Expand Up @@ -2909,17 +2909,7 @@ func (o *consumer) setInitialPending() {
}
} else {
// Here we are filtered.
// FIXME(dlc) - This could be slow with O(n)
for seq := o.sseq; ; seq++ {
subj, _, _, _, err := o.mset.store.LoadMsg(seq)
if err == ErrStoreMsgNotFound {
continue
} else if err == ErrStoreEOF {
break
} else if err == nil && o.isFilteredMatch(subj) {
o.sgap++
}
}
o.sgap = o.mset.store.NumFilteredPending(o.sseq, o.cfg.FilterSubject)
}
}

Expand Down
133 changes: 130 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type fileStore struct {
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
fsi map[string]seqSlice
fsis *simpleState
closed bool
expiring bool
fip bool
Expand Down Expand Up @@ -250,7 +252,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, fmt.Errorf("could not create message storage directory - %v", err)
}
if err := os.MkdirAll(odir, 0755); err != nil {
return nil, fmt.Errorf("could not create message storage directory - %v", err)
return nil, fmt.Errorf("could not create consumer storage directory - %v", err)
}

// Create highway hash for message blocks. Use sha256 of directory as key.
Expand All @@ -260,11 +262,23 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, fmt.Errorf("could not create hash: %v", err)
}

// Recover our state.
// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
}

// Check to see if we have lots of messages and existing consumers.
// If they could be filtered we should generate an index here.
const lowWaterMarkMsgs = 8192
if fs.state.Msgs > lowWaterMarkMsgs {
// If we have one subject that is not a wildcard we can skip.
if !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0])) {
if ofis, _ := ioutil.ReadDir(odir); len(ofis) > 0 {
fs.genFilterIndex()
}
}
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
Expand Down Expand Up @@ -663,6 +677,7 @@ func (fs *fileStore) recoverMsgs() error {
fs.startAgeChk()
fs.expireMsgsLocked()
}

return nil
}

Expand Down Expand Up @@ -700,6 +715,108 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
return 0
}

type seqSlice []uint64

func (x seqSlice) Len() int { return len(x) }
func (x seqSlice) Less(i, j int) bool { return x[i] < x[j] }
func (x seqSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }

func (x seqSlice) Search(n uint64) int {
return sort.Search(len(x), func(i int) bool { return x[i] >= n })
}

type simpleState struct {
msgs, first, last uint64
}

// This will generate an index for us on startup to determine num pending for
// filtered consumers easier.
func (fs *fileStore) genFilterIndex() {
fs.mu.Lock()
defer fs.mu.Unlock()

fsi := make(map[string]seqSlice)

for _, mb := range fs.blks {
mb.loadMsgs()
mb.mu.Lock()
fseq, lseq := mb.first.seq, mb.last.seq
for seq := fseq; seq <= lseq; seq++ {
if sm, err := mb.cacheLookupWithLock(seq); sm != nil && err == nil {
fsi[sm.subj] = append(fsi[sm.subj], seq)
}
}
// Expire this cache before moving on.
mb.llts = 0
mb.expireCacheLocked()
mb.mu.Unlock()
}

fs.fsi = fsi
fs.fsis = &simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq}
}

// Clears out the filter index.
func (fs *fileStore) clearFilterIndex() {
fs.mu.Lock()
fs.fsi, fs.fsis = nil, nil
fs.mu.Unlock()
}

// Fetch our num filtered pending from our index.
// Lock should be held.
func (fs *fileStore) getNumFilteredPendingFromIndex(sseq uint64, subj string) (uint64, error) {
cstate := simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq}
if fs.fsis == nil || *fs.fsis != cstate {
fs.fsi, fs.fsis = nil, nil
return 0, errors.New("state changed, index not valid")
}
var total uint64
for tsubj, seqs := range fs.fsi {
if subjectIsSubsetMatch(tsubj, subj) {
total += uint64(len(seqs[seqs.Search(sseq):]))
}
}
return total, nil
}

// Returns number of messages matching the subject starting at sequence sseq.
func (fs *fileStore) NumFilteredPending(sseq uint64, subj string) (total uint64) {
fs.mu.RLock()
lseq := fs.state.LastSeq
if sseq < fs.state.FirstSeq {
sseq = fs.state.FirstSeq
}
if fs.fsi != nil {
if np, err := fs.getNumFilteredPendingFromIndex(sseq, subj); err == nil {
fs.mu.RUnlock()
return np
}
}
fs.mu.RUnlock()

if subj == _EMPTY_ {
if sseq <= lseq {
return lseq - sseq
}
return 0
}

var eq func(string, string) bool
if subjectHasWildcard(subj) {
eq = subjectIsSubsetMatch
} else {
eq = func(a, b string) bool { return a == b }
}

for seq := sseq; seq <= lseq; seq++ {
if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subj) {
total++
}
}
return total
}

// RegisterStorageUpdates registers a callback for updates to storage changes.
// It will present number of messages and bytes as a signed integer and an
// optional sequence number of the message if a single.
Expand Down Expand Up @@ -886,6 +1003,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
fs.startAgeChk()
}

// If we had an index cache wipe that out.
if fs.fsi != nil {
fs.fsi, fs.fsis = nil, nil
}

return nil
}

Expand Down Expand Up @@ -1119,6 +1241,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
fs.state.Msgs--
fs.state.Bytes -= msz

// If we had an index cache wipe that out.
if fs.fsi != nil {
fs.fsi, fs.fsis = nil, nil
}

// Now local mb updates.
mb.msgs--
mb.bytes -= msz
Expand Down Expand Up @@ -2407,7 +2534,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {

// Check to see if we are the last seq for this message block and are doing
// a linear scan. If that is true and we are not the last message block we can
// expire try to expire the cache.
// try to expire the cache.
mb.mu.RLock()
shouldTryExpire := mb != lmb && seq == mb.last.seq && mb.llseq == seq-1
mb.mu.RUnlock()
Expand Down
1 change: 1 addition & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
s.Warnf(" Error restoring Consumer state: %v", err)
}
}
mset.clearFilterIndex()
}

// Make sure to cleanup and old remaining snapshots.
Expand Down
31 changes: 31 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,37 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
return uint64(index) + ms.state.FirstSeq
}

// Returns number of messages matching the subject starting at sequence sseq.
func (ms *memStore) NumFilteredPending(sseq uint64, subj string) (total uint64) {
ms.mu.RLock()
defer ms.mu.RUnlock()

if sseq < ms.state.FirstSeq {
sseq = ms.state.FirstSeq
}

if subj == _EMPTY_ {
if sseq <= ms.state.LastSeq {
return ms.state.LastSeq - sseq
}
return 0
}

var eq func(string, string) bool
if subjectHasWildcard(subj) {
eq = subjectIsSubsetMatch
} else {
eq = func(a, b string) bool { return a == b }
}

for seq := sseq; seq <= ms.state.LastSeq; seq++ {
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) {
total++
}
}
return total
}

// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (ms *memStore) enforceMsgLimit() {
Expand Down
1 change: 1 addition & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type StreamStore interface {
Compact(seq uint64) (uint64, error)
Truncate(seq uint64) error
GetSeqFromTime(t time.Time) uint64
NumFilteredPending(sseq uint64, subject string) uint64
State() StreamState
FastState(*StreamState)
Type() StorageType
Expand Down
10 changes: 10 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2083,6 +2083,16 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
return nil
}

// Clears out any filtered index from filestores.
func (mset *stream) clearFilterIndex() {
mset.mu.Lock()
defer mset.mu.Unlock()

if fs, ok := mset.store.(*fileStore); ok {
fs.clearFilterIndex()
}
}

// Called for any updates to the underlying stream. We pass through the bytes to the
// jetstream account. We do local processing for stream pending for consumers, but only
// for removals.
Expand Down