@@ -13,7 +13,15 @@ import (
13
13
"github.com/valyala/fasthttp"
14
14
)
15
15
16
- // --- Coordinator definition ---
16
+ // --- Retry Coordinator Definition ---
17
+
18
+ // RetryCoordinator provides global concurrency control for retry operations
19
+ // across all drain writers. It uses a semaphore pattern to enforce a configurable
20
+ // limit on the number of concurrent retries, preventing resource exhaustion
21
+ // and noisy-neighbor problems during periods of high retry load. When all
22
+ // retry slots are in use, additional retries will wait until a slot becomes
23
+ // available, ensuring the system remains stable while still attempting
24
+ // delivery of all messages. This coordinator will be used as a singleton.
17
25
type RetryCoordinator struct {
18
26
sem chan struct {}
19
27
}
@@ -31,6 +39,8 @@ func WithParallelRetries(n int) {
31
39
}
32
40
}
33
41
42
+ // GetGlobalRetryCoordinator returns a singleton instance of RetryCoordinator.
43
+ // It initializes the coordinator with a semaphore that limits the number of concurrent retries.
34
44
func GetGlobalRetryCoordinator () * RetryCoordinator {
35
45
globalRetryCoordinatorOnce .Do (func () {
36
46
globalRetryCoordinator = & RetryCoordinator {
@@ -55,6 +65,10 @@ func (c *RetryCoordinator) Release() {
55
65
<- c .sem
56
66
}
57
67
68
+ // --- RetryWriter Definition ---
69
+
70
+ // InternalRetryWriter is an interface that defines methods for configuring retry behavior
71
+ // for syslog writers. It allows setting a retry duration function and the maximum number of retries.
58
72
type InternalRetryWriter interface {
59
73
ConfigureRetry (retryDuration RetryDuration , maxRetries int )
60
74
}
@@ -66,6 +80,12 @@ type Retryer struct {
66
80
coordinator * RetryCoordinator
67
81
}
68
82
83
+ // Retryer handles retry logic for failed operations with configurable policies.
84
+ // It coordinates with the global RetryCoordinator to limit concurrent retries,
85
+ // implements exponential backoff with configurable intervals, and respects
86
+ // context cancellation for graceful shutdown. The first attempt is always
87
+ // performed without acquiring a retry slot (fast path), while subsequent
88
+ // retries are subject to global concurrency limits.
69
89
func NewRetryer (
70
90
binding * URLBinding ,
71
91
retryDuration RetryDuration ,
@@ -124,6 +144,8 @@ func (r *Retryer) Retry(batch []byte, msgCount float64, funcToRetry func([]byte,
124
144
return true
125
145
}
126
146
147
+ // --- HTTPSBatchWriter definition ---
148
+
127
149
type HTTPSBatchWriter struct {
128
150
HTTPSWriter
129
151
batchSize int
@@ -154,8 +176,6 @@ func WithSendInterval(interval time.Duration) Option {
154
176
}
155
177
}
156
178
157
- // --- HTTPSBatchWriter definition ---
158
-
159
179
// HTTPSBatchWriter is an egress.WriteCloser implementation that batches syslog messages
160
180
// and sends them via HTTPS in configurable batch sizes and intervals. It provides
161
181
// backpressure to upstream callers by using a blocking channel for incoming messages.
0 commit comments