Skip to content

Commit

Permalink
Add context and update messages
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj committed Feb 8, 2024
1 parent 0290258 commit 9824422
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 26 deletions.
11 changes: 7 additions & 4 deletions plugins/common/opcua/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
}

// Connect to an OPC UA device
func (o *OpcUAClient) Connect() error {
func (o *OpcUAClient) Connect(ctx context.Context) error {
o.Log.Debug("Connecting OPC UA Client to server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
Expand All @@ -189,14 +189,17 @@ func (o *OpcUAClient) Connect() error {

if o.Client != nil {
o.Log.Warnf("Closing connection to %q as already connected", u)
if err := o.Client.Close(); err != nil {
if err := o.Client.Close(ctx); err != nil {
// Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err)
}
}

o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...)
o.Client, err = opcua.NewClient(o.Config.Endpoint, o.opts...)
if err != nil {
return fmt.Errorf("error in new client: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
if err := o.Client.Connect(ctx); err != nil {
Expand All @@ -220,7 +223,7 @@ func (o *OpcUAClient) Disconnect(ctx context.Context) error {
switch u.Scheme {
case "opc.tcp":
// We can't do anything about failing to close a connection
err := o.Client.CloseWithContext(ctx)
err := o.Client.Close(ctx)
o.Client = nil
return err
default:
Expand Down
2 changes: 1 addition & 1 deletion plugins/common/opcua/input/input_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func TestMetricForNode(t *testing.T) {
status: ua.StatusOK,
expected: metric.New("testingmetric",
map[string]string{"t1": "v1", "id": "ns=3;s=hi"},
map[string]interface{}{"Quality": "OK (0x0)", "fn": 16},
map[string]interface{}{"Quality": "The operation succeeded. StatusGood (0x0)", "fn": 16},
time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})),
},
}
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/opcua/opcua_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,12 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
Expand Down
9 changes: 6 additions & 3 deletions plugins/inputs/opcua/read_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ReadClient struct {

// internal values
req *ua.ReadRequest
ctx context.Context
}

func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) {
Expand All @@ -52,7 +53,9 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
}

func (o *ReadClient) Connect() error {
if err := o.OpcUAClient.Connect(); err != nil {
o.ctx = context.Background()

if err := o.OpcUAClient.Connect(o.ctx); err != nil {
return fmt.Errorf("connect failed: %w", err)
}

Expand All @@ -68,7 +71,7 @@ func (o *ReadClient) Connect() error {
readValueIds = append(readValueIds, &ua.ReadValueID{NodeID: nid})
}
} else {
regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{
regResp, err := o.Client.RegisterNodes(o.ctx, &ua.RegisterNodesRequest{
NodesToRegister: o.NodeIDs,
})
if err != nil {
Expand Down Expand Up @@ -133,7 +136,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) {
}

func (o *ReadClient) read() error {
resp, err := o.Client.Read(o.req)
resp, err := o.Client.Read(o.ctx, o.req)
if err != nil {
o.ReadError.Incr(1)
return fmt.Errorf("RegisterNodes Read failed: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/opcua_listener/opcua_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
Expand Down
18 changes: 10 additions & 8 deletions plugins/inputs/opcua_listener/subscribe_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type SubscribeClient struct {
dataNotifications chan *opcua.PublishNotificationData
metrics chan telegraf.Metric

processingCtx context.Context
processingCancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
}

func checkDataChangeFilterParameters(params *input.DataChangeFilter) error {
Expand Down Expand Up @@ -90,6 +90,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
return nil, err
}

processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &SubscribeClient{
OpcUAInputClient: client,
Config: *sc,
Expand All @@ -99,6 +100,8 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
dataNotifications: make(chan *opcua.PublishNotificationData, 100),
metrics: make(chan telegraf.Metric, 100),
ctx: processingCtx,
cancel: processingCancel,
}

log.Debugf("Creating monitored items")
Expand All @@ -115,13 +118,13 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
}

func (o *SubscribeClient) Connect() error {
err := o.OpcUAClient.Connect()
err := o.OpcUAClient.Connect(o.ctx)
if err != nil {
return err
}

o.Log.Debugf("Creating OPC UA subscription")
o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{
o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{
Interval: time.Duration(o.Config.SubscriptionInterval),
}, o.dataNotifications)
if err != nil {
Expand All @@ -144,7 +147,7 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
}
}
closing := o.OpcUAInputClient.Stop(ctx)
o.processingCancel()
o.cancel()
return closing
}

Expand All @@ -166,7 +169,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
return nil, err
}

resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
if err != nil {
return nil, fmt.Errorf("failed to start monitoring items: %w", err)
}
Expand All @@ -178,7 +181,6 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
}
}

o.processingCtx, o.processingCancel = context.WithCancel(context.Background())
go o.processReceivedNotifications()

return o.metrics, nil
Expand All @@ -187,7 +189,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
func (o *SubscribeClient) processReceivedNotifications() {
for {
select {
case <-o.processingCtx.Done():
case <-o.ctx.Done():
o.Log.Debug("Processing received notifications stopped")
return

Expand Down

0 comments on commit 9824422

Please sign in to comment.