Skip to content

Commit

Permalink
Mod:code review
Browse files Browse the repository at this point in the history
  • Loading branch information
hxmhlt committed May 24, 2020
1 parent 0d78b2a commit c024ce1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 41 deletions.
76 changes: 39 additions & 37 deletions metadata/report/delegate/delegate_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package delegate

import (
"encoding/json"
"runtime/debug"
"sync"
"time"
)
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewMetadataReport() (*MetadataReport, error) {
scheduler := gocron.NewScheduler(time.UTC)
_, err := scheduler.Every(1).Day().Do(
func() {
logger.Info("start to publish all metadata.")
logger.Info("start to publish all metadata in metadata report %v.", url)
bmr.allMetadataReportsLock.RLock()
bmr.doHandlerMetadataCollection(bmr.allMetadataReports)
bmr.allMetadataReportsLock.RUnlock()
Expand All @@ -145,30 +146,30 @@ func NewMetadataReport() (*MetadataReport, error) {
}

// retry will do metadata failed reports collection by call metadata report sdk
func (bmr *MetadataReport) retry() bool {
bmr.failedReportsLock.RLock()
defer bmr.failedReportsLock.RUnlock()
return bmr.doHandlerMetadataCollection(bmr.failedReports)
func (mr *MetadataReport) retry() bool {
mr.failedReportsLock.RLock()
defer mr.failedReportsLock.RUnlock()
return mr.doHandlerMetadataCollection(mr.failedReports)
}

// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition
func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
if bmr.syncReport {
bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
if mr.syncReport {
mr.storeMetadataTask(common.PROVIDER, identifier, definer)
}
go bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
go mr.storeMetadataTask(common.PROVIDER, identifier, definer)
}

// storeMetadataTask will delegate to call remote metadata's sdk to store
func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer)
bmr.allMetadataReportsLock.Lock()
bmr.allMetadataReports[identifier] = definer
bmr.allMetadataReportsLock.Unlock()
mr.allMetadataReportsLock.Lock()
mr.allMetadataReports[identifier] = definer
mr.allMetadataReportsLock.Unlock()

bmr.failedReportsLock.Lock()
delete(bmr.failedReports, identifier)
bmr.failedReportsLock.Unlock()
mr.failedReportsLock.Lock()
delete(mr.failedReports, identifier)
mr.failedReportsLock.Unlock()
// data is store the json marshaled definition
var (
data []byte
Expand All @@ -177,17 +178,18 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me

defer func() {
if r := recover(); r != nil {
bmr.failedReportsLock.Lock()
bmr.failedReports[identifier] = definer
bmr.failedReportsLock.Unlock()
bmr.metadataReportRetry.startRetryTask()
logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r)
mr.failedReportsLock.Lock()
mr.failedReports[identifier] = definer
mr.failedReportsLock.Unlock()
mr.metadataReportRetry.startRetryTask()
logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n",
identifier, string(data), r, string(debug.Stack()))
}
}()

data, err = json.Marshal(definer)
if err != nil {
logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err)
logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err)
panic(err)
}
report := instance.GetMetadataReportInstance()
Expand All @@ -198,49 +200,49 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me
}

if err != nil {
logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err)
logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err)
panic(err)
}
}

// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition
func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
if bmr.syncReport {
bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
if mr.syncReport {
mr.storeMetadataTask(common.CONSUMER, identifier, definer)
}
go bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
go mr.storeMetadataTask(common.CONSUMER, identifier, definer)
}

// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata
func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
report := instance.GetMetadataReportInstance()
if bmr.syncReport {
if mr.syncReport {
return report.SaveServiceMetadata(identifier, url)
}
go report.SaveServiceMetadata(identifier, url)
return nil
}

// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata
func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
report := instance.GetMetadataReportInstance()
if bmr.syncReport {
if mr.syncReport {
return report.RemoveServiceMetadata(identifier)
}
go report.RemoveServiceMetadata(identifier)
return nil
}

// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls
func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
report := instance.GetMetadataReportInstance()
return report.GetExportedURLs(identifier)
}

// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
report := instance.GetMetadataReportInstance()
if bmr.syncReport {
if mr.syncReport {
return report.SaveSubscribedData(identifier, urls)
}
go report.SaveSubscribedData(identifier, urls)
Expand All @@ -260,15 +262,15 @@ func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdenti
}

// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap
func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
if len(metadataMap) == 0 {
return true
}
for e := range metadataMap {
if common.RoleType(common.PROVIDER).Role() == e.Side {
bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition))
mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition))
} else if common.RoleType(common.CONSUMER).Role() == e.Side {
bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string))
mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string))
}
}
return false
Expand Down
8 changes: 4 additions & 4 deletions metadata/service/remote/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
mts.exportedRevision.Store(exportedRevision)
urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err)
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
iterator := urls.Iter(inmemory.Comparator{})
Expand All @@ -145,7 +145,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
id := identifier.NewServiceMetadataIdentifier(common.URL(url))
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err)
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
Expand All @@ -155,7 +155,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
mts.subscribedRevision.Store(subscribedRevision)
urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err)
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
result = false
}
if urls != nil && urls.Len() > 0 {
Expand All @@ -166,7 +166,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
Revision: subscribedRevision,
}
if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err)
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
Expand Down

0 comments on commit c024ce1

Please sign in to comment.