// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package arrow // import "github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter/internal/arrow"

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
	arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
	"go.uber.org/multierr"
	"go.uber.org/zap"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/status"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// Stream is 1:1 with a gRPC stream.
type Stream struct {
	// maxStreamLifetime is the max timeout before the stream
	// should be closed on the client side. This ensures a
	// graceful shutdown before max_connection_age is reached
	// on the server side.
	maxStreamLifetime time.Duration

	// producer is exclusive to the holder of the stream.
	producer arrowRecord.ProducerAPI

	// prioritizer holds a reference to the stream; this allows
	// the reference to be severed.
	prioritizer *streamPrioritizer

	// perRPCCredentials from the auth extension, or nil.
	perRPCCredentials credentials.PerRPCCredentials

	// telemetry is a copy of the exporter's telemetry settings.
	telemetry component.TelemetrySettings

	// client uses the exporter's grpc.ClientConn. It is initially
	// nil and is set only when the ArrowStream() call succeeds,
	// meaning the endpoint recognizes OTLP+Arrow.
	client arrowpb.ArrowStreamService_ArrowStreamClient

	// toWrite passes a batch from the sender to the stream writer;
	// each item includes a dedicated channel for the response.
	toWrite chan writeItem

	// lock protects waiters.
	lock sync.Mutex

	// waiters is the response channel for each active batch.
	waiters map[int64]chan error
}

// writeItem is passed from the sender (a pipeline consumer) to the
// stream writer, which is not bound by the sender's context.
type writeItem struct {
	// records is a ptrace.Traces, plog.Logs, or pmetric.Metrics.
	records interface{}
	// md is the caller's metadata, derived from its context.
	md map[string]string
	// errCh is used by the stream reader to unblock the sender.
	errCh chan error
}
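
// The round trip for one batch: SendAndWait places a writeItem on
// toWrite and blocks on errCh; write() encodes and sends the batch,
// registering errCh in waiters under the batch ID; read() receives the
// corresponding BatchStatus and signals errCh. A minimal sketch of the
// handshake (the traces value td is a placeholder for illustration):
//
//	item := writeItem{
//		records: td,                  // e.g., a ptrace.Traces
//		md:      map[string]string{}, // optional caller metadata
//		errCh:   make(chan error, 1), // buffered so the reader never blocks
//	}
//	s.toWrite <- item
//	err := <-item.errCh // nil on success, set via processBatchStatus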

// newStream constructs a stream.
func newStream(
producer arrowRecord.ProducerAPI,
prioritizer *streamPrioritizer,
telemetry component.TelemetrySettings,
perRPCCredentials credentials.PerRPCCredentials,
) *Stream {
return &Stream{
producer: producer,
prioritizer: prioritizer,
perRPCCredentials: perRPCCredentials,
telemetry: telemetry,
toWrite: make(chan writeItem, 1),
waiters: map[int64]chan error{},
}
}
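
// A minimal construction sketch. The surrounding exporter supplies the
// producer, prioritizer, telemetry, and stream client; the lifetime and
// nil arguments below are assumptions for illustration only:
//
//	s := newStream(producer, prioritizer, telemetry, nil /* no per-RPC credentials */)
//	s.maxStreamLifetime = 30 * time.Second // a zero lifetime would fire write()'s timer immediately
//	go s.run(ctx, streamClient, nil /* no extra grpc.CallOptions */)
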
// setBatchChannel places a waiting consumer's batchID into the waiters map, where
// the stream reader may find it.
func (s *Stream) setBatchChannel(batchID int64, errCh chan error) {
s.lock.Lock()
defer s.lock.Unlock()
s.waiters[batchID] = errCh
}
func (s *Stream) logStreamError(err error) {
isEOF := errors.Is(err, io.EOF)
isCanceled := errors.Is(err, context.Canceled)
switch {
case !isEOF && !isCanceled:
s.telemetry.Logger.Error("arrow stream error", zap.Error(err))
case isEOF:
s.telemetry.Logger.Debug("arrow stream end")
case isCanceled:
s.telemetry.Logger.Debug("arrow stream canceled")
}
}

// run blocks the calling goroutine while executing stream logic. run
// returns when the reader and writer are finished. Errors are logged.
func (s *Stream) run(bgctx context.Context, streamClient StreamClientFunc, grpcOptions []grpc.CallOption) {
ctx, cancel := context.WithCancel(bgctx)
defer cancel()
sc, err := streamClient(ctx, grpcOptions...)
if err != nil {
// Returning with stream.client == nil signals the
// lack of an Arrow stream endpoint. When all the
// streams return with .client == nil, the ready
// channel will be closed.
//
// Note: These are gRPC server internal errors and
// will cause downgrade to standard OTLP. These
// cannot be simulated by connecting to a gRPC server
// that does not support the ArrowStream service, with
// or without the WaitForReady flag set. In a real
// gRPC server the first Unimplemented code is
// generally delivered to the Recv() call below, so
// this code path is not taken for an ordinary downgrade.
//
// TODO: a more graceful recovery strategy?
s.telemetry.Logger.Error("cannot start arrow stream", zap.Error(err))
return
}
	// Setting .client != nil indicates that the endpoint was valid
	// and that streaming may start. When this stream finishes, it
	// will be restarted.
s.client = sc
	// ww is used to wait for the writer. Since we wait for the writer,
	// the writer's goroutine is not added to the exporter waitgroup (e.wg).
var ww sync.WaitGroup
var writeErr error
ww.Add(1)
go func() {
defer ww.Done()
writeErr = s.write(ctx)
if writeErr != nil {
cancel()
}
}()
// the result from read() is processed after cancel and wait,
// so we can set s.client = nil in case of a delayed Unimplemented.
err = s.read(ctx)
// Wait for the writer to ensure that all waiters are known.
cancel()
ww.Wait()
if err != nil {
// This branch is reached with an unimplemented status
// with or without the WaitForReady flag.
status, ok := status.FromError(err)
if ok {
switch status.Code() {
case codes.Unimplemented:
// This (client == nil) signals the controller
// to downgrade when all streams have returned
// in that status.
//
// TODO: Note there are partial failure modes
// that will continue to function in a
// degraded mode, such as when half of the
// streams are successful and half of streams
// take this return path. Design a graceful
// recovery mechanism?
s.client = nil
s.telemetry.Logger.Info("arrow is not supported",
zap.String("message", status.Message()),
)
case codes.Unavailable, codes.Internal:
// gRPC returns this when max connection age is reached.
// The message string will contain NO_ERROR if it's a
// graceful shutdown.
//
// Having seen:
//
// arrow stream unknown {"kind": "exporter",
// "data_type": "traces", "name": "otlp/traces",
// "code": 13, "message": "stream terminated by
// RST_STREAM with error code: NO_ERROR"}
//
				// logged by the default case below (note `"code": 13`),
				// this branch is now used for both Unavailable (witnessed
				// in local testing) and Internal (witnessed in
				// production); in both cases "NO_ERROR" is the key
				// signifier.
if strings.Contains(status.Message(), "NO_ERROR") {
s.telemetry.Logger.Debug("arrow stream shutdown")
} else {
s.telemetry.Logger.Error("arrow stream unavailable",
zap.String("message", status.Message()),
)
}
case codes.Canceled:
// Note that when the writer encounters a local error (such
// as a panic in the encoder) it will cancel the context and
// writeErr will be set to an actual error, while the error
// returned from read() will be the cancellation by the
// writer. So if the reader's error is canceled and the
// writer's error is non-nil, use it instead.
if writeErr != nil {
s.telemetry.Logger.Error("arrow stream internal error",
zap.Error(writeErr),
)
// reset the writeErr so it doesn't print below.
writeErr = nil
} else {
s.telemetry.Logger.Error("arrow stream canceled",
zap.String("message", status.Message()),
)
}
default:
s.telemetry.Logger.Error("arrow stream unknown",
zap.Uint32("code", uint32(status.Code())),
zap.String("message", status.Message()),
)
}
} else {
s.logStreamError(err)
}
}
if writeErr != nil {
s.logStreamError(writeErr)
}
// The reader and writer have both finished; respond to any
// outstanding waiters.
for _, ch := range s.waiters {
// Note: the top-level OTLP exporter will retry.
ch <- ErrStreamRestarting
}
}

// write repeatedly places this stream into the next-available queue, then
// performs a blocking send(). This returns when the data is in the write
// buffer, leaving the caller to wait on its error channel.
func (s *Stream) write(ctx context.Context) error {
	// headers are encoded using hpack, reusing a buffer on each call.
var hdrsBuf bytes.Buffer
hdrsEnc := hpack.NewEncoder(&hdrsBuf)
timer := time.NewTimer(s.maxStreamLifetime)
for {
		// Note: this cannot block because the stream has capacity
		// and individual streams shut down synchronously.
		s.prioritizer.setReady(s)

		// This can block, and if the context is canceled we
		// wait for the reader to find this stream.
var wri writeItem
var ok bool
select {
case <-timer.C:
s.prioritizer.removeReady(s)
return s.client.CloseSend()
case wri, ok = <-s.toWrite:
// channel is closed
if !ok {
return nil
}
case <-ctx.Done():
// Because we did not <-stream.toWrite, there
// is a potential sender race since the stream
// is currently in the ready set.
s.prioritizer.removeReady(s)
return ctx.Err()
}
// Note: For the two return statements below there is no potential
// sender race because the stream is not available, as indicated by
// the successful <-stream.toWrite.
batch, err := s.encode(wri.records)
if err != nil {
			// This is some kind of internal error. We will restart the
			// stream and mark this record with a permanent error.
err = fmt.Errorf("encode: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
return err
}
// Optionally include outgoing metadata, if present.
if len(wri.md) != 0 {
hdrsBuf.Reset()
for key, val := range wri.md {
err := hdrsEnc.WriteField(hpack.HeaderField{
Name: key,
Value: val,
})
if err != nil {
					// This case is like the encode-failure case
					// above: we will restart the stream, but
					// consider this a permanent error.
err = fmt.Errorf("hpack: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
return err
}
}
batch.Headers = hdrsBuf.Bytes()
}
		// Let the receiver know what to look for.
s.setBatchChannel(batch.BatchId, wri.errCh)
if err := s.client.Send(batch); err != nil {
// The error will be sent to errCh during cleanup for this stream.
// Note: do not wrap this error, it may contain a Status.
return err
}
}
}
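
// For reference, the metadata attached in write() can be recovered on
// the receiving side with the matching hpack decoder. A minimal sketch,
// not this package's receiver code; the table size and md variable are
// assumptions for illustration:
//
//	md := map[string]string{}
//	dec := hpack.NewDecoder(4096, func(f hpack.HeaderField) {
//		md[f.Name] = f.Value
//	})
//	if _, err := dec.Write(batch.Headers); err != nil {
//		// malformed header block
//	}
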
// read repeatedly reads a batch status and releases the consumers waiting for
// a response.
func (s *Stream) read(_ context.Context) error {
	// Note: we do not use the context; the stream context might
	// cancel a call to Recv(), but the call to processBatchStatus
	// is non-blocking.
for {
		// Note: if the client has called CloseSend() and is waiting for a
		// response from the server, and the server fails for some reason,
		// we will wait until some other condition triggers, such as a
		// context timeout. TODO: possibly improve this to wait for no
		// outstanding requests and then stop reading.
resp, err := s.client.Recv()
if err != nil {
			// Once the send direction of the stream is closed, the server
			// should return an error that mentions an EOF. The expected
			// error code is codes.Unknown.
status, ok := status.FromError(err)
if ok && status.Message() == "EOF" && status.Code() == codes.Unknown {
return nil
}
// Note: do not wrap, contains a Status.
return err
}
if err = s.processBatchStatus(resp); err != nil {
return fmt.Errorf("process: %w", err)
}
}
}

// getSenderChannels takes the stream lock and removes the corresponding
// sender channel for the status's BatchId, returning it for correlation.
// A nil channel and an error are returned when the sender channel cannot
// be located.
func (s *Stream) getSenderChannels(status *arrowpb.BatchStatus) (chan error, error) {
s.lock.Lock()
defer s.lock.Unlock()
ch, ok := s.waiters[status.BatchId]
if !ok {
// Will break the stream.
return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId)
}
delete(s.waiters, status.BatchId)
return ch, nil
}
// processBatchStatus processes a single response from the server and unblocks the
// associated sender.
func (s *Stream) processBatchStatus(status *arrowpb.BatchStatus) error {
ch, ret := s.getSenderChannels(status)
if ch == nil {
// In case getSenderChannels encounters a problem, the
// channel is nil.
return ret
}
if status.StatusCode == arrowpb.StatusCode_OK {
ch <- nil
return nil
}
var err error
switch status.StatusCode {
case arrowpb.StatusCode_UNAVAILABLE:
err = fmt.Errorf("destination unavailable: %d: %s", status.BatchId, status.StatusMessage)
case arrowpb.StatusCode_INVALID_ARGUMENT:
err = consumererror.NewPermanent(
fmt.Errorf("invalid argument: %d: %s", status.BatchId, status.StatusMessage))
default:
base := fmt.Errorf("unexpected stream response: %d: %s", status.BatchId, status.StatusMessage)
err = consumererror.NewPermanent(base)
// Will break the stream.
ret = multierr.Append(ret, base)
}
ch <- err
return ret
}
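
// For illustration, a server status that unblocks a sender with a
// permanent error might look like the following (the values shown are
// hypothetical):
//
//	status := &arrowpb.BatchStatus{
//		BatchId:       batchID, // must match a registered waiter
//		StatusCode:    arrowpb.StatusCode_INVALID_ARGUMENT,
//		StatusMessage: "schema mismatch",
//	}
//	err := s.processBatchStatus(status) // sends a Permanent error on errCh
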
// SendAndWait submits a batch of records to be encoded and sent. Meanwhile, this
// goroutine waits on the incoming context or for the asynchronous response to be
// received by the stream reader.
func (s *Stream) SendAndWait(ctx context.Context, records interface{}) error {
errCh := make(chan error, 1)
	// Note that if the OTLP exporter's gRPC Headers field was
	// set, those (static) headers were used to establish the
	// stream. The caller's context, as returned by
	// baseExporter.enhanceContext(), includes the static headers
	// plus optional client metadata. Here, get whatever headers
	// gRPC would have transmitted for a unary RPC and convey
	// them via the Arrow batch.
	//
	// Note that the "uri" parameter to GetRequestMetadata is
	// not used by the headersetter extension and is not well
	// documented. Since it's an optional list, we omit it.
var md map[string]string
if s.perRPCCredentials != nil {
var err error
md, err = s.perRPCCredentials.GetRequestMetadata(ctx)
if err != nil {
return err
}
}
s.toWrite <- writeItem{
records: records,
md: md,
errCh: errCh,
}
// Note this ensures the caller's timeout is respected.
select {
case <-ctx.Done():
// This caller's context timed out.
return ctx.Err()
case err := <-errCh:
// Note: includes err == nil and err != nil cases.
return err
}
}
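
// A minimal sketch of the caller side; how the exporter selects a ready
// stream is outside this file, and td is a placeholder for illustration:
//
//	// td is a ptrace.Traces; stream was selected by the prioritizer.
//	if err := stream.SendAndWait(ctx, td); err != nil {
//		if consumererror.IsPermanent(err) {
//			// Encoding or invalid-argument failure: do not retry.
//		}
//		// Otherwise the top-level OTLP exporter may retry, for
//		// example after ErrStreamRestarting.
//	}
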
// encode produces the next batch of Arrow records.
func (s *Stream) encode(records interface{}) (_ *arrowpb.BatchArrowRecords, retErr error) {
// Defensively, protect against panics in the Arrow producer function.
defer func() {
if err := recover(); err != nil {
			// When this happens, the stack trace is
			// important and would be lost if we do not
			// capture it here.
s.telemetry.Logger.Debug("panic detail in otel-arrow-adapter",
zap.Reflect("recovered", err),
zap.Stack("stacktrace"),
)
retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err)
}
}()
var batch *arrowpb.BatchArrowRecords
var err error
switch data := records.(type) {
case ptrace.Traces:
batch, err = s.producer.BatchArrowRecordsFromTraces(data)
case plog.Logs:
batch, err = s.producer.BatchArrowRecordsFromLogs(data)
case pmetric.Metrics:
batch, err = s.producer.BatchArrowRecordsFromMetrics(data)
default:
return nil, fmt.Errorf("unsupported OTLP type: %T", records)
}
return batch, err
}