3131import com .google .firebase .firestore .util .Util ;
3232import com .google .protobuf .ByteString ;
3333import java .util .ArrayList ;
34+ import java .util .Collection ;
3435import java .util .Collections ;
3536import java .util .Iterator ;
3637import java .util .List ;
38+ import java .util .Map ;
39+ import java .util .TreeMap ;
3740
3841final class MemoryMutationQueue implements MutationQueue {
3942
@@ -54,7 +57,7 @@ final class MemoryMutationQueue implements MutationQueue {
5457 * <p>Once the held write acknowledgements become visible they are removed from the head of the
5558 * queue along with any tombstones that follow.
5659 */
57- private final List < MutationBatch > queue ;
60+ private final TreeMap < Integer , MutationBatch > queue = new TreeMap <>() ;
5861
5962 /** An ordered mapping between documents and the mutation batch IDs. */
6063 private ImmutableSortedSet <DocumentReference > batchesByDocumentKey ;
@@ -74,7 +77,6 @@ final class MemoryMutationQueue implements MutationQueue {
7477
7578 MemoryMutationQueue (MemoryPersistence persistence , User user ) {
7679 this .persistence = persistence ;
77- queue = new ArrayList <>();
7880
7981 batchesByDocumentKey = new ImmutableSortedSet <>(emptyList (), DocumentReference .BY_KEY );
8082 nextBatchId = 1 ;
@@ -105,16 +107,20 @@ public boolean isEmpty() {
105107 @ Override
106108 public void acknowledgeBatch (MutationBatch batch , ByteString streamToken ) {
107109 int batchId = batch .getBatchId ();
108- int batchIndex = indexOfExistingBatchId (batchId , "acknowledged" );
109- hardAssert (batchIndex == 0 , "Can only acknowledge the first batch in the mutation queue" );
110+ hardAssert (
111+ queue .containsKey (batchId ),
112+ "Can not acknowledge unknown batch ID: %d (batch=%s)" ,
113+ batchId ,
114+ batch );
110115
111- // Verify that the batch in the queue is the one to be acknowledged.
112- MutationBatch check = queue .get (batchIndex );
116+ int firstBatchId = queue .firstKey ();
113117 hardAssert (
114- batchId == check .getBatchId (),
115- "Queue ordering failure: expected batch %d, got batch %d" ,
118+ batchId == firstBatchId ,
119+ "Can only acknowledge the first batch in the mutation queue:"
120+ + " got batchId %d, but expected %d (batch=%s)" ,
116121 batchId ,
117- check .getBatchId ());
122+ firstBatchId ,
123+ batch );
118124
119125 lastStreamToken = checkNotNull (streamToken );
120126 }
@@ -137,15 +143,17 @@ public MutationBatch addMutationBatch(
137143 int batchId = nextBatchId ;
138144 nextBatchId += 1 ;
139145
140- int size = queue .size ();
141- if (size > 0 ) {
142- MutationBatch prior = queue .get (size - 1 );
146+ if (!queue .isEmpty ()) {
147+ int priorKey = queue .lastKey ();
143148 hardAssert (
144- prior .getBatchId () < batchId , "Mutation batchIds must be monotonically increasing order" );
149+ priorKey < batchId ,
150+ "Mutation batchIds must be monotonically increasing order: priorKey=%d, batchId=%d" ,
151+ priorKey ,
152+ batchId );
145153 }
146154
147155 MutationBatch batch = new MutationBatch (batchId , localWriteTime , baseMutations , mutations );
148- queue .add ( batch );
156+ queue .put ( batchId , batch );
149157
150158 // Track references by document key and index collection parents.
151159 for (Mutation mutation : mutations ) {
@@ -161,25 +169,14 @@ public MutationBatch addMutationBatch(
161169 @ Nullable
162170 @ Override
163171 public MutationBatch lookupMutationBatch (int batchId ) {
164- int index = indexOfBatchId (batchId );
165- if (index < 0 || index >= queue .size ()) {
166- return null ;
167- }
168-
169- MutationBatch batch = queue .get (index );
170- hardAssert (batch .getBatchId () == batchId , "If found batch must match" );
171- return batch ;
172+ return queue .get (batchId );
172173 }
173174
174175 @ Nullable
175176 @ Override
176177 public MutationBatch getNextMutationBatchAfterBatchId (int batchId ) {
177- int nextBatchId = batchId + 1 ;
178-
179- // The requested batchId may still be out of range so normalize it to the start of the queue.
180- int rawIndex = indexOfBatchId (nextBatchId );
181- int index = rawIndex < 0 ? 0 : rawIndex ;
182- return queue .size () > index ? queue .get (index ) : null ;
178+ Map .Entry <Integer , MutationBatch > nextEntry = queue .higherEntry (batchId );
179+ return nextEntry == null ? null : nextEntry .getValue ();
183180 }
184181
185182 @ Override
@@ -188,8 +185,8 @@ public int getHighestUnacknowledgedBatchId() {
188185 }
189186
190187 @ Override
191- public List <MutationBatch > getAllMutationBatches () {
192- return Collections .unmodifiableList (queue );
188+ public Collection <MutationBatch > getAllMutationBatches () {
189+ return Collections .unmodifiableCollection (queue . values () );
193190 }
194191
195192 @ Override
@@ -292,12 +289,23 @@ private List<MutationBatch> lookupMutationBatches(ImmutableSortedSet<Integer> ba
292289
293290 @ Override
294291 public void removeMutationBatch (MutationBatch batch ) {
295- // Find the position of the first batch for removal. This need not be the first entry in the
296- // queue.
297- int batchIndex = indexOfExistingBatchId (batch .getBatchId (), "removed" );
298- hardAssert (batchIndex == 0 , "Can only remove the first entry of the mutation queue" );
292+ int batchId = batch .getBatchId ();
293+ hardAssert (
294+ queue .containsKey (batchId ),
295+ "Can not remove unknown batch ID: %d (batch=%s)" ,
296+ batchId ,
297+ batch );
298+
299+ int firstBatchId = queue .firstKey ();
300+ hardAssert (
301+ batchId == firstBatchId ,
302+ "Can only remove the first batch in the mutation queue:"
303+ + " got batchId %d, but expected %d (batch=%s)" ,
304+ batchId ,
305+ firstBatchId ,
306+ batch );
299307
300- queue .remove (0 );
308+ queue .remove (batchId );
301309
302310 // Remove entries from the index too.
303311 ImmutableSortedSet <DocumentReference > references = batchesByDocumentKey ;
@@ -336,45 +344,9 @@ boolean containsKey(DocumentKey key) {
336344
337345 // Helpers
338346
339- /**
340- * Finds the index of the given batchId in the mutation queue. This operation is O(1).
341- *
342- * @return The computed index of the batch with the given batchId, based on the state of the
343- * queue. Note this index can be negative if the requested batchId has already been removed
344- * from the queue or past the end of the queue if the batchId is larger than the last added
345- * batch.
346- */
347- private int indexOfBatchId (int batchId ) {
348- if (queue .isEmpty ()) {
349- // As an index this is past the end of the queue
350- return 0 ;
351- }
352-
353- // Examine the front of the queue to figure out the difference between the batchId and indexes
354- // in the array. Note that since the queue is ordered by batchId, if the first batch has a
355- // larger batchId then the requested batchId doesn't exist in the queue.
356- MutationBatch firstBatch = queue .get (0 );
357- int firstBatchId = firstBatch .getBatchId ();
358- return batchId - firstBatchId ;
359- }
360-
361- /**
362- * Finds the index of the given batchId in the mutation queue and asserts that the resulting index
363- * is within the bounds of the queue.
364- *
365- * @param batchId The batchId to search for
366- * @param action A description of what the caller is doing, phrased in passive form (for example
367- * "acknowledged" in a routine that acknowledges batches).
368- */
369- private int indexOfExistingBatchId (int batchId , String action ) {
370- int index = indexOfBatchId (batchId );
371- hardAssert (index >= 0 && index < queue .size (), "Batches must exist to be %s" , action );
372- return index ;
373- }
374-
375347 long getByteSize (LocalSerializer serializer ) {
376348 long count = 0 ;
377- for (MutationBatch batch : queue ) {
349+ for (MutationBatch batch : queue . values () ) {
378350 count += serializer .encodeMutationBatch (batch ).getSerializedSize ();
379351 }
380352 return count ;
0 commit comments