-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathpublish.go
153 lines (129 loc) · 4.84 KB
/
publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package input_logfile
import (
"time"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/statestore"
)
// Publisher is used to publish an event and update the cursor in a single call to Publish.
// Inputs are allowed to pass `nil` as cursor state. In this case the state is not updated, but the
// event will still be published as is.
type Publisher interface {
Publish(event beat.Event, cursor interface{}) error
}
// cursorPublisher implements the Publisher interface and used internally by the managedInput.
// When publishing an event with cursor state updates, the cursorPublisher
// updates the in memory state and create an updateOp that is used to schedule
// an update for the persistent store. The updateOp is run by the inputs ACK
// handler, persisting the pending update.
type cursorPublisher struct {
canceler input.Canceler
client beat.Client
cursor *Cursor
}
// updateOp keeps track of pending updates that are not written to the persistent store yet.
// Update operations are ordered. The input manager guarantees that only one
// input can create update operation for a source, such that new input
// instances can add update operations to be executed after already pending
// update operations from older inputs instances that have been shutdown.
type updateOp struct {
resource *resource
// state updates to persist
timestamp time.Time
delta interface{}
}
func newUpdateOp(resource *resource, ts time.Time, delta interface{}) *updateOp {
return &updateOp{
resource: resource,
timestamp: ts,
delta: delta,
}
}
func (op *updateOp) Key() string {
return op.resource.key
}
// Publish publishes an event. Publish returns false if the inputs cancellation context has been marked as done.
// If cursorUpdate is not nil, Publish updates the in memory state and create and updateOp for the pending update.
// It overwrite event.Private with the update operation, before finally sending the event.
// The ACK ordering in the publisher pipeline guarantees that update operations
// will be ACKed and executed in the correct order.
func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) error {
if cursorUpdate == nil {
return c.forward(event)
}
op, err := createUpdateOp(c.cursor.resource, cursorUpdate)
if err != nil {
return err
}
event.Private = op
return c.forward(event)
}
func (c *cursorPublisher) forward(event beat.Event) error {
c.client.Publish(event)
if c.canceler == nil {
return nil
}
return c.canceler.Err()
}
func createUpdateOp(resource *resource, updates interface{}) (*updateOp, error) {
ts := time.Now()
resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()
resource.pendingUpdate = updates
resource.Retain()
resource.activeCursorOperations++
return newUpdateOp(resource, ts, updates), nil
}
// done releases resources held by the last N updateOps.
func (op *updateOp) done(n uint) {
op.resource.UpdatesReleaseN(n)
op.resource = nil
}
// Execute updates the persistent store with the scheduled changes and releases the resource.
func (op *updateOp) Execute(store *store, n uint) {
resource := op.resource
resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()
if resource.lockedVersion != op.resource.version || resource.isDeleted() {
return
}
defer op.done(n)
resource.activeCursorOperations -= n
if resource.activeCursorOperations == 0 {
resource.cursor = resource.pendingCursor()
resource.pendingCursorValue = nil
} else {
err := typeconv.Convert(&resource.cursor, op.delta)
if err != nil {
store.log.Errorf("failed to perform type conversion: %w", err)
}
}
if resource.internalState.Updated.Before(op.timestamp) {
resource.internalState.Updated = op.timestamp
}
err := store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
if err != nil {
if !statestore.IsClosed(err) {
store.log.Errorf("Failed to update state in the registry for '%v': %s", resource.key, err)
}
} else {
resource.stored = true
}
}