Skip to content

Commit 4868f9f

Browse files
authored
ct-fetch: pipeline cache insertions (#363)
* certdatabase: move cache-modifying SerialCacheWriter methods to CertDatabase * storage: add SetInsertMany * storage: use built-in batching for SRem * ct-fetch: queue set member additions in insertCTWorker * storage: remove unused cache member from SerialCacheKey
1 parent c900186 commit 4868f9f

File tree

7 files changed

+175
-162
lines changed

7 files changed

+175
-162
lines changed

go/cmd/ct-fetch/ct-fetch.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -194,24 +194,20 @@ func (ld *LogSyncEngine) Wait() {
194194
ld.ThreadWaitGroup.Wait()
195195
}
196196

197-
func (ld *LogSyncEngine) tryUpdate(ep *LogSyncMessage) bool {
198-
entry := *ep
199-
200-
if entry.Certificate != nil && entry.Issuer != nil {
201-
err := ld.database.Store(entry.Certificate, entry.Issuer)
202-
if err != nil {
203-
glog.Errorf("Problem inserting certificate (serial=%s): %s", types.NewSerial(entry.Certificate).String(), err)
204-
return false
205-
}
197+
func (ld *LogSyncEngine) tryUpdate(queue []storage.SetMemberWithExpiry, logState *types.CTLogState) bool {
198+
err := ld.database.Store(queue)
199+
if err != nil {
200+
glog.Errorf("Problem inserting certificates: %s", err)
201+
return false
206202
}
207203

208-
if entry.LogState != nil {
209-
err := ld.database.SaveLogState(entry.LogState)
204+
if logState != nil {
205+
err = ld.database.SaveLogState(logState)
210206
if err != nil {
211-
glog.Errorf("Problem saving log state (%s): %s", entry.LogState, err)
207+
glog.Errorf("Problem saving log state (%s): %s", logState, err)
212208
return false
213209
}
214-
glog.Infof("[%s] Saved log state: %s", entry.LogState.ShortURL, entry.LogState)
210+
glog.Infof("[%s] Saved log state: %s", logState.ShortURL, logState)
215211
}
216212

217213
return true
@@ -226,6 +222,9 @@ func (ld *LogSyncEngine) insertCTWorker(ctx context.Context) {
226222
healthStatusTicker := time.NewTicker(healthStatusPeriod)
227223
defer healthStatusTicker.Stop()
228224

225+
batchSize := *ctconfig.BatchSize
226+
queue := make([]storage.SetMemberWithExpiry, 0, batchSize)
227+
229228
for ep := range ld.entryChan {
230229
select { // Taking something off the queue is useful work.
231230
// So indicate server health when requested.
@@ -234,8 +233,20 @@ func (ld *LogSyncEngine) insertCTWorker(ctx context.Context) {
234233
default:
235234
}
236235

236+
if ep.Certificate != nil && ep.Issuer != nil {
237+
item := ld.database.PrepareSetMember(ep.Certificate, ep.Issuer)
238+
queue = append(queue, item)
239+
}
240+
241+
// Only dispatch the queue if it is full or if we're updating
242+
// the log state
243+
if uint64(len(queue)+1) < batchSize && ep.LogState == nil {
244+
continue
245+
}
246+
237247
for {
238-
if ld.tryUpdate(&ep) {
248+
if ld.tryUpdate(queue, ep.LogState) {
249+
queue = queue[:0]
239250
break
240251
}
241252

go/storage/certdatabase.go

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -276,14 +276,15 @@ func (db *CertDatabase) GetLogState(aUrl *url.URL) (*types.CTLogState, error) {
276276
}, nil
277277
}
278278

279-
func (db *CertDatabase) Store(aCert *x509.Certificate, aIssuer *x509.Certificate) error {
280-
expDate := types.NewExpDateFromTime(aCert.NotAfter)
279+
func (db *CertDatabase) PrepareSetMember(aCertificate, aIssuer *x509.Certificate) SetMemberWithExpiry {
280+
expDate := types.NewExpDateFromTime(aCertificate.NotAfter)
281281
issuer := types.NewIssuer(aIssuer)
282-
serialWriter := db.GetSerialCacheAccessor(expDate, issuer)
283-
284-
serial := types.NewSerial(aCert)
282+
serial := types.NewSerial(aCertificate)
283+
return db.GetSerialCacheKey(expDate, issuer).NewMember(serial)
284+
}
285285

286-
_, err := serialWriter.Insert(serial)
286+
func (db *CertDatabase) Store(items []SetMemberWithExpiry) error {
287+
err := db.cache.SetInsertMany(items)
287288
if err != nil {
288289
return err
289290
}
@@ -330,15 +331,15 @@ func (db *CertDatabase) GetCTLogsFromStorage() ([]types.CTLogState, error) {
330331
return ctLogList, nil
331332
}
332333

333-
func (db *CertDatabase) GetSerialCacheAccessor(aExpDate types.ExpDate, aIssuer types.Issuer) *SerialCacheWriter {
334-
var kc *SerialCacheWriter
334+
func (db *CertDatabase) GetSerialCacheKey(aExpDate types.ExpDate, aIssuer types.Issuer) *SerialCacheKey {
335+
var kc *SerialCacheKey
335336

336337
id := aIssuer.ID() + aExpDate.ID()
337338

338339
cacheObj, err := db.cacheAccessors.GetIFPresent(id)
339340
if err != nil {
340341
if err == gcache.KeyNotFoundError {
341-
kc = NewSerialCacheWriter(aExpDate, aIssuer, db.cache)
342+
kc = NewSerialCacheKey(aExpDate, aIssuer)
342343
err = db.cacheAccessors.Set(id, kc)
343344
if err != nil {
344345
glog.Fatalf("Couldn't set into the cache expDate=%s issuer=%s from cache: %s",
@@ -349,7 +350,7 @@ func (db *CertDatabase) GetSerialCacheAccessor(aExpDate types.ExpDate, aIssuer t
349350
aExpDate, aIssuer.ID(), err)
350351
}
351352
} else {
352-
kc = cacheObj.(*SerialCacheWriter)
353+
kc = cacheObj.(*SerialCacheKey)
353354
}
354355

355356
if kc == nil {
@@ -359,8 +360,7 @@ func (db *CertDatabase) GetSerialCacheAccessor(aExpDate types.ExpDate, aIssuer t
359360
}
360361

361362
func (db *CertDatabase) ReadSerialsFromCache(aExpDate types.ExpDate, aIssuer types.Issuer) []types.Serial {
362-
accessor := db.GetSerialCacheAccessor(aExpDate, aIssuer)
363-
return accessor.List()
363+
return db.List(db.GetSerialCacheKey(aExpDate, aIssuer))
364364
}
365365

366366
func (db *CertDatabase) ReadSerialsFromStorage(aExpDate types.ExpDate, aIssuer types.Issuer) ([]types.Serial, error) {
@@ -434,8 +434,8 @@ func (db *CertDatabase) moveOneBinOfCachedSerialsToStorage(aTmpDir string, aExpD
434434
}
435435

436436
// It's now safe to remove cachedSerials from the cache.
437-
cacheWriter := db.GetSerialCacheAccessor(aExpDate, aIssuer)
438-
err = cacheWriter.RemoveMany(cachedSerials)
437+
key := db.GetSerialCacheKey(aExpDate, aIssuer)
438+
err = db.RemoveMany(key, cachedSerials)
439439
if err != nil {
440440
glog.Warningf("Failed to remove serial from cache: %s", err)
441441
}
@@ -698,3 +698,68 @@ func (db *CertDatabase) Commit(aProofOfLock string) error {
698698
func (db *CertDatabase) AddPreIssuerAlias(aPreIssuer types.Issuer, aIssuer types.Issuer) error {
699699
return db.cache.AddPreIssuerAlias(aPreIssuer, aIssuer)
700700
}
701+
702+
// Returns true if this serial was unknown. Subsequent calls with the same serial
703+
// will return false, as it will be known then.
704+
func (db *CertDatabase) Insert(k *SerialCacheKey, aSerial types.Serial) (bool, error) {
705+
result, err := db.cache.SetInsert(k.ID(), aSerial.BinaryString())
706+
if err != nil {
707+
return false, err
708+
}
709+
710+
if !k.expirySet {
711+
expireTime := k.expDate.ExpireTime()
712+
if err := db.cache.ExpireAt(k.ID(), expireTime); err != nil {
713+
glog.Errorf("Couldn't set expiration time %v for serials %s: %v", expireTime, k.ID(), err)
714+
} else {
715+
k.expirySet = true
716+
}
717+
}
718+
719+
return result, nil
720+
}
721+
722+
func (db *CertDatabase) RemoveMany(k *SerialCacheKey, aSerials []types.Serial) error {
723+
// Removing an element of a set may leave the set empty. Redis
724+
// automatically deletes empty sets, so assume that we need to reset
725+
// the ExpireAt time for this set on the next Insert call.
726+
k.expirySet = false
727+
serialStrings := make([]string, len(aSerials))
728+
for i := 0; i < len(aSerials); i++ {
729+
serialStrings[i] = aSerials[i].BinaryString()
730+
}
731+
return db.cache.SetRemove(k.ID(), serialStrings)
732+
}
733+
734+
func (db *CertDatabase) List(k *SerialCacheKey) []types.Serial {
735+
// Redis' scan methods regularly provide duplicates. The duplication
736+
// happens at this level, pulling from SetToChan, so we make a hash-set
737+
// here to de-duplicate when the memory impacts are the most minimal.
738+
serials := make(map[string]struct{})
739+
var count int
740+
741+
strChan := make(chan string)
742+
go func() {
743+
err := db.cache.SetToChan(k.ID(), strChan)
744+
if err != nil {
745+
glog.Fatalf("Error obtaining list of known certificates: %v", err)
746+
}
747+
}()
748+
749+
for str := range strChan {
750+
serials[str] = struct{}{}
751+
count += 1
752+
}
753+
754+
serialList := make([]types.Serial, 0, count)
755+
for str := range serials {
756+
bs, err := types.NewSerialFromBinaryString(str)
757+
if err != nil {
758+
glog.Errorf("Failed to populate serial str=[%s] %v", str, err)
759+
continue
760+
}
761+
serialList = append(serialList, bs)
762+
}
763+
764+
return serialList
765+
}

go/storage/certdatabase_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ func cacheSerial(t *testing.T, db CertDatabase, expDateStr string, issuerStr str
4242
}
4343
serial := types.NewSerialFromHex(serialStr)
4444

45-
kc := db.GetSerialCacheAccessor(expDate, issuer)
46-
_, err = kc.Insert(serial)
45+
_, err = db.Insert(db.GetSerialCacheKey(expDate, issuer), serial)
4746
if err != nil {
4847
t.Error(err)
4948
}
@@ -57,8 +56,9 @@ func isCached(t *testing.T, db CertDatabase, expDateStr string, issuerStr string
5756
}
5857
serial := types.NewSerialFromHex(serialStr)
5958

60-
kc := db.GetSerialCacheAccessor(expDate, issuer)
61-
cached, err := kc.Contains(serial)
59+
kc := db.GetSerialCacheKey(expDate, issuer)
60+
61+
cached, err := db.cache.SetContains(kc.ID(), serial.BinaryString())
6262
if err != nil {
6363
t.Error(err)
6464
}

go/storage/mockcache.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ func (ec *MockRemoteCache) SetInsert(key string, entry string) (bool, error) {
7070
return true, nil
7171
}
7272

73+
func (ec *MockRemoteCache) SetInsertMany(items []SetMemberWithExpiry) error {
74+
for _, item := range items {
75+
_, err := ec.SetInsert(item.Key, item.Value)
76+
if err != nil {
77+
return err
78+
}
79+
}
80+
return nil
81+
}
82+
7383
func (ec *MockRemoteCache) setRemove(key string, entry string) error {
7484
ec.mu.Lock()
7585
defer ec.mu.Unlock()

go/storage/rediscache.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,26 @@ func (rc *RedisCache) SetInsert(key string, entry string) (bool, error) {
8080
return added == 1, err
8181
}
8282

83+
func (rc *RedisCache) SetInsertMany(items []SetMemberWithExpiry) error {
84+
_, err := rc.client.Pipelined(func(pipe redis.Pipeliner) error {
85+
for _, item := range items {
86+
err := pipe.SAdd(item.Key, item.Value).Err()
87+
if err != nil {
88+
return err
89+
}
90+
err = pipe.ExpireAt(item.Key, item.Expiry).Err()
91+
if err != nil {
92+
glog.Errorf("Couldn't set expiration time %v for serials %s: %v", item.Expiry, item.Key, err)
93+
}
94+
}
95+
return nil
96+
})
97+
if err != nil {
98+
return err
99+
}
100+
return nil
101+
}
102+
83103
func (rc *RedisCache) SetRemove(key string, entries []string) error {
84104
batchSize := 1024
85105
for batchStart := 0; batchStart < len(entries); batchStart += batchSize {
@@ -88,15 +108,7 @@ func (rc *RedisCache) SetRemove(key string, entries []string) error {
88108
batchEnd = len(entries)
89109
}
90110
batch := entries[batchStart:batchEnd]
91-
_, err := rc.client.Pipelined(func(pipe redis.Pipeliner) error {
92-
for _, entry := range batch {
93-
err := pipe.SRem(key, entry).Err()
94-
if err != nil {
95-
return err
96-
}
97-
}
98-
return nil
99-
})
111+
err := rc.client.SRem(key, batch).Err()
100112
if err != nil {
101113
return err
102114
}

0 commit comments

Comments
 (0)