6
6
"fmt"
7
7
"os"
8
8
"strings"
9
+ "sync"
9
10
"testing"
10
11
"time"
11
12
@@ -60,7 +61,7 @@ func doTest(t *testing.T, withHeader bool) {
60
61
61
62
// Start syncer A with one key
62
63
t .Log ("Starting syncer A" )
63
- go runSync (ctxA , syncerA )
64
+ goRunSync (ctxA , syncerA )
64
65
65
66
t .Log ("----------" )
66
67
@@ -71,7 +72,7 @@ func doTest(t *testing.T, withHeader bool) {
71
72
// Starting with an empty LMDB is a special case that will not trigger any
72
73
// local snapshot.
73
74
t .Log ("Starting syncer B" )
74
- go runSync (ctxB , syncerB )
75
+ goRunSync (ctxB , syncerB )
75
76
76
77
t .Log ("----------" )
77
78
@@ -98,7 +99,7 @@ func doTest(t *testing.T, withHeader bool) {
98
99
cancelA ()
99
100
ctxA , cancelA = context .WithCancel (ctx )
100
101
t .Log ("----------" )
101
- go runSync (ctxA , syncerA )
102
+ goRunSync (ctxA , syncerA )
102
103
103
104
t .Log ("----------" )
104
105
@@ -119,7 +120,7 @@ func doTest(t *testing.T, withHeader bool) {
119
120
t .Log ("----------" )
120
121
t .Log ("Starting syncer A again" )
121
122
ctxA , cancelA = context .WithCancel (ctx )
122
- go runSync (ctxA , syncerA )
123
+ goRunSync (ctxA , syncerA )
123
124
t .Log ("----------" )
124
125
125
126
// New value in A should get synced to B
@@ -191,6 +192,32 @@ func requireSnapshotsLenWait(t *testing.T, st simpleblob.Interface, expLen int,
191
192
require .Len (t , list , expLen , instance )
192
193
}
193
194
195
+ // Ensure that we there are never two Sync goroutines running at the same time,
196
+ // because this can cause a data race.
197
+ // Protected by mutex, just in case the test is ever run in parallel mode.
198
+ var (
199
+ runningSyncersMu sync.Mutex
200
+ runningSyncers = map [* Syncer ]* sync.WaitGroup {}
201
+ )
202
+
203
+ func goRunSync (ctx context.Context , syncer * Syncer ) {
204
+ runningSyncersMu .Lock ()
205
+ wg , exists := runningSyncers [syncer ]
206
+ if ! exists {
207
+ wg = & sync.WaitGroup {}
208
+ runningSyncers [syncer ] = wg
209
+ }
210
+ runningSyncersMu .Unlock ()
211
+ go func () {
212
+ logrus .Info ("Wait for any previous Syncer instance to exit" )
213
+ wg .Wait ()
214
+ logrus .Info ("Wait for any previous Syncer done" )
215
+ wg .Add (1 )
216
+ defer wg .Done ()
217
+ runSync (ctx , syncer )
218
+ }()
219
+ }
220
+
194
221
func runSync (ctx context.Context , syncer * Syncer ) {
195
222
err := syncer .Sync (ctx )
196
223
if err != nil && err != context .Canceled {
0 commit comments