Skip to content

Commit

Permalink
remove redundant sections in httpendpoint channel (dapr#6663)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaron2 authored Jul 14, 2023
1 parent c98abfc commit 8d7eaee
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 74 deletions.
45 changes: 19 additions & 26 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,23 @@ type API interface {
MarkStatusAsReady()
MarkStatusAsOutboundReady()
SetAppChannel(appChannel channel.AppChannel)
SetHTTPEndpointsAppChannel(appChannel channel.HTTPEndpointAppChannel)
SetDirectMessaging(directMessaging messaging.DirectMessaging)
SetActorRuntime(actor actors.Actors)
}

type api struct {
universal *universalapi.UniversalAPI
endpoints []Endpoint
publicEndpoints []Endpoint
directMessaging messaging.DirectMessaging
appChannel channel.AppChannel
httpEndpointsAppChannel channel.HTTPEndpointAppChannel
resiliency resiliency.Provider
pubsubAdapter runtimePubsub.Adapter
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
readyStatus bool
outboundReadyStatus bool
tracingSpec config.TracingSpec
maxRequestBodySize int64 // In bytes
universal *universalapi.UniversalAPI
endpoints []Endpoint
publicEndpoints []Endpoint
directMessaging messaging.DirectMessaging
appChannel channel.AppChannel
resiliency resiliency.Provider
pubsubAdapter runtimePubsub.Adapter
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
readyStatus bool
outboundReadyStatus bool
tracingSpec config.TracingSpec
maxRequestBodySize int64 // In bytes
}

const (
Expand Down Expand Up @@ -137,14 +135,13 @@ type APIOpts struct {
// NewAPI returns a new API.
func NewAPI(opts APIOpts) API {
api := &api{
appChannel: opts.AppChannel,
httpEndpointsAppChannel: opts.HTTPEndpointsAppChannel,
directMessaging: opts.DirectMessaging,
resiliency: opts.Resiliency,
pubsubAdapter: opts.PubsubAdapter,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
maxRequestBodySize: opts.MaxRequestBodySize,
appChannel: opts.AppChannel,
directMessaging: opts.DirectMessaging,
resiliency: opts.Resiliency,
pubsubAdapter: opts.PubsubAdapter,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
maxRequestBodySize: opts.MaxRequestBodySize,
universal: &universalapi.UniversalAPI{
AppID: opts.AppID,
Logger: log,
Expand Down Expand Up @@ -2260,10 +2257,6 @@ func (a *api) SetAppChannel(appChannel channel.AppChannel) {
a.appChannel = appChannel
}

func (a *api) SetHTTPEndpointsAppChannel(appChannel channel.HTTPEndpointAppChannel) {
a.httpEndpointsAppChannel = appChannel
}

func (a *api) SetDirectMessaging(directMessaging messaging.DirectMessaging) {
a.directMessaging = directMessaging
}
Expand Down
62 changes: 30 additions & 32 deletions pkg/messaging/direct_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,19 @@ type remoteApp struct {

// NewDirectMessaging contains the options for NewDirectMessaging.
type NewDirectMessagingOpts struct {
AppID string
Namespace string
Port int
CompStore *compstore.ComponentStore
Mode modes.DaprMode
AppChannel channel.AppChannel
HTTPEndpointsAppChannel channel.HTTPEndpointAppChannel
ClientConnFn messageClientConnection
Resolver nr.Resolver
MaxRequestBodySize int
Proxy Proxy
ReadBufferSize int
Resiliency resiliency.Provider
IsStreamingEnabled bool
AppID string
Namespace string
Port int
CompStore *compstore.ComponentStore
Mode modes.DaprMode
AppChannel channel.AppChannel
ClientConnFn messageClientConnection
Resolver nr.Resolver
MaxRequestBodySize int
Proxy Proxy
ReadBufferSize int
Resiliency resiliency.Provider
IsStreamingEnabled bool
}

// NewDirectMessaging returns a new direct messaging api.
Expand All @@ -108,22 +107,21 @@ func NewDirectMessaging(opts NewDirectMessagingOpts) DirectMessaging {
hName, _ := os.Hostname()

dm := &directMessaging{
appID: opts.AppID,
namespace: opts.Namespace,
grpcPort: opts.Port,
mode: opts.Mode,
appChannel: opts.AppChannel,
httpEndpointsAppChannel: opts.HTTPEndpointsAppChannel,
connectionCreatorFn: opts.ClientConnFn,
resolver: opts.Resolver,
maxRequestBodySizeMB: opts.MaxRequestBodySize,
proxy: opts.Proxy,
readBufferSize: opts.ReadBufferSize,
resiliency: opts.Resiliency,
isStreamingEnabled: opts.IsStreamingEnabled,
hostAddress: hAddr,
hostName: hName,
compStore: opts.CompStore,
appID: opts.AppID,
namespace: opts.Namespace,
grpcPort: opts.Port,
mode: opts.Mode,
appChannel: opts.AppChannel,
connectionCreatorFn: opts.ClientConnFn,
resolver: opts.Resolver,
maxRequestBodySizeMB: opts.MaxRequestBodySize,
proxy: opts.Proxy,
readBufferSize: opts.ReadBufferSize,
resiliency: opts.Resiliency,
isStreamingEnabled: opts.IsStreamingEnabled,
hostAddress: hAddr,
hostName: hName,
compStore: opts.CompStore,
}

if dm.proxy != nil {
Expand Down Expand Up @@ -267,7 +265,7 @@ func (d *directMessaging) invokeHTTPEndpoint(ctx context.Context, appID, appName
// Set up timers
start := time.Now()
diag.DefaultMonitoring.ServiceInvocationRequestSent(appID, req.Message().Method)
imr, err := d.invokeRemoteUnaryForHTTPEndpoint(ctx, nil, req, nil, appID)
imr, err := d.invokeRemoteUnaryForHTTPEndpoint(ctx, req, appID)

// Diagnostics
if imr != nil {
Expand Down Expand Up @@ -318,7 +316,7 @@ func (d *directMessaging) invokeRemote(ctx context.Context, appID, appNamespace,
return imr, teardown, err
}

func (d *directMessaging) invokeRemoteUnaryForHTTPEndpoint(ctx context.Context, clientV1 internalv1pb.ServiceInvocationClient, req *invokev1.InvokeMethodRequest, opts []grpc.CallOption, appID string) (*invokev1.InvokeMethodResponse, error) {
func (d *directMessaging) invokeRemoteUnaryForHTTPEndpoint(ctx context.Context, req *invokev1.InvokeMethodRequest, appID string) (*invokev1.InvokeMethodResponse, error) {
if d.httpEndpointsAppChannel == nil {
return nil, errors.New("cannot invoke http endpoint: http endpoints app channel not initialized")
}
Expand Down
29 changes: 13 additions & 16 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,6 @@ func (a *DaprRuntime) initRuntime(ctx context.Context) error {
a.daprGRPCAPI.SetAppChannel(a.appChannel)
a.directMessaging.SetAppChannel(a.appChannel)

// add another app channel dedicated to external service invocation
a.daprHTTPAPI.SetHTTPEndpointsAppChannel(a.httpEndpointsAppChannel)
a.directMessaging.SetHTTPEndpointsAppChannel(a.httpEndpointsAppChannel)

a.daprHTTPAPI.SetDirectMessaging(a.directMessaging)
Expand Down Expand Up @@ -948,20 +946,19 @@ func matchRoutingRule(rules []*runtimePubsub.Rule, data map[string]interface{})

func (a *DaprRuntime) initDirectMessaging(resolver nr.Resolver) {
a.directMessaging = messaging.NewDirectMessaging(messaging.NewDirectMessagingOpts{
AppID: a.runtimeConfig.id,
Namespace: a.namespace,
Port: a.runtimeConfig.internalGRPCPort,
Mode: a.runtimeConfig.mode,
AppChannel: a.appChannel,
HTTPEndpointsAppChannel: a.httpEndpointsAppChannel,
ClientConnFn: a.grpc.GetGRPCConnection,
Resolver: resolver,
MaxRequestBodySize: a.runtimeConfig.maxRequestBodySize,
Proxy: a.proxy,
ReadBufferSize: a.runtimeConfig.readBufferSize,
Resiliency: a.resiliency,
IsStreamingEnabled: a.globalConfig.IsFeatureEnabled(config.ServiceInvocationStreaming),
CompStore: a.compStore,
AppID: a.runtimeConfig.id,
Namespace: a.namespace,
Port: a.runtimeConfig.internalGRPCPort,
Mode: a.runtimeConfig.mode,
AppChannel: a.appChannel,
ClientConnFn: a.grpc.GetGRPCConnection,
Resolver: resolver,
MaxRequestBodySize: a.runtimeConfig.maxRequestBodySize,
Proxy: a.proxy,
ReadBufferSize: a.runtimeConfig.readBufferSize,
Resiliency: a.resiliency,
IsStreamingEnabled: a.globalConfig.IsFeatureEnabled(config.ServiceInvocationStreaming),
CompStore: a.compStore,
})
}

Expand Down

0 comments on commit 8d7eaee

Please sign in to comment.