Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAN-3249] Use Bloombits for Logs queries #127

Merged
merged 2 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,16 @@ public Optional<TransactionReceiptWithMetadata> transactionReceiptByTransactionH
*/
public List<LogWithMetadata> matchingLogs(
final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
// rangeClosed handles the inverted from/to situations automatically with zero results.
return LongStream.rangeClosed(fromBlockNumber, toBlockNumber)
.mapToObj(blockchain::getBlockHashByNumber)
.mapToObj(blockchain::getBlockHeader)
// Use takeWhile instead of clamping on toBlockNumber/headBlockNumber because it may get an
// extra block or two for a query that has a toBlockNumber past chain head. Similarly this
// handles the case when fromBlockNumber is past chain head.
.takeWhile(Optional::isPresent)
.flatMap(Optional::stream)
.flatMap(hash -> matchingLogs(hash, query).stream())
.map(Optional::get)
.filter(header -> query.couldMatch(header.getLogsBloom()))
.flatMap(header -> matchingLogs(header.getHash(), query).stream())
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,46 @@
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class LogsQuery {

private final List<Address> queryAddresses;
private final List<List<LogTopic>> queryTopics;
private final List<LogsBloomFilter> addressBlooms;
private final List<List<LogsBloomFilter>> topicsBlooms;

private LogsQuery(final List<Address> queryAddresses, final List<List<LogTopic>> queryTopics) {
this.queryAddresses = queryAddresses;
this.queryTopics = queryTopics;
addressBlooms =
this.queryAddresses.stream()
.map(LogsBloomFilter::computeBytes)
.collect(Collectors.toList());
topicsBlooms =
this.queryTopics.stream()
.map(
topics ->
topics.stream()
.filter(Objects::nonNull)
.map(LogsBloomFilter::computeBytes)
.collect(Collectors.toList()))
.collect(Collectors.toList());
}

private LogsQuery(final List<Address> addresses, final List<List<LogTopic>> topics) {
this.queryAddresses = addresses;
this.queryTopics = topics;
public boolean couldMatch(final LogsBloomFilter bloom) {
return (addressBlooms.isEmpty() || addressBlooms.stream().anyMatch(bloom::couldContain))
&& (topicsBlooms.isEmpty()
|| topicsBlooms.stream()
.allMatch(
topics -> topics.isEmpty() || topics.stream().anyMatch(bloom::couldContain)));
}

public boolean matches(final Log log) {
Expand Down Expand Up @@ -62,6 +88,20 @@ private boolean matchesTopic(final LogTopic topic, final List<LogTopic> matchCri
return matchCriteria.contains(null) || matchCriteria.contains(topic);
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final LogsQuery logsQuery = (LogsQuery) o;
return Objects.equals(queryAddresses, logsQuery.queryAddresses)
&& Objects.equals(queryTopics, logsQuery.queryTopics);
}

@Override
public int hashCode() {
return Objects.hash(queryAddresses, queryTopics);
}

public static class Builder {
private final List<Address> queryAddresses = Lists.newArrayList();
private final List<List<LogTopic>> queryTopics = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void shouldCheckMatchingLogsWhenRecordedNewBlockEvent() {
filterManager.installLogFilter(latest(), latest(), logsQuery());
recordNewBlockEvent();

verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), refEq(logsQuery()));
verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), eq(logsQuery()));
}

@Test
Expand All @@ -96,14 +95,14 @@ public void shouldUseHeadBlockAsFromBlockNumberWhenCheckingLogsForChanges() {
filterManager.installLogFilter(blockNum(1L), blockNum(10L), logsQuery());
recordNewBlockEvent();

verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), refEq(logsQuery()));
verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), eq(logsQuery()));
}

@Test
public void shouldReturnLogWhenLogFilterMatches() {
final LogWithMetadata log = logWithMetadata();
when(blockchainQueries.headBlockNumber()).thenReturn(100L);
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery())))
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery())))
.thenReturn(Lists.newArrayList(log));

final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
Expand Down Expand Up @@ -163,13 +162,13 @@ public void getLogsForAbsentFilterReturnsNull() {
public void getLogsForExistingFilterReturnsResults() {
final LogWithMetadata log = logWithMetadata();
when(blockchainQueries.headBlockNumber()).thenReturn(100L);
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery())))
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery())))
.thenReturn(Lists.newArrayList(log));

final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
final List<LogWithMetadata> retrievedLogs = filterManager.logs(filterId);

assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log));
assertThat(retrievedLogs).usingRecursiveComparison().isEqualTo(Lists.newArrayList(log));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.ArrayList;
Expand All @@ -41,6 +42,7 @@ public void wildcardQueryAddressTopicReturnTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address, data, topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand All @@ -52,6 +54,7 @@ public void univariateAddressMatchReturnsTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand All @@ -77,6 +80,7 @@ public void multivariateAddressQueryMatchReturnsTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -129,6 +133,7 @@ public void univariateTopicQueryMatchReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(topicsQuery).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), Lists.newArrayList(topic));

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -214,6 +219,7 @@ public void multivariateSurplusTopicMatchMultivariateNullQueryReturnTrue() {
new LogsQuery.Builder().address(address1).topics(queryParameter).build();
final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -243,6 +249,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_00() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -275,6 +282,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_01() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -308,6 +316,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_02() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -344,6 +353,7 @@ public void redundantUnivariateTopicMatchMultivariateQueryReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -417,6 +427,7 @@ public void multivariateTopicMatchMultivariateQueryReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -85,14 +86,14 @@ public void newFilterWithoutAddressAndTopicsParamsInstallsEmptyLogFilter() {
final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1");

final LogsQuery expectedLogsQuery = new LogsQuery.Builder().build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -104,14 +105,14 @@ public void newFilterWithTopicsOnlyParamInstallsExpectedLogFilter() {

final LogsQuery expectedLogsQuery =
new LogsQuery.Builder().topics(new TopicsParameter(topics)).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -122,14 +123,14 @@ public void newFilterWithAddressOnlyParamInstallsExpectedLogFilter() {
final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1");

final LogsQuery expectedLogsQuery = new LogsQuery.Builder().address(address).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -142,14 +143,14 @@ public void newFilterWithAddressAndTopicsParamInstallsExpectedLogFilter() {

final LogsQuery expectedLogsQuery =
new LogsQuery.Builder().address(address).topics(new TopicsParameter(topics)).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

private List<List<String>> topics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,32 @@
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [{
"fromBlock": "0x17",
"toBlock": "0x17",
"fromBlock": "0x20",
"toBlock": "0x20",
"address": [],
"topics": [["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]]
"topics": [null, ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b"]]
}]
},
"response": {
"jsonrpc": "2.0",
"id": 406,
"result" : [{
"logIndex" : "0x0",
"removed": false,
"blockNumber" : "0x17",
"blockHash" : "0x3c419f39b340a4c35cc27b8f7880b779dc1abb9814ad13a2a5a55b885cc8ec2d",
"transactionHash" : "0x97a385bf570ced7821c6495b3877ddd2afd5c452f350f0d4876e98d9161389c6",
"transactionIndex" : "0x0",
"address" : "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f",
"data" : "0x000000000000000000000000000000000000000000000000000000000000002a",
"topics" : ["0x65c9ac8011e286e89d02a269890f41d67ca2cc597b2c76c7c69321ff492be580"]
}]
"result": [
{
"logIndex": "0x0",
"removed": false,
"blockNumber": "0x20",
"blockHash": "0x71d59849ddd98543bdfbe8548f5eed559b07b8aaf196369f39134500eab68e53",
"transactionHash": "0xcef53f2311d7c80e9086d661e69ac11a5f3d081e28e02a9ba9b66749407ac310",
"transactionIndex": "0x0",
"address": "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f",
"data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe9000000000000000000000000000000000000000000000000000000000000002a",
"topics": [
"0x0000000000000000000000000000000000000000000000000000000000000001",
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
]
}
]
},
"statusCode": 200
}
Loading