25
25
import java .util .Set ;
26
26
import java .util .concurrent .BlockingQueue ;
27
27
import java .util .concurrent .locks .Condition ;
28
- import java .util .concurrent .locks .ReentrantReadWriteLock ;
28
+ import java .util .concurrent .locks .ReentrantLock ;
29
29
import org .apache .yetus .audience .InterfaceAudience ;
30
30
31
31
/**
39
39
* {@link BlockingQueue}.</li>
40
40
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
41
41
* </ul>
42
+ * Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic
43
+ * synchronization strategy.
42
44
*/
43
45
@ InterfaceAudience .Private
44
46
class RegionNormalizerWorkQueue <E > {
45
47
46
48
/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
47
49
private LinkedHashSet <E > delegate ;
48
50
49
- /** Lock for puts and takes **/
50
- private final ReentrantReadWriteLock lock ;
51
+ // the locking structure used here follows the example found in LinkedBlockingQueue. The
52
+ // difference is that our locks guard access to `delegate` rather than the head node.
53
+
54
+ /** Lock held by take, poll, etc */
55
+ private final ReentrantLock takeLock ;
56
+
51
57
/** Wait queue for waiting takes */
52
58
private final Condition notEmpty ;
53
59
60
+ /** Lock held by put, offer, etc */
61
+ private final ReentrantLock putLock ;
62
+
54
63
RegionNormalizerWorkQueue () {
55
64
delegate = new LinkedHashSet <>();
56
- lock = new ReentrantReadWriteLock ();
57
- notEmpty = lock .writeLock ().newCondition ();
65
+ takeLock = new ReentrantLock ();
66
+ notEmpty = takeLock .newCondition ();
67
+ putLock = new ReentrantLock ();
68
+ }
69
+
70
+ /**
71
+ * Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock
72
+ * takeLock.)
73
+ */
74
+ private void signalNotEmpty () {
75
+ final ReentrantLock takeLock = this .takeLock ;
76
+ takeLock .lock ();
77
+ try {
78
+ notEmpty .signal ();
79
+ } finally {
80
+ takeLock .unlock ();
81
+ }
82
+ }
83
+
84
+ /**
85
+ * Locks to prevent both puts and takes.
86
+ */
87
+ private void fullyLock () {
88
+ putLock .lock ();
89
+ takeLock .lock ();
90
+ }
91
+
92
+ /**
93
+ * Unlocks to allow both puts and takes.
94
+ */
95
+ private void fullyUnlock () {
96
+ takeLock .unlock ();
97
+ putLock .unlock ();
58
98
}
59
99
60
100
/**
@@ -65,14 +105,16 @@ public void put(E e) {
65
105
if (e == null ) {
66
106
throw new NullPointerException ();
67
107
}
68
- lock .writeLock ().lock ();
108
+
109
+ putLock .lock ();
69
110
try {
70
111
delegate .add (e );
71
- if (!delegate .isEmpty ()) {
72
- notEmpty .signal ();
73
- }
74
112
} finally {
75
- lock .writeLock ().unlock ();
113
+ putLock .unlock ();
114
+ }
115
+
116
+ if (!delegate .isEmpty ()) {
117
+ signalNotEmpty ();
76
118
}
77
119
}
78
120
@@ -96,14 +138,16 @@ public void putAll(Collection<? extends E> c) {
96
138
if (c == null ) {
97
139
throw new NullPointerException ();
98
140
}
99
- lock .writeLock ().lock ();
141
+
142
+ putLock .lock ();
100
143
try {
101
144
delegate .addAll (c );
102
- if (!delegate .isEmpty ()) {
103
- notEmpty .signal ();
104
- }
105
145
} finally {
106
- lock .writeLock ().unlock ();
146
+ putLock .unlock ();
147
+ }
148
+
149
+ if (!delegate .isEmpty ()) {
150
+ signalNotEmpty ();
107
151
}
108
152
}
109
153
@@ -115,17 +159,19 @@ public void putAllFirst(Collection<? extends E> c) {
115
159
if (c == null ) {
116
160
throw new NullPointerException ();
117
161
}
118
- lock .writeLock ().lock ();
162
+
163
+ fullyLock ();
119
164
try {
120
165
final LinkedHashSet <E > copy = new LinkedHashSet <>(c .size () + delegate .size ());
121
166
copy .addAll (c );
122
167
copy .addAll (delegate );
123
168
delegate = copy ;
124
- if (!delegate .isEmpty ()) {
125
- notEmpty .signal ();
126
- }
127
169
} finally {
128
- lock .writeLock ().unlock ();
170
+ fullyUnlock ();
171
+ }
172
+
173
+ if (!delegate .isEmpty ()) {
174
+ signalNotEmpty ();
129
175
}
130
176
}
131
177
@@ -137,13 +183,10 @@ public void putAllFirst(Collection<? extends E> c) {
137
183
*/
138
184
public E take () throws InterruptedException {
139
185
E x ;
140
- // Take a write lock. If the delegate's queue is empty we need it to await(), which will
141
- // drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
142
- // to mutate the head.
143
- lock .writeLock ().lockInterruptibly ();
186
+ takeLock .lockInterruptibly ();
144
187
try {
145
188
while (delegate .isEmpty ()) {
146
- notEmpty .await (); // await drops the lock, then reacquires it
189
+ notEmpty .await ();
147
190
}
148
191
final Iterator <E > iter = delegate .iterator ();
149
192
x = iter .next ();
@@ -152,7 +195,7 @@ public E take() throws InterruptedException {
152
195
notEmpty .signal ();
153
196
}
154
197
} finally {
155
- lock . writeLock () .unlock ();
198
+ takeLock .unlock ();
156
199
}
157
200
return x ;
158
201
}
@@ -162,11 +205,11 @@ public E take() throws InterruptedException {
162
205
* returns.
163
206
*/
164
207
public void clear () {
165
- lock . writeLock () .lock ();
208
+ putLock .lock ();
166
209
try {
167
210
delegate .clear ();
168
211
} finally {
169
- lock . writeLock () .unlock ();
212
+ putLock .unlock ();
170
213
}
171
214
}
172
215
@@ -175,21 +218,21 @@ public void clear() {
175
218
* @return the number of elements in this queue
176
219
*/
177
220
public int size () {
178
- lock . readLock () .lock ();
221
+ takeLock .lock ();
179
222
try {
180
223
return delegate .size ();
181
224
} finally {
182
- lock . readLock () .unlock ();
225
+ takeLock .unlock ();
183
226
}
184
227
}
185
228
186
229
@ Override
187
230
public String toString () {
188
- lock . readLock () .lock ();
231
+ takeLock .lock ();
189
232
try {
190
233
return delegate .toString ();
191
234
} finally {
192
- lock . readLock () .unlock ();
235
+ takeLock .unlock ();
193
236
}
194
237
}
195
238
}
0 commit comments