Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apidef/api_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ type ExtendedPathsSet struct {
ValidateJSON []ValidatePathMeta `bson:"validate_json" json:"validate_json,omitempty"`
ValidateRequest []ValidateRequestMeta `bson:"validate_request" json:"validate_request,omitempty"`
Internal []InternalMeta `bson:"internal" json:"internal,omitempty"`
TrafficMirror []TrafficMirrorMeta `bson:"traffic_mirror" json:"traffic_mirror,omitempty"`
GoPlugin []GoPluginMeta `bson:"go_plugin" json:"go_plugin,omitempty"`
PersistGraphQL []PersistGraphQLMeta `bson:"persist_graphql" json:"persist_graphql"`
RateLimit []RateLimitMeta `bson:"rate_limit" json:"rate_limit"`
Expand Down Expand Up @@ -1664,6 +1665,24 @@ type IntrospectionCache struct {
Timeout int64 `bson:"timeout" json:"timeout"`
}

type TrafficMirrorMeta struct {
Disabled bool `bson:"disabled" json:"disabled"`
Path string `bson:"path" json:"path"`
Method string `bson:"method" json:"method"`
IgnoreCase bool `bson:"ignore_case" json:"ignore_case"`
Destinations []TrafficMirrorDestination `bson:"destinations" json:"destinations"`
SampleRate float64 `bson:"sample_rate,omitempty" json:"sample_rate,omitempty"`
Async bool `bson:"async" json:"async"`
Headers map[string]string `bson:"headers,omitempty" json:"headers,omitempty"`
}

type TrafficMirrorDestination struct {
URL string `bson:"url" json:"url"`
Headers map[string]string `bson:"headers,omitempty" json:"headers,omitempty"`
Timeout int `bson:"timeout,omitempty" json:"timeout,omitempty"` // seconds
SampleRate float64 `bson:"sample_rate,omitempty" json:"sample_rate,omitempty"`
}

// WebHookHandlerConf holds configuration related to webhook event handler.
type WebHookHandlerConf struct {
// Disabled enables/disables this webhook.
Expand Down
24 changes: 24 additions & 0 deletions gateway/api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
GoPlugin
PersistGraphQL
RateLimit
TrafficMirrored
)

// RequestStatus is a custom type to avoid collisions
Expand Down Expand Up @@ -123,6 +124,7 @@ const (
StatusGoPlugin RequestStatus = "Go plugin"
StatusPersistGraphQL RequestStatus = "Persist GraphQL"
StatusRateLimit RequestStatus = "Rate Limited"
StatusTrafficMirrored RequestStatus = "Traffic Mirrored"
)

type EndPointCacheMeta struct {
Expand Down Expand Up @@ -1294,6 +1296,24 @@ func (a APIDefinitionLoader) compileRateLimitPathsSpec(paths []apidef.RateLimitM
return urlSpec
}

func (a APIDefinitionLoader) compileTrafficMirrorPathsSpec(paths []apidef.TrafficMirrorMeta, stat URLStatus, conf config.Config) []URLSpec {
urlSpec := []URLSpec{}

for _, stringSpec := range paths {
if stringSpec.Disabled {
continue
}

newSpec := URLSpec{}
a.generateRegex(stringSpec.Path, &newSpec, stat, conf)
// Extend with method actions
newSpec.TrafficMirror = stringSpec
urlSpec = append(urlSpec, newSpec)
}

return urlSpec
}

func (a APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef apidef.VersionInfo, apiSpec *APISpec, conf config.Config) ([]URLSpec, bool) {
// TODO: New compiler here, needs to put data into a different structure

Expand Down Expand Up @@ -1321,6 +1341,7 @@ func (a APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef apidef.VersionIn
goPlugins := a.compileGopluginPathsSpec(apiVersionDef.ExtendedPaths.GoPlugin, GoPlugin, apiSpec, conf)
persistGraphQL := a.compilePersistGraphQLPathSpec(apiVersionDef.ExtendedPaths.PersistGraphQL, PersistGraphQL, apiSpec, conf)
rateLimitPaths := a.compileRateLimitPathsSpec(apiVersionDef.ExtendedPaths.RateLimit, RateLimit, conf)
trafficMirrorPaths := a.compileTrafficMirrorPathsSpec(apiVersionDef.ExtendedPaths.TrafficMirror, TrafficMirrored, conf)

combinedPath := []URLSpec{}
combinedPath = append(combinedPath, mockResponsePaths...)
Expand All @@ -1347,6 +1368,7 @@ func (a APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef apidef.VersionIn
combinedPath = append(combinedPath, validateJSON...)
combinedPath = append(combinedPath, internalPaths...)
combinedPath = append(combinedPath, rateLimitPaths...)
combinedPath = append(combinedPath, trafficMirrorPaths...)

return combinedPath, len(whiteListPaths) > 0
}
Expand Down Expand Up @@ -1409,6 +1431,8 @@ func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus {
return StatusPersistGraphQL
case RateLimit:
return StatusRateLimit
case TrafficMirrored:
return StatusTrafficMirrored
default:
log.Error("URL Status was not one of Ignored, Blacklist or WhiteList! Blocking.")
return EndPointNotAllowed
Expand Down
1 change: 1 addition & 0 deletions gateway/api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ func (gw *Gateway) processSpec(
gw.mwAppendEnabled(&chainArray, &TransformHeaders{BaseMiddleware: baseMid.Copy()})
gw.mwAppendEnabled(&chainArray, &URLRewriteMiddleware{BaseMiddleware: baseMid.Copy()})
gw.mwAppendEnabled(&chainArray, &TransformMethod{BaseMiddleware: baseMid.Copy()})
gw.mwAppendEnabled(&chainArray, &TrafficMirrorMiddleware{BaseMiddleware: baseMid.Copy()})

// Earliest we can respond with cache get 200 ok
gw.mwAppendEnabled(&chainArray, newMockResponseMiddleware(baseMid.Copy()))
Expand Down
7 changes: 7 additions & 0 deletions gateway/model_urlspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type URLSpec struct {
GoPluginMeta GoPluginMiddleware
PersistGraphQL apidef.PersistGraphQLMeta
RateLimit apidef.RateLimitMeta
TrafficMirror apidef.TrafficMirrorMeta

IgnoreCase bool
}
Expand Down Expand Up @@ -85,6 +86,10 @@ func (u *URLSpec) modeSpecificSpec(mode URLStatus) (interface{}, bool) {
return &u.GoPluginMeta, true
case PersistGraphQL:
return &u.PersistGraphQL, true
case RateLimit:
return &u.RateLimit, true
case TrafficMirrored:
return &u.TrafficMirror, true
default:
return nil, false
}
Expand Down Expand Up @@ -135,6 +140,8 @@ func (u *URLSpec) matchesMethod(method string) bool {
return method == u.PersistGraphQL.Method
case RateLimit:
return method == u.RateLimit.Method
case TrafficMirrored:
return method == u.TrafficMirror.Method
default:
return false
}
Expand Down
244 changes: 244 additions & 0 deletions gateway/mw_traffic_mirror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package gateway

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/TykTechnologies/tyk/apidef"
)

// TrafficMirrorMiddleware mirrors incoming requests to configured destinations
type TrafficMirrorMiddleware struct {
*BaseMiddleware
client *http.Client
}


func (t *TrafficMirrorMiddleware) Name() string {
return "TrafficMirrorMiddleware"
}

func (t *TrafficMirrorMiddleware) EnabledForSpec() bool {
for _, version := range t.Spec.VersionData.Versions {
if len(version.ExtendedPaths.TrafficMirror) > 0 {
return true
}
}
return false
}

func (t *TrafficMirrorMiddleware) Init() {
t.BaseMiddleware.Init()
// Initialize HTTP client with reasonable defaults
t.client = &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
},
}
}

Check failure on line 50 in gateway/mw_traffic_mirror.go

View workflow job for this annotation

GitHub Actions / Go 1.24.x Redis 7

unused-parameter: parameter 'w' seems to be unused, consider removing or renaming it to match ^_ (revive)
func (t *TrafficMirrorMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {

Check failure on line 51 in gateway/mw_traffic_mirror.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unused-parameter: parameter 'w' seems to be unused, consider removing or renaming it to match ^_ (revive)
vInfo, _ := t.Spec.Version(r)
versionPaths := t.Spec.RxPaths[vInfo.Name]
found, meta := t.Spec.CheckSpecMatchesStatus(r, versionPaths, TrafficMirrored)

if !found {
return nil, http.StatusOK
}

mirrorSpec, ok := meta.(*apidef.TrafficMirrorMeta)
if !ok {
t.Logger().Error("Invalid mirror specification")
return nil, http.StatusOK
}

// Check global sample rate first
if mirrorSpec.SampleRate > 0 && rand.Float64() > mirrorSpec.SampleRate {
return nil, http.StatusOK
}

// Clone the request for mirroring
mirrorReq, err := t.cloneRequest(r)
if err != nil {
t.Logger().WithError(err).Error("Failed to clone request for mirroring")
return nil, http.StatusOK
}

if mirrorSpec.Async {
// Send mirrors asynchronously
go t.sendMirrors(mirrorReq, mirrorSpec)
} else {
// Send mirrors synchronously (blocks main request)
t.sendMirrors(mirrorReq, mirrorSpec)
}

return nil, http.StatusOK
}

// cloneRequest creates a deep copy of the HTTP request
func (t *TrafficMirrorMiddleware) cloneRequest(r *http.Request) (*http.Request, error) {
// Read the body
var bodyBytes []byte
if r.Body != nil {
var err error
bodyBytes, err = io.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
}
// Restore the original request body
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}

// Create new request
clonedReq := &http.Request{
Method: r.Method,
URL: &url.URL{},
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
Header: make(http.Header),
ContentLength: r.ContentLength,
Host: r.Host,
RemoteAddr: r.RemoteAddr,
RequestURI: r.RequestURI,
}

// Deep copy URL
*clonedReq.URL = *r.URL
if r.URL.User != nil {
clonedReq.URL.User = &url.Userinfo{}
*clonedReq.URL.User = *r.URL.User
}

// Deep copy headers
for k, v := range r.Header {
clonedReq.Header[k] = make([]string, len(v))
copy(clonedReq.Header[k], v)
}

// Set body if exists
if len(bodyBytes) > 0 {
clonedReq.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}

return clonedReq, nil
}

// sendMirrors sends the request to all configured mirror destinations
func (t *TrafficMirrorMiddleware) sendMirrors(r *http.Request, spec *apidef.TrafficMirrorMeta) {
var wg sync.WaitGroup

for _, dest := range spec.Destinations {
// Check destination-specific sample rate
if dest.SampleRate > 0 && rand.Float64() > dest.SampleRate {
continue
}

wg.Add(1)
go func(destination apidef.TrafficMirrorDestination) {
defer wg.Done()
t.sendSingleMirror(r, destination, spec)
}(dest)
}

wg.Wait()
}

// sendSingleMirror sends request to a single mirror destination
func (t *TrafficMirrorMiddleware) sendSingleMirror(r *http.Request, dest apidef.TrafficMirrorDestination, spec *apidef.TrafficMirrorMeta) {
// Clone the request for this destination
mirrorReq, err := t.cloneRequest(r)
if err != nil {
t.Logger().WithError(err).Error("Failed to clone request for mirror destination")
return
}

// Parse destination URL
destURL, err := url.Parse(dest.URL)
if err != nil {
t.Logger().WithError(err).WithField("url", dest.URL).Error("Invalid mirror destination URL")
return
}

// Update request URL to point to mirror destination
mirrorReq.URL.Scheme = destURL.Scheme
mirrorReq.URL.Host = destURL.Host

// Preserve the original path unless destination has a path
if destURL.Path != "" && destURL.Path != "/" {
mirrorReq.URL.Path = strings.TrimSuffix(destURL.Path, "/") + "/" + strings.TrimPrefix(mirrorReq.URL.Path, "/")
}

// Set destination host
mirrorReq.Host = destURL.Host

// Add global headers
for k, v := range spec.Headers {
mirrorReq.Header.Set(k, v)
}

// Add destination-specific headers
for k, v := range dest.Headers {
mirrorReq.Header.Set(k, v)
}

// Add mirroring metadata headers
mirrorReq.Header.Set("X-Tyk-Mirror", "true")
mirrorReq.Header.Set("X-Tyk-Mirror-Source", r.Host)

// Add timestamp for tracking
mirrorReq.Header.Set("X-Tyk-Mirror-Timestamp", fmt.Sprintf("%d", time.Now().Unix()))

// Create client with destination-specific timeout
client := t.client
if dest.Timeout > 0 {
client = &http.Client{
Timeout: time.Duration(dest.Timeout) * time.Second,
Transport: t.client.Transport,
}
}

// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), client.Timeout)
defer cancel()

mirrorReq = mirrorReq.WithContext(ctx)

// Send the mirrored request
resp, err := client.Do(mirrorReq)
if err != nil {
t.Logger().WithError(err).
WithField("destination", dest.URL).
Debug("Failed to send mirrored request")
return
}
defer resp.Body.Close()

// Log successful mirror (debug level to avoid spam)
t.Logger().WithField("destination", dest.URL).
WithField("status", resp.StatusCode).
Debug("Successfully sent mirrored request")

// Drain response body to allow connection reuse

Check failure on line 233 in gateway/mw_traffic_mirror.go

View workflow job for this annotation

GitHub Actions / Go 1.24.x Redis 7

Error return value of `io.Copy` is not checked (errcheck)
io.Copy(io.Discard, resp.Body)

Check failure on line 234 in gateway/mw_traffic_mirror.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `io.Copy` is not checked (errcheck)
}

// Unload cleans up resources
func (t *TrafficMirrorMiddleware) Unload() {
if t.client != nil && t.client.Transport != nil {
if transport, ok := t.client.Transport.(*http.Transport); ok {
transport.CloseIdleConnections()
}
}
}
Loading
Loading