Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement meta cache #2371

Merged
merged 11 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve code
  • Loading branch information
FinalT committed Aug 10, 2023
commit 3fff2884a039c9ebbf7b730b542907f4c58e9fb7
4 changes: 2 additions & 2 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ const (

// default meta cache config
const (
DefaultMetaCacheName = "meta"
DefaultMetaFileName = ".matadata"
DefaultMetaCacheName = "dubbo.meta"
DefaultMetaFileName = "dubbo.metadata"
DefaultEntrySize = 100
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package servicediscovery

import (
"reflect"
"time"
)

import (
Expand All @@ -42,7 +41,11 @@ var (
)

func init() {
metaCache, _ = store.NewCacheManager(constant.DefaultMetaCacheName, constant.DefaultMetaFileName, 10*time.Minute, constant.DefaultEntrySize)
cache, err := store.NewCacheManager(constant.DefaultMetaCacheName, constant.DefaultMetaFileName, constant.DefaultEntrySize)
if err != nil {
logger.Fatal("Failed to create cache [%s],the err is %v", constant.DefaultMetaCacheName, err)
}
metaCache = cache
}

// ServiceInstancesChangedListenerImpl The Service Discovery Changed Event Listener
Expand Down Expand Up @@ -122,6 +125,7 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
for revision, metadataInfo := range newRevisionToMetadata {
metaCache.Set(revision, metadataInfo)
}
metaCache.DumpCache()

for serviceInfo, revisions := range localServiceToRevisions {
revisionsToUrls := protocolRevisionsToUrls[serviceInfo.Protocol]
Expand Down Expand Up @@ -235,6 +239,9 @@ func GetMetadataInfo(instance registry.ServiceInstance, revision string) (*commo
return nil, err
}
}

metaCache.Set(revision, metadataInfo)
metaCache.DumpCache()

return metadataInfo, nil
}
50 changes: 14 additions & 36 deletions registry/servicediscovery/store/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/gob"
"os"
"sync"
"time"
)

import (
Expand All @@ -31,12 +30,10 @@ import (
)

type CacheManager struct {
name string // The name of the cache manager
cacheFile string // The file path where the cache is stored
cacheExpired time.Duration // The duration after which the cache expires
stop chan struct{} // Channel used to stop the cache expiration routine
cache *lru.Cache // The LRU cache implementation
lock sync.Mutex
name string // The name of the cache manager
cacheFile string // The file path where the cache is stored
cache *lru.Cache // The LRU cache implementation
lock sync.Mutex
}

type Item struct {
Expand All @@ -46,22 +43,23 @@ type Item struct {

// NewCacheManager creates a new CacheManager instance.
// It initializes the cache manager with the provided parameters and starts a routine for cache expiration.
func NewCacheManager(name, cacheFile string, cacheExpired time.Duration, maxCacheSize int) (*CacheManager, error) {
func NewCacheManager(name, cacheFile string, maxCacheSize int) (*CacheManager, error) {
cm := &CacheManager{
name: name,
cacheFile: cacheFile,
cacheExpired: cacheExpired,
stop: make(chan struct{}),
name: name,
cacheFile: cacheFile,
}
cm.cache, _ = lru.New(maxCacheSize)
cache, err := lru.New(maxCacheSize)
if err != nil {
return nil, err
}
cm.cache = cache

// Check if the cache file exists and load the cache if it does
if _, err := os.Stat(cacheFile); err == nil {
if err = cm.loadCache(); err != nil {
logger.Warnf("Failed to load the cache file:[%s].The err is %v", cm.cacheFile, err)
}
}
go cm.RunDumpTask()

return cm, nil
}
Expand Down Expand Up @@ -118,7 +116,7 @@ func (cm *CacheManager) loadCache() error {
}

// dumpCache dumps the cache to the cache file.
func (cm *CacheManager) dumpCache() error {
func (cm *CacheManager) DumpCache() error {

cm.lock.Lock()
defer cm.lock.Unlock()
Expand All @@ -145,29 +143,9 @@ func (cm *CacheManager) dumpCache() error {

}

func (cm *CacheManager) RunDumpTask() {
ticker := time.NewTicker(cm.cacheExpired)
for {
select {
case <-ticker.C:
// Dump the cache to the file
if err := cm.dumpCache(); err != nil {
// Handle error
logger.Warnf("Failed to dump cache,the err is %v", err)
} else {
logger.Infof("Dumping [%s] caches, latest entries %d", cm.name, cm.cache.Len())
}
case <-cm.stop:
ticker.Stop()
return
}
}
}

// destroy stops the cache expiration routine, clears the cache and removes the cache file.
func (cm *CacheManager) destroy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块逻辑看着没有地方使用,是否需要?

cm.stop <- struct{}{} // Stop the cache expiration routine
cm.cache.Purge() // Clear the cache
cm.cache.Purge() // Clear the cache

// Delete the cache file if it exists
if _, err := os.Stat(cm.cacheFile); err == nil {
Expand Down
14 changes: 8 additions & 6 deletions registry/servicediscovery/store/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestCacheManager(t *testing.T) {
cm, err := NewCacheManager("test", "test_cache", time.Second, 10)
cm, err := NewCacheManager("test", "test_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
Expand Down Expand Up @@ -58,13 +58,14 @@ func TestCacheManager(t *testing.T) {
}

// Test cache file creation and loading
cm2, err := NewCacheManager("test2", "nonexistent_cache", time.Second, 10)
cm2, err := NewCacheManager("test2", "nonexistent_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
cm2.Set("key4", "value4")
cm2.DumpCache()
time.Sleep(time.Second * 4)
cm3, err := NewCacheManager("test3", "nonexistent_cache", time.Second, 10)
cm3, err := NewCacheManager("test3", "nonexistent_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestMetaInfoCacheManager(t *testing.T) {
metadataInfo3 := common.NewMetadataInfo("3", "3", serverInfo)
metadataInfo4 := common.NewMetadataInfo("4", "4", serverInfo)

cm, err := NewCacheManager("metaTest1", "test_meta_cache", time.Second, 10)
cm, err := NewCacheManager("metaTest1", "test_meta_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
Expand Down Expand Up @@ -124,13 +125,14 @@ func TestMetaInfoCacheManager(t *testing.T) {
}

// Test cache file creation and loading
cm2, err := NewCacheManager("metaTest2", "nonexistent_meta_cache", time.Second, 10)
cm2, err := NewCacheManager("metaTest2", "nonexistent_meta_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
cm2.Set("key4", metadataInfo4)
cm2.DumpCache()
time.Sleep(time.Second * 4)
cm3, err := NewCacheManager("test3", "nonexistent_meta_cache", time.Second, 10)
cm3, err := NewCacheManager("test3", "nonexistent_meta_cache", 10)
if err != nil {
t.Fatalf("failed to create cache manager: %v", err)
}
Expand Down