Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft of metrics bus #2351

Merged
merged 3 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,8 @@ const (
LoggerFileLocalTimeKey = "logger.file.local-time"
LoggerFileCompressKey = "logger.file.compress"
)

// metrics key
const (
MetricsRegistry = "dubbo.metrics.registry"
)
65 changes: 65 additions & 0 deletions metrics/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"sync"
)

// eventListener is a struct that encapsulates the listener map and provides thread-safe access to it.
type eventListener struct {
mu sync.RWMutex
listener map[string]chan MetricsEvent
}

var listener = &eventListener{
listener: make(map[string]chan MetricsEvent),
}

// Publish publishes an event to all subscribers of the same type.
func Publish(event MetricsEvent) {
listener.mu.RLock()
defer listener.mu.RUnlock()

if ch, ok := listener.listener[event.Type()]; ok {
select {
case ch <- event:
default:
// If the channel is full, drop the event to avoid blocking.
}
}
}

// Subscribe subscribes to events of the given type.
func Subscribe(typ string, ch chan MetricsEvent) {
listener.mu.Lock()
defer listener.mu.Unlock()

listener.listener[typ] = ch
}

// Unsubscribe unsubscribes from events of the given type.
func Unsubscribe(typ string) {
listener.mu.Lock()
defer listener.mu.Unlock()

if ch, ok := listener.listener[typ]; ok {
close(ch)
delete(listener.listener, typ)
}
}
51 changes: 51 additions & 0 deletions metrics/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"github.com/stretchr/testify/assert"
"testing"
)

var mockChan = make(chan MetricsEvent, 16)

type MockEvent struct {
}

func (m MockEvent) Type() string {
return "dubbo.metrics.mock"
}

func NewEmptyMockEvent() *MockEvent {
return &MockEvent{}
}

func init() {
Subscribe("dubbo.metrics.mock", mockChan)
Publish(NewEmptyMockEvent())
}

func TestBusPublish(t *testing.T) {
t.Run("testBusPublish", func(t *testing.T) {
event := <-mockChan

if event, ok := event.(MockEvent); ok {
assert.Equal(t, event, NewEmptyMockEvent())
}
})
}
23 changes: 23 additions & 0 deletions metrics/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

// MetricsEvent represents an event that can be published and subscribed to.
type MetricsEvent interface {
Type() string
}
51 changes: 51 additions & 0 deletions metrics/registry/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registry

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

type RegistryMetricsEvent struct {
//Contains some information, such as time, success, failure

// PostType MetricKey
// FinishType MetricKey
// ErrorType MetricKey
// Level MetricsLevel

// Time
// Start time.time
// End time.time
}

func (r RegistryMetricsEvent) Type() string {
return constant.MetricsRegistry
}

// NewRegistryEvent

// NewSubscribeEvent

// NewNotifyEvent

// NewDirectoryEvent

// NewServerRegistryEvent

// NewServerSubscribeEvent
98 changes: 98 additions & 0 deletions metrics/registry/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registry

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)

var (
//regRegistry metrics.MetricRegistry
registryChan = make(chan metrics.MetricsEvent, 128)
handlers []func(event *RegistryMetricsEvent)
)

// func Collector(m metrics.MetricRegistry, r *metrics.ReporterConfig) {
// regRegistry = m
//
// // init related metrics
// }
func init() {
AddHandler(regHandler, subHandler, notifyHandler, directoryHandler, serverSubHandler, serverRegHandler)
//metrics.AddCollector(Collector)
metrics.Subscribe(constant.MetricsRegistry, registryChan)
go receiveEvent()
}

func receiveEvent() {
for event := range registryChan {
registryEvent, ok := event.(*RegistryMetricsEvent)
if !ok {
continue
}
for _, handler := range handlers {
go func(handler func(event *RegistryMetricsEvent)) {
handler(registryEvent)
}(handler)
}
}
}

func AddHandler(handler ...func(event *RegistryMetricsEvent)) {
for _, h := range handler {
handlers = append(handlers, h)
}
}

func regHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func subHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry

}

func notifyHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func directoryHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func serverRegHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}

func serverSubHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics

// Save metrics to the MetricRegistry
}