Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement meta cache #2371

Merged
merged 11 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/vault/sdk v0.7.0
github.com/influxdata/tdigest v0.0.1
github.com/jinzhu/copier v0.3.5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package servicediscovery

import (
"reflect"
"time"
)

import (
Expand All @@ -32,9 +33,24 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/store"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

var (
metaCache *store.CacheManager
)

const (
defaultCacheName = "meta"
FinalT marked this conversation as resolved.
Show resolved Hide resolved
defaultFileName = ".matadata"
defaultEntrySize = 100
)

func init() {
metaCache, _ = store.NewCacheManager("mata", defaultFileName, 10*time.Minute, defaultEntrySize)
}

// ServiceInstancesChangedListenerImpl The Service Discovery Changed Event Listener
type ServiceInstancesChangedListenerImpl struct {
serviceNames *gxset.HashSet
Expand Down Expand Up @@ -88,9 +104,14 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
revisionToInstances[revision] = append(subInstances, instance)
metadataInfo := lstn.revisionToMetadata[revision]
if metadataInfo == nil {
metadataInfo, err = GetMetadataInfo(instance, revision)
if err != nil {
return err
if val, ok := metaCache.Get(revision); ok {
metadataInfo = val.(*common.MetadataInfo)
} else {
metadataInfo, err = GetMetadataInfo(instance, revision)
if err != nil {
return err
}
metaCache.Set(revision, metadataInfo)
FinalT marked this conversation as resolved.
Show resolved Hide resolved
}
}
instance.SetServiceMetadata(metadataInfo)
Expand All @@ -104,6 +125,9 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
newRevisionToMetadata[revision] = metadataInfo
}
lstn.revisionToMetadata = newRevisionToMetadata
for revision, metadataInfo := range newRevisionToMetadata {
metaCache.Set(revision, metadataInfo)
}

for serviceInfo, revisions := range localServiceToRevisions {
revisionsToUrls := protocolRevisionsToUrls[serviceInfo.Protocol]
Expand Down Expand Up @@ -187,6 +211,11 @@ func (lstn *ServiceInstancesChangedListenerImpl) GetEventType() reflect.Type {

// GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is null
func GetMetadataInfo(instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) {

if metadataInfo, ok := metaCache.Get(revision); ok {
return metadataInfo.(*common.MetadataInfo), nil
}

var metadataStorageType string
var metadataInfo *common.MetadataInfo
if instance.GetMetadata() == nil {
Expand All @@ -212,5 +241,6 @@ func GetMetadataInfo(instance registry.ServiceInstance, revision string) (*commo
return nil, err
}
}
metaCache.Set(revision, metadataInfo)
return metadataInfo, nil
}
178 changes: 178 additions & 0 deletions registry/servicediscovery/store/cache_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package store

import (
"encoding/gob"
"os"
"sync"
"time"
)

import (
"github.com/dubbogo/gost/log/logger"

"github.com/hashicorp/golang-lru"
)

type CacheManager struct {
name string // The name of the cache manager
cacheFile string // The file path where the cache is stored
cacheExpired time.Duration // The duration after which the cache expires
stop chan struct{} // Channel used to stop the cache expiration routine
cache *lru.Cache // The LRU cache implementation
lock sync.Mutex
}

type Item struct {
Key string
Value interface{}
}

// NewCacheManager creates a new CacheManager instance.
// It initializes the cache manager with the provided parameters and starts a routine for cache expiration.
func NewCacheManager(name, cacheFile string, cacheExpired time.Duration, maxCacheSize int) (*CacheManager, error) {
cm := &CacheManager{
name: name,
cacheFile: cacheFile,
cacheExpired: cacheExpired,
stop: make(chan struct{}),
}
cm.cache, _ = lru.New(maxCacheSize)
FinalT marked this conversation as resolved.
Show resolved Hide resolved

// Check if the cache file exists and load the cache if it does
if _, err := os.Stat(cacheFile); err == nil {
if err = cm.loadCache(); err != nil {
logger.Warnf("Failed to load the cache file:[%s].The err is %v", cm.cacheFile, err)
}
}
go cm.RunDumpTask()

return cm, nil
}

// Get retrieves the value associated with the given key from the cache.
func (cm *CacheManager) Get(key string) (interface{}, bool) {
return cm.cache.Get(key)
}

// Set sets the value associated with the given key in the cache.
func (cm *CacheManager) Set(key string, value interface{}) {
cm.cache.Add(key, value)
}

// Delete removes the value associated with the given key from the cache.
func (cm *CacheManager) Delete(key string) {
cm.cache.Remove(key)
}
FinalT marked this conversation as resolved.
Show resolved Hide resolved

// GetAll returns all the key-value pairs in the cache.
func (cm *CacheManager) GetAll() map[string]interface{} {
keys := cm.cache.Keys()

result := make(map[string]interface{})
for _, k := range keys {
result[k.(string)], _ = cm.cache.Get(k)
}

return result
}

// loadCache loads the cache from the cache file.
func (cm *CacheManager) loadCache() error {
cf, err := os.Open(cm.cacheFile)
if err != nil {
return err
}

decoder := gob.NewDecoder(cf)
for {
var it Item
err = decoder.Decode(&it)
if err != nil {
if err.Error() == "EOF" {
break // Reached end of file
}
return err
}
// Add the loaded keys to the front of the LRU list
cm.cache.Add(it.Key, it.Value)
}

return cf.Close()
}

// dumpCache dumps the cache to the cache file.
func (cm *CacheManager) dumpCache() error {

cm.lock.Lock()
defer cm.lock.Unlock()

items := cm.GetAll()

file, err := os.Create(cm.cacheFile)
if err != nil {
return err
}

encoder := gob.NewEncoder(file)
for k, v := range items {
gob.Register(v)
err = encoder.Encode(&Item{
Key: k,
Value: v,
})
if err != nil {
return err
}
}
return file.Close()

}

func (cm *CacheManager) RunDumpTask() {
ticker := time.NewTicker(cm.cacheExpired)
FinalT marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ticker.C:
// Dump the cache to the file
if err := cm.dumpCache(); err != nil {
// Handle error
logger.Warnf("Failed to dump cache,the err is %v", err)
} else {
logger.Infof("Dumping [%s] caches, latest entries %d", cm.name, cm.cache.Len())
}
case <-cm.stop:
ticker.Stop()
return
}
}
}
FinalT marked this conversation as resolved.
Show resolved Hide resolved

// destroy stops the cache expiration routine, clears the cache and removes the cache file.
func (cm *CacheManager) destroy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块逻辑看着没有地方使用,是否需要?

cm.stop <- struct{}{} // Stop the cache expiration routine
cm.cache.Purge() // Clear the cache

// Delete the cache file if it exists
if _, err := os.Stat(cm.cacheFile); err == nil {
if err := os.Remove(cm.cacheFile); err == nil {
logger.Infof("The cacheFile [%s] was cleared", cm.cacheFile)
}
}
}
Loading
Loading