Skip to content

Commit e24a0b6

Browse files
authored
fix(worker): prevent concurrent map access race condition (#1143)
### Because

- The pipeline worker was crashing with `fatal error: concurrent map iteration and map write` when multiple goroutines accessed workflow memory concurrently.
- `WorkflowMemory.Get()` returned references to the underlying map data, which were then accessed outside the mutex lock by `recipe.Eval()` → `ToJSONValue()`.
- This race condition caused OOM crashes during condition evaluation in `ProcessBatchConditionsActivity`.

### This commit

- Adds a `Copy()` method to `data.Map` for creating thread-safe deep copies of map structures.
- Adds a `Copy()` method to `data.Array` for creating thread-safe deep copies of array structures.
- Adds a `copyValue()` helper function to recursively deep copy nested data structures (maps, arrays).
- Updates `WorkflowMemory.Get()` to return deep copies of `Map` and `Array` types, preventing shared mutable state across goroutines.
- Adds comprehensive unit tests (`copy_test.go`) covering simple/nested structures, mixed types, nil/empty cases, and concurrent modification scenarios.
- Ensures primitive types are returned as-is to minimize performance overhead.
1 parent bd4e10e commit e24a0b6

File tree

4 files changed

+348
-1
lines changed

4 files changed

+348
-1
lines changed

pkg/data/array.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,16 @@ func (a Array) ToJSONValue() (v any, err error) {
104104
}
105105
return jsonArr, nil
106106
}
107+
108+
// Copy creates a deep copy of the Array to prevent concurrent access issues.
109+
// This is essential when passing array data across goroutine boundaries.
110+
func (a Array) Copy() Array {
111+
if a == nil {
112+
return nil
113+
}
114+
copied := make(Array, len(a))
115+
for i, v := range a {
116+
copied[i] = copyValue(v)
117+
}
118+
return copied
119+
}

pkg/data/copy_test.go

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package data
2+
3+
import (
4+
"testing"
5+
6+
"github.com/instill-ai/pipeline-backend/pkg/data/format"
7+
)
8+
9+
func TestMapCopy(t *testing.T) {
10+
t.Run("Copy simple map", func(t *testing.T) {
11+
original := Map{
12+
"key1": NewString("value1"),
13+
"key2": NewNumberFromInteger(42),
14+
"key3": NewBoolean(true),
15+
}
16+
17+
copied := original.Copy()
18+
19+
// Verify copied map has same values
20+
if !copied.Equal(original) {
21+
t.Error("Copied map should equal original")
22+
}
23+
24+
// Verify it's a deep copy by modifying original
25+
original["key1"] = NewString("modified")
26+
if copied["key1"].(*stringData).String() != "value1" {
27+
t.Error("Modifying original should not affect copy")
28+
}
29+
})
30+
31+
t.Run("Copy nested map", func(t *testing.T) {
32+
original := Map{
33+
"outer": Map{
34+
"inner": Map{
35+
"value": NewString("nested"),
36+
},
37+
},
38+
}
39+
40+
copied := original.Copy()
41+
42+
// Verify it's a deep copy
43+
originalInner := original["outer"].(Map)["inner"].(Map)
44+
originalInner["value"] = NewString("modified")
45+
46+
copiedInner := copied["outer"].(Map)["inner"].(Map)
47+
if copiedInner["value"].(*stringData).String() != "nested" {
48+
t.Error("Modifying nested original should not affect copy")
49+
}
50+
})
51+
52+
t.Run("Copy map with array", func(t *testing.T) {
53+
original := Map{
54+
"array": Array{
55+
NewString("item1"),
56+
NewString("item2"),
57+
},
58+
}
59+
60+
copied := original.Copy()
61+
62+
// Modify original array
63+
originalArray := original["array"].(Array)
64+
originalArray[0] = NewString("modified")
65+
66+
// Verify copy is unaffected
67+
copiedArray := copied["array"].(Array)
68+
if copiedArray[0].(*stringData).String() != "item1" {
69+
t.Error("Modifying array in original should not affect copy")
70+
}
71+
})
72+
73+
t.Run("Copy nil map", func(t *testing.T) {
74+
var original Map
75+
copied := original.Copy()
76+
77+
if copied != nil {
78+
t.Error("Copy of nil map should be nil")
79+
}
80+
})
81+
82+
t.Run("Copy empty map", func(t *testing.T) {
83+
original := Map{}
84+
copied := original.Copy()
85+
86+
if copied == nil {
87+
t.Error("Copy of empty map should not be nil")
88+
}
89+
if len(copied) != 0 {
90+
t.Error("Copy of empty map should be empty")
91+
}
92+
})
93+
}
94+
95+
func TestArrayCopy(t *testing.T) {
96+
t.Run("Copy simple array", func(t *testing.T) {
97+
original := Array{
98+
NewString("value1"),
99+
NewNumberFromInteger(42),
100+
NewBoolean(true),
101+
}
102+
103+
copied := original.Copy()
104+
105+
// Verify copied array has same values
106+
if !copied.Equal(original) {
107+
t.Error("Copied array should equal original")
108+
}
109+
110+
// Verify it's a deep copy by modifying original
111+
original[0] = NewString("modified")
112+
if copied[0].(*stringData).String() != "value1" {
113+
t.Error("Modifying original should not affect copy")
114+
}
115+
})
116+
117+
t.Run("Copy nested array", func(t *testing.T) {
118+
original := Array{
119+
Array{
120+
Array{
121+
NewString("nested"),
122+
},
123+
},
124+
}
125+
126+
copied := original.Copy()
127+
128+
// Verify it's a deep copy
129+
originalInner := original[0].(Array)[0].(Array)
130+
originalInner[0] = NewString("modified")
131+
132+
copiedInner := copied[0].(Array)[0].(Array)
133+
if copiedInner[0].(*stringData).String() != "nested" {
134+
t.Error("Modifying nested original should not affect copy")
135+
}
136+
})
137+
138+
t.Run("Copy array with map", func(t *testing.T) {
139+
original := Array{
140+
Map{
141+
"key": NewString("value"),
142+
},
143+
}
144+
145+
copied := original.Copy()
146+
147+
// Modify original map
148+
originalMap := original[0].(Map)
149+
originalMap["key"] = NewString("modified")
150+
151+
// Verify copy is unaffected
152+
copiedMap := copied[0].(Map)
153+
if copiedMap["key"].(*stringData).String() != "value" {
154+
t.Error("Modifying map in original should not affect copy")
155+
}
156+
})
157+
158+
t.Run("Copy nil array", func(t *testing.T) {
159+
var original Array
160+
copied := original.Copy()
161+
162+
if copied != nil {
163+
t.Error("Copy of nil array should be nil")
164+
}
165+
})
166+
167+
t.Run("Copy empty array", func(t *testing.T) {
168+
original := Array{}
169+
copied := original.Copy()
170+
171+
if copied == nil {
172+
t.Error("Copy of empty array should not be nil")
173+
}
174+
if len(copied) != 0 {
175+
t.Error("Copy of empty array should be empty")
176+
}
177+
})
178+
}
179+
180+
func TestCopyValue(t *testing.T) {
181+
t.Run("Copy nil value", func(t *testing.T) {
182+
var original format.Value
183+
copied := copyValue(original)
184+
185+
if copied != nil {
186+
t.Error("Copy of nil value should be nil")
187+
}
188+
})
189+
190+
t.Run("Copy primitive types", func(t *testing.T) {
191+
// Primitive types should be returned as-is since they're immutable
192+
tests := []format.Value{
193+
NewString("test"),
194+
NewNumberFromInteger(123),
195+
NewBoolean(true),
196+
NewNull(),
197+
}
198+
199+
for _, original := range tests {
200+
copied := copyValue(original)
201+
if !copied.Equal(original) {
202+
t.Errorf("Copied value should equal original for type %T", original)
203+
}
204+
}
205+
})
206+
207+
t.Run("Copy complex nested structure", func(t *testing.T) {
208+
original := Map{
209+
"users": Array{
210+
Map{
211+
"name": NewString("Alice"),
212+
"age": NewNumberFromInteger(30),
213+
"tags": Array{
214+
NewString("admin"),
215+
NewString("active"),
216+
},
217+
},
218+
Map{
219+
"name": NewString("Bob"),
220+
"age": NewNumberFromInteger(25),
221+
"tags": Array{
222+
NewString("user"),
223+
},
224+
},
225+
},
226+
"metadata": Map{
227+
"version": NewNumberFromInteger(1),
228+
"active": NewBoolean(true),
229+
},
230+
}
231+
232+
copied := copyValue(original).(Map)
233+
234+
// Verify it's a deep copy
235+
originalUsers := original["users"].(Array)
236+
originalFirstUser := originalUsers[0].(Map)
237+
originalFirstUser["name"] = NewString("Modified")
238+
239+
copiedUsers := copied["users"].(Array)
240+
copiedFirstUser := copiedUsers[0].(Map)
241+
if copiedFirstUser["name"].(*stringData).String() != "Alice" {
242+
t.Error("Deep nested modification should not affect copy")
243+
}
244+
})
245+
}
246+
247+
// TestCopyConcurrency verifies that Copy() prevents race conditions
248+
func TestCopyConcurrency(t *testing.T) {
249+
original := Map{
250+
"key1": NewString("value1"),
251+
"key2": Map{
252+
"nested": NewString("nested_value"),
253+
},
254+
}
255+
256+
// Create a copy
257+
copied := original.Copy()
258+
259+
// Simulate concurrent modifications on original
260+
done := make(chan bool)
261+
go func() {
262+
for i := 0; i < 100; i++ {
263+
original["key1"] = NewString("modified")
264+
original["new_key"] = NewNumberFromInteger(i)
265+
}
266+
done <- true
267+
}()
268+
269+
// Simultaneously access the copy (should not race)
270+
go func() {
271+
for i := 0; i < 100; i++ {
272+
_ = copied["key1"]
273+
_ = copied["key2"]
274+
}
275+
done <- true
276+
}()
277+
278+
<-done
279+
<-done
280+
281+
// Verify copy is unaffected by modifications to original
282+
if copied["key1"].(*stringData).String() != "value1" {
283+
t.Error("Copy should be unaffected by concurrent modifications to original")
284+
}
285+
if _, exists := copied["new_key"]; exists {
286+
t.Error("Copy should not have new keys added to original")
287+
}
288+
}

pkg/data/map.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,34 @@ func (m Map) ToJSONValue() (v any, err error) {
9393
}
9494
return jsonMap, nil
9595
}
96+
97+
// Copy creates a deep copy of the Map to prevent concurrent access issues.
98+
// This is essential when passing map data across goroutine boundaries.
99+
func (m Map) Copy() Map {
100+
if m == nil {
101+
return nil
102+
}
103+
copied := make(Map, len(m))
104+
for k, v := range m {
105+
copied[k] = copyValue(v)
106+
}
107+
return copied
108+
}
109+
110+
// copyValue creates a deep copy of any format.Value type.
111+
func copyValue(v format.Value) format.Value {
112+
if v == nil {
113+
return nil
114+
}
115+
116+
switch val := v.(type) {
117+
case Map:
118+
return val.Copy()
119+
case Array:
120+
return val.Copy()
121+
default:
122+
// For primitive types (string, number, boolean, etc.) which are
123+
// either immutable or contain no shared mutable state, return as-is.
124+
return v
125+
}
126+
}

pkg/memory/workflow.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,23 @@ func (wfm *WorkflowMemory) Get(ctx context.Context, batchIdx int, p string) (mem
340340
if err != nil {
341341
return nil, err
342342
}
343-
return wfm.data[batchIdx].Get(pt)
343+
value, err := wfm.data[batchIdx].Get(pt)
344+
if err != nil {
345+
return nil, err
346+
}
344347

348+
// Return a deep copy to prevent concurrent map access issues.
349+
// This is critical because the returned value may be accessed outside
350+
// the mutex lock (e.g., in recipe.Eval -> ToJSONValue).
351+
switch v := value.(type) {
352+
case data.Map:
353+
return v.Copy(), nil
354+
case data.Array:
355+
return v.Copy(), nil
356+
default:
357+
// Primitive types are safe to return as-is
358+
return value, nil
359+
}
345360
}
346361

347362
func (wfm *WorkflowMemory) GetBatchSize() int {

0 commit comments

Comments
 (0)