Skip to content

Commit fe6dbe9

Browse files
authored
fix: add mutex around calls to render (#72)
* fix: add mutex around calls to render Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> * Add test coverage for new serial render pkg Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> * Fix lint Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> * More lint Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> * Dumb down queue objects Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> * Simplify renderfunc tests Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com> --------- Signed-off-by: Jonathan Ogilvie <jonathan.ogilvie@sumologic.com>
1 parent 6c8e610 commit fe6dbe9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1178
-26
lines changed

cmd/diff/cmd_utils.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"context"
21+
"sync"
2122
"time"
2223

2324
"github.com/alecthomas/kong"
@@ -26,10 +27,15 @@ import (
2627

2728
"github.com/crossplane/crossplane-runtime/v2/pkg/errors"
2829
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
29-
30-
"github.com/crossplane/crossplane/v2/cmd/crank/render"
3130
)
3231

32+
// globalRenderMutex serializes all render operations globally across all diff processors.
33+
// This prevents concurrent Docker container operations that can overwhelm the Docker daemon
34+
// when processing multiple XRs with the same functions. See issue #59.
35+
//
36+
//nolint:gochecknoglobals // Required for global serialization across processors
37+
var globalRenderMutex sync.Mutex
38+
3339
// CommonCmdFields contains common fields shared by both XR and Comp commands.
3440
type CommonCmdFields struct {
3541
// Configuration options
@@ -101,6 +107,5 @@ func defaultProcessorOptions() []dp.ProcessorOption {
101107
return []dp.ProcessorOption{
102108
dp.WithColorize(true),
103109
dp.WithCompact(false),
104-
dp.WithRenderFunc(render.Render),
105110
}
106111
}

cmd/diff/comp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func makeDefaultCompProc(c *CompCmd, ctx *AppContext, log logging.Logger) dp.Com
9595
dp.WithLogger(log),
9696
dp.WithColorize(!c.NoColor), // Override default if NoColor is set
9797
dp.WithCompact(c.Compact), // Override default if Compact is set
98+
dp.WithRenderMutex(&globalRenderMutex),
9899
)
99100

100101
// Create XR processor first (peer processor)

cmd/diff/diff_integration_test.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"os"
87
"path/filepath"
98
run "runtime"
109
"strconv"
@@ -153,26 +152,12 @@ func runIntegrationTest(t *testing.T, testType DiffTestType, tests map[string]In
153152
}
154153

155154
// Set up the test files
156-
tempDir := t.TempDir()
157-
158155
var testFiles []string
159156

160157
// Handle any additional input files
161-
for i, inputFile := range tt.inputFiles {
162-
testFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.yaml", i))
163-
164-
content, err := os.ReadFile(inputFile)
165-
if err != nil {
166-
t.Fatalf("failed to read input file: %v", err)
167-
}
168-
169-
err = os.WriteFile(testFile, content, 0o644)
170-
if err != nil {
171-
t.Fatalf("failed to write test file: %v", err)
172-
}
173-
174-
testFiles = append(testFiles, testFile)
175-
}
158+
// Note: NewCompositeLoader handles both individual files and directories,
159+
// so we can pass paths directly without special handling
160+
testFiles = append(testFiles, tt.inputFiles...)
176161

177162
// Create a buffer to capture the output
178163
var stdout bytes.Buffer
@@ -1302,6 +1287,25 @@ Summary: 1 added`,
13021287
expectedError: false,
13031288
noColor: true,
13041289
},
1290+
"Concurrent rendering with multiple functions and XRs from directory": {
1291+
// This test reproduces issue #59 - concurrent function startup failures
1292+
// when processing multiple XR files from a directory
1293+
inputFiles: []string{
1294+
"testdata/diff/concurrent-xrs", // Pass the directory containing all XR files
1295+
},
1296+
setupFiles: []string{
1297+
"testdata/diff/resources/xrd-concurrent.yaml",
1298+
"testdata/diff/resources/composition-multi-functions.yaml",
1299+
"testdata/diff/resources/functions.yaml",
1300+
},
1301+
// We expect successful processing of all 5 XRs
1302+
// Each XR should produce 3 base resources + 2 additional resources = 5 resources per XR
1303+
// Plus the XR itself = 6 additions per XR
1304+
// Total: 5 XRs * 6 additions = 30 additions
1305+
expectedOutput: "Summary: 30 added",
1306+
expectedError: false,
1307+
noColor: true,
1308+
},
13051309
}
13061310

13071311
runIntegrationTest(t, XRDiffTest, tests)

cmd/diff/diffprocessor/diff_processor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
k8 "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/kubernetes"
1313
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer"
1414
dt "github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer/types"
15+
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/serial"
1516
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/types"
1617
un "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1718
"k8s.io/apimachinery/pkg/runtime"
@@ -70,6 +71,12 @@ func NewDiffProcessor(k8cs k8.Clients, xpcs xp.Clients, opts ...ProcessorOption)
7071
// Set default factory functions if not provided
7172
config.SetDefaultFactories()
7273

74+
// Wrap the RenderFunc with serialization if a mutex was provided
75+
// This transparently handles serialization without requiring callers to worry about it
76+
if config.RenderMutex != nil {
77+
config.RenderFunc = serial.RenderFunc(config.RenderFunc, config.RenderMutex)
78+
}
79+
7380
// Create the diff options based on configuration
7481
diffOpts := config.GetDiffOptions()
7582

@@ -210,6 +217,9 @@ func (p *DefaultDiffProcessor) DiffSingleResource(ctx context.Context, res *un.U
210217
return nil, errors.Wrap(err, "cannot get functions from pipeline")
211218
}
212219

220+
// Note: Serialization mutex prevents concurrent Docker operations.
221+
// In e2e tests, named Docker containers (via annotations) reuse containers across renders.
222+
213223
// Apply XRD defaults before rendering
214224
err = p.applyXRDDefaults(ctx, xr, resourceID)
215225
if err != nil {

cmd/diff/diffprocessor/processor_config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package diffprocessor
22

33
import (
4+
"sync"
5+
46
xp "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/crossplane"
57
k8 "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/kubernetes"
68
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer"
@@ -25,6 +27,9 @@ type ProcessorConfig struct {
2527
// RenderFunc is the function to use for rendering resources
2628
RenderFunc RenderFunc
2729

30+
// RenderMutex is the mutex used to serialize render operations (for internal use)
31+
RenderMutex *sync.Mutex
32+
2833
// Factories provide factory functions for creating components
2934
Factories ComponentFactories
3035
}
@@ -85,6 +90,13 @@ func WithRenderFunc(renderFn RenderFunc) ProcessorOption {
8590
}
8691
}
8792

93+
// WithRenderMutex sets the mutex for serializing render operations.
94+
func WithRenderMutex(mu *sync.Mutex) ProcessorOption {
95+
return func(config *ProcessorConfig) {
96+
config.RenderMutex = mu
97+
}
98+
}
99+
88100
// WithResourceManagerFactory sets the ResourceManager factory function.
89101
func WithResourceManagerFactory(factory func(k8.ResourceClient, xp.DefinitionClient, logging.Logger) ResourceManager) ProcessorOption {
90102
return func(config *ProcessorConfig) {

cmd/diff/serial/serial.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2025 The Crossplane Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package serial provides utilities for serializing render operations.
18+
package serial
19+
20+
import (
21+
"context"
22+
"sync"
23+
"time"
24+
25+
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
26+
27+
"github.com/crossplane/crossplane/v2/cmd/crank/render"
28+
)
29+
30+
// RenderFunc wraps a render function to serialize all render calls using the provided mutex.
31+
// This prevents concurrent Docker container operations that can overwhelm the Docker daemon
32+
// when processing many XRs with the same functions. The serialization ensures:
33+
//
34+
// 1. Only one render operation runs at a time globally
35+
// 2. Named Docker containers (via annotations) can be reused safely between renders
36+
// 3. Container startup races are eliminated
37+
//
38+
// For e2e tests, combine this with versioned named container annotations for optimal performance.
39+
// For production, this works without requiring users to annotate their Function resources.
40+
func RenderFunc(
41+
renderFunc func(context.Context, logging.Logger, render.Inputs) (render.Outputs, error),
42+
mu *sync.Mutex,
43+
) func(context.Context, logging.Logger, render.Inputs) (render.Outputs, error) {
44+
renderCount := 0
45+
46+
return func(ctx context.Context, log logging.Logger, in render.Inputs) (render.Outputs, error) {
47+
mu.Lock()
48+
defer mu.Unlock()
49+
50+
renderCount++
51+
log.Debug("Starting serialized render",
52+
"renderNumber", renderCount,
53+
"functionCount", len(in.Functions))
54+
55+
start := time.Now()
56+
result, err := renderFunc(ctx, log, in)
57+
duration := time.Since(start)
58+
59+
if err != nil {
60+
log.Debug("Render completed with error",
61+
"renderNumber", renderCount,
62+
"error", err,
63+
"duration", duration)
64+
} else {
65+
log.Debug("Render completed successfully",
66+
"renderNumber", renderCount,
67+
"duration", duration,
68+
"composedResourceCount", len(result.ComposedResources))
69+
}
70+
71+
return result, err
72+
}
73+
}

cmd/diff/serial/serial_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright 2025 The Crossplane Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package serial
18+
19+
import (
20+
"context"
21+
"errors"
22+
"sync"
23+
"sync/atomic"
24+
"testing"
25+
"time"
26+
27+
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
28+
"github.com/crossplane/crossplane-runtime/v2/pkg/resource/unstructured/composed"
29+
30+
pkgv1 "github.com/crossplane/crossplane/v2/apis/pkg/v1"
31+
"github.com/crossplane/crossplane/v2/cmd/crank/render"
32+
)
33+
34+
func TestRenderFunc_Passthrough(t *testing.T) {
35+
type ctxKey string
36+
key := ctxKey("test")
37+
ctx := context.WithValue(t.Context(), key, "test-value")
38+
inputs := render.Inputs{Functions: []pkgv1.Function{{}, {}}}
39+
40+
var mu sync.Mutex
41+
mockFunc := func(ctx context.Context, _ logging.Logger, in render.Inputs) (render.Outputs, error) {
42+
// Verify context is passed through
43+
if ctx.Value(key) != "test-value" {
44+
t.Error("context not passed through")
45+
}
46+
// Verify inputs are passed through
47+
if len(in.Functions) != 2 {
48+
t.Errorf("expected 2 functions, got %d", len(in.Functions))
49+
}
50+
return render.Outputs{ComposedResources: []composed.Unstructured{*composed.New(), *composed.New()}}, nil
51+
}
52+
53+
serialized := RenderFunc(mockFunc, &mu)
54+
outputs, err := serialized(ctx, logging.NewNopLogger(), inputs)
55+
56+
if err != nil {
57+
t.Fatalf("unexpected error: %v", err)
58+
}
59+
// Verify outputs are returned
60+
if len(outputs.ComposedResources) != 2 {
61+
t.Errorf("expected 2 composed resources, got %d", len(outputs.ComposedResources))
62+
}
63+
}
64+
65+
func TestRenderFunc_Error(t *testing.T) {
66+
var mu sync.Mutex
67+
expectedErr := errors.New("render failed")
68+
69+
mockFunc := func(_ context.Context, _ logging.Logger, _ render.Inputs) (render.Outputs, error) {
70+
return render.Outputs{}, expectedErr
71+
}
72+
73+
serialized := RenderFunc(mockFunc, &mu)
74+
_, err := serialized(t.Context(), logging.NewNopLogger(), render.Inputs{})
75+
76+
if !errors.Is(err, expectedErr) {
77+
t.Errorf("expected error %v, got %v", expectedErr, err)
78+
}
79+
}
80+
81+
func TestRenderFunc_Serialization(t *testing.T) {
82+
var mu sync.Mutex
83+
var concurrentCount atomic.Int32
84+
var maxConcurrent atomic.Int32
85+
86+
mockFunc := func(_ context.Context, _ logging.Logger, _ render.Inputs) (render.Outputs, error) {
87+
current := concurrentCount.Add(1)
88+
89+
// Update maxConcurrent if needed
90+
for {
91+
maxVal := maxConcurrent.Load()
92+
if current <= maxVal || maxConcurrent.CompareAndSwap(maxVal, current) {
93+
break
94+
}
95+
}
96+
97+
time.Sleep(10 * time.Millisecond)
98+
concurrentCount.Add(-1)
99+
100+
return render.Outputs{}, nil
101+
}
102+
103+
serialized := RenderFunc(mockFunc, &mu)
104+
105+
// Run multiple renders concurrently
106+
const numCalls = 10
107+
var wg sync.WaitGroup
108+
wg.Add(numCalls)
109+
110+
for range numCalls {
111+
go func() {
112+
defer wg.Done()
113+
if _, err := serialized(t.Context(), logging.NewNopLogger(), render.Inputs{}); err != nil {
114+
t.Errorf("unexpected error: %v", err)
115+
}
116+
}()
117+
}
118+
119+
wg.Wait()
120+
121+
// Verify that only one render ran at a time
122+
if maxVal := maxConcurrent.Load(); maxVal != 1 {
123+
t.Errorf("expected max concurrent executions to be 1, got %d", maxVal)
124+
}
125+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
apiVersion: concurrent.diff.example.org/v1alpha1
2+
kind: XConcurrentTest
3+
metadata:
4+
name: concurrent-test-1
5+
namespace: default
6+
spec:
7+
config: "test-config-1"
8+
dataField: "data-1"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
apiVersion: concurrent.diff.example.org/v1alpha1
2+
kind: XConcurrentTest
3+
metadata:
4+
name: concurrent-test-2
5+
namespace: default
6+
spec:
7+
config: "test-config-2"
8+
dataField: "data-2"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
apiVersion: concurrent.diff.example.org/v1alpha1
2+
kind: XConcurrentTest
3+
metadata:
4+
name: concurrent-test-3
5+
namespace: default
6+
spec:
7+
config: "test-config-3"
8+
dataField: "data-3"

0 commit comments

Comments
 (0)