This repository has been archived by the owner on Oct 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathevent_writer.go
132 lines (120 loc) · 3.81 KB
/
event_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package quark
import (
"context"
"strconv"
"time"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
)
// EventWriter works as an Event response writer.
// It lets an Event respond, fan a message out to several topics, or fail properly by sending the failed Event
// into either a Retry or a Dead Letter Queue (DLQ).
//
// It uses a Publisher to write the actual messages.
type EventWriter interface {
// injectHeader sets the Header held by the writer (package-internal).
injectHeader(Header)
// Publisher returns the actual publisher used to push Event(s).
Publisher() Publisher
// Header returns the Event metadata held by the writer.
Header() Header
// Write pushes the given encoded message into the Event-Driven ecosystem, once per given topic.
//
// Returns the number of messages published, and a non-nil error if the publisher failed to push an Event.
// Returns ErrNotEnoughTopics if no topic was specified.
// The writer may publish fewer messages than requested, since messages that have passed the maximum
// redelivery cap are not sent to the broker.
Write(ctx context.Context, msg []byte, topics ...string) (int, error)
// WriteMessage pushes the given messages into the Event-Driven ecosystem.
//
// Returns the number of messages published, and a non-nil error if the publisher failed to push an Event.
// Remember that Quark uses the Message's "Kind" field as the topic name, so the developer must specify it
// either in that field or in the response headers.
// The writer may publish fewer messages than requested, since messages that have passed the maximum
// redelivery cap are not sent to the broker.
WriteMessage(context.Context, ...*Message) (int, error)
}
// defaultEventWriter is the EventWriter implementation returned by newEventWriter.
type defaultEventWriter struct {
// node supplies the max-retries and retry-backoff values consulted before each publish.
node *node
// publisher pushes the messages; Write and WriteMessage return ErrPublisherNotImplemented when it is nil.
publisher Publisher
// header holds the entries that parseHeader copies onto every outgoing message.
header Header
}
// newEventWriter builds the default EventWriter for node n, publishing through p.
// The writer starts with an empty Header.
func newEventWriter(n *node, p Publisher) EventWriter {
	w := new(defaultEventWriter)
	w.node = n
	w.publisher = p
	w.header = Header{}
	return w
}
// injectHeader replaces the writer's current header with h.
func (d *defaultEventWriter) injectHeader(h Header) {
d.header = h
}
// Publisher returns the Publisher this writer pushes messages through.
// The result may be nil; Write and WriteMessage report ErrPublisherNotImplemented in that case.
//
// A pointer receiver is used, like every other defaultEventWriter method, so the struct is not copied on
// each call and the type keeps a single, consistent method set.
func (d *defaultEventWriter) Publisher() Publisher {
	return d.publisher
}
// Header returns the header currently held by the writer.
func (d *defaultEventWriter) Header() Header {
return d.header
}
// Write publishes msg once per topic. For every topic a Message is created through NewMessage with a fresh
// UUID string, the topic and msg, and the writer's header is applied to it before publishing.
//
// Messages whose redelivery count exceeds the node's maximum retries are skipped, so the returned count may
// be lower than len(topics). Before each publish the writer waits for the node's retry backoff multiplied by
// the message's redelivery count. That wait stops as soon as ctx is done; ctx.Err() is then added to the
// returned error and the remaining topics are not attempted.
//
// Returns the number of messages published and the accumulated publish errors (nil when there were none).
// Returns ErrPublisherNotImplemented if the writer has no Publisher and ErrNotEnoughTopics if no topic was
// given.
func (d *defaultEventWriter) Write(ctx context.Context, msg []byte, topics ...string) (int, error) {
	if d.publisher == nil {
		return 0, ErrPublisherNotImplemented
	}
	if len(topics) == 0 {
		return 0, ErrNotEnoughTopics
	}

	errs := new(multierror.Error)
	msgPublished := 0
	for _, t := range topics {
		m := NewMessage(uuid.New().String(), t, msg)
		d.parseHeader(m)
		if m.Metadata.RedeliveryCount > d.node.setDefaultMaxRetries() {
			continue // avoid distributed loops (at macro scale)
		}

		// Back off proportionally to the redelivery count, but never block past ctx cancellation.
		if backoff := d.node.setDefaultRetryBackoff() * time.Duration(m.Metadata.RedeliveryCount); backoff > 0 {
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				errs = multierror.Append(errs, ctx.Err())
				return msgPublished, errs.ErrorOrNil()
			}
		}

		if err := d.publisher.Publish(ctx, m); err != nil {
			errs = multierror.Append(errs, err)
			continue
		}
		msgPublished++
	}
	return msgPublished, errs.ErrorOrNil()
}
// WriteMessage publishes every given Message after applying the writer's header to it.
//
// Nil entries in msgs are skipped instead of being dereferenced. Messages whose redelivery count exceeds the
// node's maximum retries are skipped as well, so the returned count may be lower than len(msgs). Before each
// publish the writer waits for the node's retry backoff multiplied by the message's redelivery count. That
// wait stops as soon as ctx is done; ctx.Err() is then added to the returned error and the remaining
// messages are not attempted.
//
// Returns the number of messages published and the accumulated publish errors (nil when there were none).
// Returns ErrPublisherNotImplemented if the writer has no Publisher and ErrNotEnoughTopics if no message was
// given.
func (d *defaultEventWriter) WriteMessage(ctx context.Context, msgs ...*Message) (int, error) {
	if d.publisher == nil {
		return 0, ErrPublisherNotImplemented
	}
	if len(msgs) == 0 {
		// Kept as ErrNotEnoughTopics so existing callers matching on it keep working.
		return 0, ErrNotEnoughTopics
	}

	errs := new(multierror.Error)
	msgPublished := 0
	for _, msg := range msgs {
		if msg == nil {
			continue // nothing to publish; dereferencing it below would panic
		}
		d.parseHeader(msg)
		if msg.Metadata.RedeliveryCount > d.node.setDefaultMaxRetries() {
			continue // avoid distributed loops (at macro scale)
		}

		// Back off proportionally to the redelivery count, but never block past ctx cancellation.
		if backoff := d.node.setDefaultRetryBackoff() * time.Duration(msg.Metadata.RedeliveryCount); backoff > 0 {
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				errs = multierror.Append(errs, ctx.Err())
				return msgPublished, errs.ErrorOrNil()
			}
		}

		if err := d.publisher.Publish(ctx, msg); err != nil {
			errs = multierror.Append(errs, err)
			continue
		}
		msgPublished++
	}
	return msgPublished, errs.ErrorOrNil()
}
// parseHeader copies every entry of the writer's header onto msg.
//
// Well-known keys land in dedicated fields: the kind header overwrites msg.Kind, the correlation-id header is
// applied only when its value is non-empty, the redelivery-count header only when its value parses as an
// integer, and the host header overwrites Metadata.Host. Every other key is stored in Metadata.ExternalData.
func (d *defaultEventWriter) parseHeader(msg *Message) {
	for key, value := range d.header {
		switch {
		case key == HeaderMessageKind:
			msg.Kind = value
		case key == HeaderMessageCorrelationId:
			if value == "" {
				continue // keep whatever correlation id the message already carries
			}
			msg.Metadata.CorrelationId = value
		case key == HeaderMessageRedeliveryCount:
			count, err := strconv.Atoi(value)
			if err != nil {
				continue // a count that does not parse is ignored
			}
			msg.Metadata.RedeliveryCount = count
		case key == HeaderMessageHost:
			msg.Metadata.Host = value
		default:
			msg.Metadata.ExternalData[key] = value
		}
	}
}