Skip to content

Commit

Permalink
Replace lambda with method reference. (apache#14757)
Browse files Browse the repository at this point in the history
  • Loading branch information
RocMarshal authored Mar 21, 2022
1 parent 804f72a commit d4e0797
Show file tree
Hide file tree
Showing 31 changed files with 517 additions and 609 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void verifyHitsMisses() throws Exception {

List<Entry> entries = c1.readEntries(10);
assertEquals(entries.size(), 10);
entries.forEach(e -> e.release());
entries.forEach(Entry::release);

cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
Expand Down Expand Up @@ -311,7 +311,7 @@ public void verifyHitsMisses() throws Exception {
PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
c2.setReadPosition(pos);
ledger.discardEntriesFromCache(c2, pos);
entries.forEach(e -> e.release());
entries.forEach(Entry::release);

cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testRead() throws Exception {
entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
assertEquals(entries.size(), 10);
entries.forEach(e -> e.release());
entries.forEach(Entry::release);
counter.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,41 +87,37 @@ public void testMarkDeleteAndRead(boolean useOpenRangeSet) throws Exception {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);

Thread deleter = new Thread() {
public void run() {
try {
barrier.await();

for (Position position : addedEntries) {
cursor.markDelete(position);
}
Thread deleter = new Thread(() -> {
try {
barrier.await();

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (Position position : addedEntries) {
cursor.markDelete(position);
}
}
};

Thread reader = new Thread() {
public void run() {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
});

for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
}
Thread reader = new Thread(() -> {
try {
barrier.await();

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(Entry::release);
}

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
});

deleter.start();
reader.start();
Expand Down Expand Up @@ -151,61 +147,57 @@ public void testCloseAndRead() throws Exception {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);

Thread deleter = new Thread() {
public void run() {
try {
barrier.await();

for (Position position : addedEntries) {
cursor.markDelete(position);
Thread.sleep(1);
}
} catch (ManagedLedgerException e) {
if (!(e instanceof ManagedLedgerException.CursorAlreadyClosedException)) {
gotException.set(true);
}
Thread deleter = new Thread(() -> {
try {
barrier.await();

} catch (Exception e) {
e.printStackTrace();
for (Position position : addedEntries) {
cursor.markDelete(position);
Thread.sleep(1);
}
} catch (ManagedLedgerException e) {
if (!(e instanceof ManagedLedgerException.CursorAlreadyClosedException)) {
gotException.set(true);
} finally {
counter.countDown();
}

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
});

Thread reader = new Thread() {
public void run() {
try {
barrier.await();
Thread reader = new Thread(() -> {
try {
barrier.await();

for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
// Thread.sleep(2,200);
Thread.sleep(2, 195);
for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(Entry::release);
// Thread.sleep(2,200);
Thread.sleep(2, 195);
}
cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
log.info("Successfully closed cursor ledger");
closeFuture.complete(CLOSED);
}
cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
log.info("Successfully closed cursor ledger");
closeFuture.complete(CLOSED);
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error closing cursor: ", exception);
closeFuture.completeExceptionally(new Exception(exception));
}
}, null);
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error closing cursor: ", exception);
closeFuture.completeExceptionally(new Exception(exception));
}
}, null);

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
});

deleter.start();
reader.start();
Expand Down Expand Up @@ -256,7 +248,7 @@ public void testAckAndClose() throws Exception {
barrier.await();

for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
cursor.readEntries(1).forEach(Entry::release);
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
Loading

0 comments on commit d4e0797

Please sign in to comment.