Skip to content

Commit

Permalink
fix alibaba#83; fix consumer interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
gorexlv committed Mar 9, 2020
1 parent f39e051 commit 8bb7e34
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
11 changes: 7 additions & 4 deletions adapter/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ import (
"github.com/apache/rocketmq-client-go/v2/primitive"
)

// SentinelConsumerInterceptor returns interceptor for consumer
func SentinelConsumerInterceptor(opts ...Option) primitive.Interceptor {
options := evaluateOptions(opts)
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
producerCtx := primitive.GetProducerCtx(ctx)
resourceName := producerCtx.Message.Topic
consumerCtx, _ := primitive.GetConsumerCtx(ctx)
resourceName := consumerCtx.MQ.Topic

if options.resourceExtract != nil {
resourceName = options.resourceExtract(producerCtx)
if options.consumerResourceExtract != nil {
resourceName = options.consumerResourceExtract(consumerCtx)
}

entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeMQ),
sentinel.WithTrafficType(base.Inbound),
sentinel.WithArgs("topic", consumerCtx.MQ.Topic),
sentinel.WithArgs("broker", consumerCtx.MQ.BrokerName),
)

if err != nil {
Expand Down
16 changes: 13 additions & 3 deletions adapter/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@ import (
type (
Option func(*options)
options struct {
resourceExtract func(*primitive.ProducerCtx) string
consumerResourceExtract func(*primitive.ConsumeMessageContext) string
providerResourceExtract func(*primitive.ProducerCtx) string
blockFallback primitive.Interceptor
}
)

func WithResourceExtract(fn func(ctx *primitive.ProducerCtx) string) Option {
// WithConsumerResourceExtract set consumerResourceExtract
func WithConsumerResourceExtract(fn func(ctx *primitive.ConsumeMessageContext) string) Option {
return func(options *options) {
options.resourceExtract = fn
options.consumerResourceExtract = fn
}
}

// WithProviderResourceExtract set providerResourceExtract
func WithProviderResourceExtract(fn func(ctx *primitive.ProducerCtx) string) Option {
return func(options *options) {
options.providerResourceExtract = fn
}
}

// WithBlockFallback set blockFallback
func WithBlockFallback(fn primitive.Interceptor) Option {
return func(options *options) {
options.blockFallback = fn
Expand Down
7 changes: 5 additions & 2 deletions adapter/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ import (
"github.com/apache/rocketmq-client-go/v2/primitive"
)

// SentinelProviderInterceptor returns interceptor for producer
func SentinelProviderInterceptor(opts ...Option) primitive.Interceptor {
options := evaluateOptions(opts)
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
producerCtx := primitive.GetProducerCtx(ctx)
resourceName := producerCtx.Message.Topic

if options.resourceExtract != nil {
resourceName = options.resourceExtract(producerCtx)
if options.providerResourceExtract != nil {
resourceName = options.providerResourceExtract(producerCtx)
}

entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeMQ),
sentinel.WithTrafficType(base.Outbound),
sentinel.WithArgs("topic", producerCtx.Message.Topic),
sentinel.WithArgs("broker", producerCtx.MQ.BrokerName),
)

if err != nil {
Expand Down

0 comments on commit 8bb7e34

Please sign in to comment.