Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions internal/events/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error)
client := ffresty.NewWithConfig(ctx, *ffrestyConfig)

*wh = WebHooks{
ctx: log.WithLogField(ctx, "webhook", wh.connID),
ctx: log.WithLogField(ctx, "webhook", connID),
capabilities: &events.Capabilities{
BatchDelivery: true,
},
Expand Down Expand Up @@ -139,7 +139,7 @@ func (p *whPayload) firstData() fftypes.JSONObject {
}

func (wh *WebHooks) buildPayload(ctx context.Context, sub *core.Subscription, event *core.CombinedEventDataDelivery) *whPayload {
log.L(wh.ctx).Debugf("Webhook-> %s event %s on subscription %s", sub.Options.URL, event.Event.ID, sub.ID)
log.L(ctx).Debugf("Webhook-> %s event %s on subscription %s", sub.Options.URL, event.Event.ID, sub.ID)
withData := sub.Options.WithData != nil && *sub.Options.WithData
options := sub.Options.TransportOptions()
p := &whPayload{
Expand Down Expand Up @@ -402,7 +402,7 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription,
Status: resp.StatusCode(),
Headers: fftypes.JSONObject{},
}
log.L(wh.ctx).Debugf("Webhook<- %s %s on subscription %s returned %d", req.method, req.url, sub.ID, res.Status)
log.L(ctx).Debugf("Webhook<- %s %s on subscription %s returned %d", req.method, req.url, sub.ID, res.Status)
header := resp.Header()
for h := range header {
res.Headers[h] = header.Get(h)
Expand Down Expand Up @@ -440,7 +440,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
if gwErr != nil {
// Generate a bad-gateway error response - we always want to send something back,
// rather than just causing timeouts
log.L(wh.ctx).Errorf("Failed to invoke webhook: %s", gwErr)
log.L(ctx).Errorf("Failed to invoke webhook: %s on subscription %s", gwErr, sub.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the CTX that should have the sub.ID in it - this is a transport layer that could be used in anything, it so happens that you want more logging as part of subscriptions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize that his was already the case before in some order log lines, so worth validating that when we call buildPayload the ctx does have the subscription - which means the connID should be added now

b, _ := json.Marshal(&fftypes.RESTError{
Error: gwErr.Error(),
})
Expand All @@ -453,7 +453,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
}
}
b, _ := json.Marshal(&res)
log.L(wh.ctx).Tracef("Webhook response: %s", string(b))
log.L(ctx).Tracef("Webhook response: %s for subscription %s", string(b), sub.ID)

// For each event emit a response
for _, combinedEvent := range events {
Expand All @@ -465,7 +465,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
}
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
log.L(ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Rejected: false,
Expand Down Expand Up @@ -501,12 +501,13 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
}

func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
ctx = log.WithLogField(ctx, "webhook", wh.connID)
reply := sub.Options.TransportOptions().GetBool("reply")
if reply && event.Message != nil && event.Message.Header.CID != nil {
// We cowardly refuse to dispatch a message that is itself a reply, as it's hard for users to
// avoid loops - and there's no way for us to detect here if a user has configured correctly
// to avoid a loop.
log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Expand Down Expand Up @@ -540,6 +541,7 @@ func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *cor
}

func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
ctx = log.WithLogField(ctx, "webhook", wh.connID)
reply := sub.Options.TransportOptions().GetBool("reply")
if reply {
nonReplyEvents := []*core.CombinedEventDataDelivery{}
Expand All @@ -549,7 +551,7 @@ func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub
// avoid loops - and there's no way for us to detect here if a user has configured correctly
// to avoid a loop.
if event.Message != nil && event.Message.Header.CID != nil {
log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Expand Down
79 changes: 71 additions & 8 deletions internal/events/webhooks/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ import (
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/fftypes"
fflog "github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coreconfig"
"github.com/hyperledger/firefly/mocks/eventsmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -470,14 +472,10 @@ func TestRequestWithBodyReplyEndToEndWithTLS(t *testing.T) {

ctx, cancelCtx := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := server.Shutdown(shutdownContext); err != nil {
return
}
}
<-ctx.Done()
shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = server.Shutdown(shutdownContext)
}()

server.Handler = r
Expand Down Expand Up @@ -1490,3 +1488,68 @@ func TestRequestWithBodyReplyEndToEndWithBatch(t *testing.T) {
func TestFirstDataNeverNil(t *testing.T) {
assert.NotNil(t, (&whPayload{}).firstData())
}

// testHook captures logrus entries for assertions
type testHook struct{ entries []*logrus.Entry }

func (h *testHook) Levels() []logrus.Level { return logrus.AllLevels }
func (h *testHook) Fire(e *logrus.Entry) error {
h.entries = append(h.entries, e)
return nil
}

func TestLoggingContextPreserved(t *testing.T) {
wh, cancel := newTestWebHooks(t)
defer cancel()

// Capture logs at debug level
logger := logrus.StandardLogger()
origHooks := logger.Hooks
hook := &testHook{}
logger.AddHook(hook)
logrus.SetLevel(logrus.DebugLevel)
defer logger.ReplaceHooks(origHooks)

// Minimal HTTP server to exercise delivery path
r := mux.NewRouter()
r.HandleFunc("/ping", func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(200)
_, _ = res.Write([]byte(`ok`))
}).Methods(http.MethodPost)
server := httptest.NewServer(r)
defer server.Close()

sub := &core.Subscription{
SubscriptionRef: core.SubscriptionRef{Namespace: "ns1"},
}
to := sub.Options.TransportOptions()
to["url"] = fmt.Sprintf("http://%s/ping", server.Listener.Addr())
// Ensure we log via buildPayload/attemptRequest path
event := &core.EventDelivery{
EnrichedEvent: core.EnrichedEvent{Event: core.Event{ID: fftypes.NewUUID()}},
Subscription: core.SubscriptionRef{ID: sub.ID},
}

parentCtx := fflog.WithLogField(context.Background(), "httpReq", "req-123")

// Expect the DeliveryResponse callback invoked along the non-reply path
mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks)
mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(resp *core.EventDeliveryResponse) bool {
return !resp.Rejected
})).Return(nil)

err := wh.DeliveryRequest(parentCtx, mock.Anything, sub, event, nil)
assert.NoError(t, err)

// Find any log entry with the preserved fields
found := false
for _, e := range hook.entries {
if e.Data["httpReq"] == "req-123" && e.Data["webhook"] != nil {
found = true
break
}
}
assert.True(t, found, "expected log entry with preserved httpReq and webhook fields")

mcb.AssertExpectations(t)
}
Loading