-
Notifications
You must be signed in to change notification settings - Fork 443
/
delegates.go
139 lines (114 loc) · 4.91 KB
/
delegates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package nsq
import "time"
type logger interface {
Output(calldepth int, s string) error
}
// LogLevel specifies the severity of a given log message
type LogLevel int
// Log levels
const (
LogLevelDebug LogLevel = iota
LogLevelInfo
LogLevelWarning
LogLevelError
LogLevelMax = iota - 1 // convenience - match highest log level
)
// String returns the string form for a given LogLevel
func (lvl LogLevel) String() string {
switch lvl {
case LogLevelInfo:
return "INF"
case LogLevelWarning:
return "WRN"
case LogLevelError:
return "ERR"
}
return "DBG"
}
// MessageDelegate is an interface of methods that are used as
// callbacks in Message
type MessageDelegate interface {
// OnFinish is called when the Finish() method
// is triggered on the Message
OnFinish(*Message)
// OnRequeue is called when the Requeue() method
// is triggered on the Message
OnRequeue(m *Message, delay time.Duration, backoff bool)
// OnTouch is called when the Touch() method
// is triggered on the Message
OnTouch(*Message)
}
type connMessageDelegate struct {
c *Conn
}
func (d *connMessageDelegate) OnFinish(m *Message) { d.c.onMessageFinish(m) }
func (d *connMessageDelegate) OnRequeue(m *Message, t time.Duration, b bool) {
d.c.onMessageRequeue(m, t, b)
}
func (d *connMessageDelegate) OnTouch(m *Message) { d.c.onMessageTouch(m) }
// ConnDelegate is an interface of methods that are used as
// callbacks in Conn
type ConnDelegate interface {
// OnResponse is called when the connection
// receives a FrameTypeResponse from nsqd
OnResponse(*Conn, []byte)
// OnError is called when the connection
// receives a FrameTypeError from nsqd
OnError(*Conn, []byte)
// OnMessage is called when the connection
// receives a FrameTypeMessage from nsqd
OnMessage(*Conn, *Message)
// OnMessageFinished is called when the connection
// handles a FIN command from a message handler
OnMessageFinished(*Conn, *Message)
// OnMessageRequeued is called when the connection
// handles a REQ command from a message handler
OnMessageRequeued(*Conn, *Message)
// OnBackoff is called when the connection triggers a backoff state
OnBackoff(*Conn)
// OnContinue is called when the connection finishes a message without adjusting backoff state
OnContinue(*Conn)
// OnResume is called when the connection triggers a resume state
OnResume(*Conn)
// OnIOError is called when the connection experiences
// a low-level TCP transport error
OnIOError(*Conn, error)
// OnHeartbeat is called when the connection
// receives a heartbeat from nsqd
OnHeartbeat(*Conn)
// OnClose is called when the connection
// closes, after all cleanup
OnClose(*Conn)
}
// keeps the exported Consumer struct clean of the exported methods
// required to implement the ConnDelegate interface
type consumerConnDelegate struct {
r *Consumer
}
func (d *consumerConnDelegate) OnResponse(c *Conn, data []byte) { d.r.onConnResponse(c, data) }
func (d *consumerConnDelegate) OnError(c *Conn, data []byte) { d.r.onConnError(c, data) }
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onConnMessage(c, m) }
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
func (d *consumerConnDelegate) OnClose(c *Conn) { d.r.onConnClose(c) }
// keeps the exported Producer struct clean of the exported methods
// required to implement the ConnDelegate interface
type producerConnDelegate struct {
w *Producer
}
func (d *producerConnDelegate) OnResponse(c *Conn, data []byte) { d.w.onConnResponse(c, data) }
func (d *producerConnDelegate) OnError(c *Conn, data []byte) { d.w.onConnError(c, data) }
func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnBackoff(c *Conn) {}
func (d *producerConnDelegate) OnContinue(c *Conn) {}
func (d *producerConnDelegate) OnResume(c *Conn) {}
func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) }
func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) }
func (d *producerConnDelegate) OnClose(c *Conn) { d.w.onConnClose(c) }