Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mq支持pubsub模式 #11

Open
wants to merge 3 commits into
base: di
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions config/mq_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,18 @@ import (
type ChannelType string

const (
Direct ChannelType = "direct"
Direct ChannelType = "normal"
PubSub ChannelType = "pub_sub"
Delay ChannelType = "delay"
)

type MQConfig struct {
Type string `yaml:"type"` // 使用的MQ类型
ConfigSource string `yaml:"config_source"`
Type string `yaml:"type"` // 使用的MQ类型
Mod ChannelType `yaml:"mod"`
ConfigSource string `yaml:"config_source"`

Config map[string]string `yaml:"config"` // 由具体的mq实现决定其内容如何解析
}

type RabbitMQChannelConfig struct {
QueueName string `yaml:"queue"` // 可为空,代表生成一个no-durable的队列,名字由系统给定
ExchangeName string `yaml:"exchange"` // 可为空,若为空则不会使用exchange,而是往queue中直接发送;pubsub和delay必须指定
Type ChannelType `yaml:"type"` // direct/pub_sub/delay
}

func (s *MQConfig) checkAndFix() {
if err := tools.ReadFromEnvIfNeed(s); err != nil {
fmt.Println("warn: RedisConfig checkAndFix failed with err = ", err)
Expand Down
1 change: 1 addition & 0 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var GloryFrameworkErrorTargetInvokerNotFound = &GloryFrameworkError{Code: -301,
type GloryFrameworkError struct {
Code int32
Msg string
Desc string
}

func (g *GloryFrameworkError) Error() string {
Expand Down
2 changes: 2 additions & 0 deletions grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/glory-go/glory/filter/intercepter_impl"
_ "github.com/glory-go/glory/grpc/resolver"
"github.com/glory-go/glory/log"
mwcomm "github.com/glory-go/glory/service/middleware/common"
"github.com/glory-go/glory/service/middleware/jaeger"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
Expand Down Expand Up @@ -42,6 +43,7 @@ func (gc *GrpcClient) setup(unaryMWs ...grpc.UnaryClientInterceptor) {
dialOption := []grpc.DialOption{grpc.WithInsecure()}
// add client middlewares
unaryMWs = append(unaryMWs, jaeger.UnaryClientMW())
unaryMWs = append(unaryMWs, mwcomm.GetUnaryClientMWs()...)
dialOption = append(dialOption,
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(unaryMWs...),
Expand Down
99 changes: 0 additions & 99 deletions loader/env_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func loadConfig(mqConfigs map[string]*config.MQConfig) {
if !ok {
panic(fmt.Sprintf("mq with type %s not register", config.Type))
}
instance, err := factory(config.Config)
instance, err := factory(config.Mod, config.Config)
if err != nil {
panic(fmt.Sprintf("fail to load config for mq %s, err is %v", name, err))
}
Expand Down
4 changes: 3 additions & 1 deletion mq/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/glory-go/glory/config"
"github.com/glory-go/glory/log"
)

Expand All @@ -14,12 +15,13 @@ var (

type MQMsgHandler func(context.Context, []byte) error

type MQServiceFactory func(rawConfig map[string]string) (MQService, error)
type MQServiceFactory func(model config.ChannelType, rawConfig map[string]string) (MQService, error)

type MQService interface {
Connect() error
Send(topic string, msg []byte) (msgID string, err error)
DelaySend(topic string, msg []byte, handleTime time.Time) (msgID string, err error)
Publish(topic string, msg []byte) (msgID string, err error)
RegisterHandler(topic string, handler MQMsgHandler)
}

Expand Down
1 change: 1 addition & 0 deletions mq/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
mq:
mq_name: # 自定义的名称
type: aliyun_rocketmq # 目标mq实现中指定的类型
mod: normal|pubsub # 该mq使用什么消息模式
config_source: env # 可选,包含该配置时,将从指定来源中解析mq配置,目前仅支持环境变量
config: # 以下内容为用户自定义mq所消费
key: value
Expand Down
13 changes: 2 additions & 11 deletions service/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"log"
"net"

"github.com/glory-go/glory/filter/intercepter_impl"
"github.com/glory-go/glory/service/middleware/common"
"github.com/glory-go/glory/service/middleware/jaeger"

"github.com/glory-go/glory/config"
Expand All @@ -33,6 +33,7 @@ func NewGrpcService(name string) *GrpcService {
func (gs *GrpcService) setup() {
gs.unaryMWs = make([]grpc.UnaryServerInterceptor, 0)
gs.RegisterUnaryInterceptor(jaeger.UnaryServerMW())
gs.RegisterUnaryInterceptor(common.GetUnaryServerMWs()...)
}

func (gs *GrpcService) RegisterUnaryInterceptor(mw ...grpc.UnaryServerInterceptor) {
Expand All @@ -59,13 +60,3 @@ func (gs *GrpcService) GetGrpcServer() *grpc.Server {
)
return gs.grpcServer
}

func getOptionFromFilter(filterKeys []string) []grpc.ServerOption {
intercepter, err := intercepter_impl.NewDefaultGRPCIntercepter(filterKeys)
if err != nil {
panic(err)
}
return []grpc.ServerOption{
grpc.UnaryInterceptor(intercepter.ServerIntercepterHandle),
}
}
6 changes: 5 additions & 1 deletion service/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/glory-go/glory/config"
ghttp "github.com/glory-go/glory/http"
"github.com/glory-go/glory/service/middleware/common"
"github.com/glory-go/glory/service/middleware/jaeger"
"github.com/gorilla/mux"
"github.com/urfave/negroni"
Expand Down Expand Up @@ -44,11 +45,13 @@ func (hs *HttpService) UseGloryMW(filters ...ghttp.Filter) {
}

func (hs *HttpService) Run(ctx context.Context) {
// handler := cors.Default().Handler(hs.router)
s := negroni.Classic()
for _, handler := range hs.mws {
s.Use(handler)
}
for _, handler := range common.GetNegroniMWs() {
s.Use(handler)
}
s.UseHandler(hs.router)
s.Run(":" + strconv.Itoa(hs.conf.addr.Port))
}
Expand All @@ -61,6 +64,7 @@ func (hs *HttpService) RegisterRouterWithRawHttpHandler(path string, handler fun
func (hs *HttpService) RegisterRouter(path string, handler func(*ghttp.GRegisterController) error, req interface{},
rsp interface{}, method string, filters ...ghttp.Filter) {
filters = append(hs.gloryMWs, filters...)
filters = append(filters, common.GetGloryMWs()...)
ghttp.RegisterRouter(path, hs.router, handler, req, rsp, method, filters)
}

Expand Down
24 changes: 24 additions & 0 deletions service/middleware/common/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import "google.golang.org/grpc"

var (
unaryClientMWs []grpc.UnaryClientInterceptor
unaryServerMWs []grpc.UnaryServerInterceptor
)

func RegisterUnaryClientMWs(filters ...grpc.UnaryClientInterceptor) {
unaryClientMWs = append(unaryClientMWs, filters...)
}

func RegisterUnaryServerMWs(filters ...grpc.UnaryServerInterceptor) {
unaryServerMWs = append(unaryServerMWs, filters...)
}

func GetUnaryClientMWs() []grpc.UnaryClientInterceptor {
return unaryClientMWs
}

func GetUnaryServerMWs() []grpc.UnaryServerInterceptor {
return unaryServerMWs
}
27 changes: 27 additions & 0 deletions service/middleware/common/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package common

import (
ghttp "github.com/glory-go/glory/http"
"github.com/urfave/negroni"
)

var (
gloryMWs []ghttp.Filter
negroniMWs []negroni.Handler
)

func RegisterGloryMWs(filters ...ghttp.Filter) {
gloryMWs = append(gloryMWs, filters...)
}

func RegisterNegroniMWs(filters ...negroni.Handler) {
negroniMWs = append(negroniMWs, filters...)
}

func GetGloryMWs() []ghttp.Filter {
return gloryMWs
}

func GetNegroniMWs() []negroni.Handler {
return negroniMWs
}