@@ -12,6 +12,7 @@ import (
12
12
"fmt"
13
13
"net/http"
14
14
"os"
15
+ "path"
15
16
"path/filepath"
16
17
"regexp"
17
18
"slices"
@@ -1984,6 +1985,39 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co
1984
1985
return preview .Documents , nil
1985
1986
}
1986
1987
1988
+ func (r * tester ) resetTransform (ctx context.Context , transformId string ) error {
1989
+ resp , err := r .esAPI .TransformResetTransform (transformId ,
1990
+ r .esAPI .TransformResetTransform .WithContext (ctx ),
1991
+ r .esAPI .TransformResetTransform .WithForce (true ),
1992
+ )
1993
+ if err != nil {
1994
+ return err
1995
+ }
1996
+ defer resp .Body .Close ()
1997
+
1998
+ if resp .IsError () {
1999
+ return fmt .Errorf ("failed to reset transform %q: %s" , transformId , resp .String ())
2000
+ }
2001
+
2002
+ return nil
2003
+ }
2004
+
2005
+ func (r * tester ) startTransform (ctx context.Context , transformId string ) error {
2006
+ resp , err := r .esAPI .TransformStartTransform (transformId ,
2007
+ r .esAPI .TransformStartTransform .WithContext (ctx ),
2008
+ )
2009
+ if err != nil {
2010
+ return err
2011
+ }
2012
+ defer resp .Body .Close ()
2013
+
2014
+ if resp .IsError () {
2015
+ return fmt .Errorf ("failed to start transform %q: %s" , transformId , resp .String ())
2016
+ }
2017
+
2018
+ return nil
2019
+ }
2020
+
1987
2021
func (r * tester ) scheduleTransform (ctx context.Context , transformId string ) error {
1988
2022
resp , err := r .esAPI .TransformScheduleNowTransform (transformId ,
1989
2023
r .esAPI .TransformScheduleNowTransform .WithContext (ctx ),
@@ -2053,9 +2087,45 @@ func (r *tester) getTransformStats(ctx context.Context, transformId string) (*tr
2053
2087
return & response .Transforms [0 ], nil
2054
2088
}
2055
2089
2090
+ func (r * tester ) checkTransformAuditMessages (ctx context.Context , transformId string ) error {
2091
+ // XXX: This is an internal API, are these audit messages available somewhere else?
2092
+ const internalTransformsPath = "/internal/transform/transforms"
2093
+ messagesPath := path .Join (internalTransformsPath , transformId , "messages" )
2094
+ query := "?sortField=timestamp&sortDirection=desc" // Required
2095
+ statusCode , body , err := r .kibanaClient .SendRequest (ctx , http .MethodGet , messagesPath + query , nil )
2096
+ if err != nil {
2097
+ return fmt .Errorf ("could not get transform audit messages: %w" , err )
2098
+ }
2099
+ if statusCode >= 400 {
2100
+ return fmt .Errorf ("could not get transform audit messages: status code %d, body: %s" , statusCode , body )
2101
+ }
2102
+
2103
+ var resp struct {
2104
+ Messages []struct {
2105
+ TransformID string `json:"transform_id"`
2106
+ Message string `json:"message"`
2107
+ Level string `json:"level"`
2108
+ Timestamp int `json:"timestamp"`
2109
+ NodeName string `json:"node_name"`
2110
+ } `json:"messages"`
2111
+ }
2112
+ err = json .Unmarshal (body , & resp )
2113
+ if err != nil {
2114
+ return fmt .Errorf ("could not decode response: %w" , err )
2115
+ }
2116
+
2117
+ for _ , message := range resp .Messages {
2118
+ if message .Level == "error" {
2119
+ return fmt .Errorf ("failure found in transform: %s" , message .Message )
2120
+ }
2121
+ }
2122
+ return nil
2123
+ }
2124
+
2056
2125
// checkRunningTransformHealth checks the following for a given transform:
2057
2126
// - That it is started.
2058
2127
// - That it can execute at least once during the check.
2128
+ // - That it hasn't generated any error message.
2059
2129
// - That it is healthy after executing at least once.
2060
2130
func (r * tester ) checkRunningTransformHealth (ctx context.Context , transformId string ) error {
2061
2131
const (
@@ -2065,6 +2135,18 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st
2065
2135
lastSearchTime := 0
2066
2136
last := - 1
2067
2137
running := false
2138
+
2139
+ // Reset transform to clean any previous state.
2140
+ /* XXX: It fails to create the index after reset :?
2141
+ err := r.resetTransform(ctx, transformId)
2142
+ if err != nil {
2143
+ return fmt.Errorf("failed to reset transform: %w", err)
2144
+ }
2145
+ err = r.startTransform(ctx, transformId)
2146
+ if err != nil {
2147
+ return fmt.Errorf("failed to start transform after reset: %w", err)
2148
+ }
2149
+ */
2068
2150
ok , err := wait .UntilTrue (ctx , func (ctx context.Context ) (bool , error ) {
2069
2151
stats , err := r .getTransformStats (ctx , transformId )
2070
2152
if err != nil {
@@ -2106,6 +2188,12 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st
2106
2188
return false , nil
2107
2189
}
2108
2190
2191
+ // We need to check the audit messages in case a document is removed but caused issues.
2192
+ err = r .checkTransformAuditMessages (ctx , transformId )
2193
+ if err != nil {
2194
+ return false , err
2195
+ }
2196
+
2109
2197
err = healthError (stats .Health )
2110
2198
return err == nil , err
2111
2199
}, period , timeout )
0 commit comments