Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Events Plugin - Add initial "NewBlockPropagated" event message (#1463)
Browse files Browse the repository at this point in the history
* It returns for any block we would broadcast to other peers, when
  we would broadcast them.
* It returns a JSON String containing hash, number, and timestamp
* This event data is not set in stone, it may change in type or content.
* Acceptance tests and unit tests got a re-work away from the assumption
  that there is only one plugin type.
  • Loading branch information
Danno Ferrin authored May 20, 2019
1 parent 0ef1d8f commit 9ab506b
Show file tree
Hide file tree
Showing 17 changed files with 494 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.tests.acceptance.plugins;

import tech.pegasys.pantheon.tests.acceptance.dsl.AcceptanceTestBase;
import tech.pegasys.pantheon.tests.acceptance.dsl.node.PantheonNode;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;

public class PantheonEventsPluginTest extends AcceptanceTestBase {
private PantheonNode pluginNode;
private PantheonNode minerNode;

@Before
public void setUp() throws Exception {
minerNode = pantheon.createMinerNode("minerNode");
pluginNode =
pantheon.createPluginsNode(
"node1", Collections.singletonList("testPlugin"), Collections.emptyList());
cluster.start(pluginNode, minerNode);
}

@Test
public void blockIsAnnounded() {
waitForFile(pluginNode.homeDirectory().resolve("plugins/newBlock.2"));
}

private void waitForFile(final Path path) {
final File file = path.toFile();
Awaitility.waitAtMost(30, TimeUnit.SECONDS)
.until(
() -> {
if (file.exists()) {
try (final Stream<String> s = Files.lines(path)) {
return s.count() > 0;
}
} else {
return false;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.junit.Ignore;
import org.junit.Test;

public class PluginsAcceptanceTest extends AcceptanceTestBase {
public class PicoCLIOptionsPluginTest extends AcceptanceTestBase {
private PantheonNode node;

// context: https://en.wikipedia.org/wiki/The_Magic_Words_are_Squeamish_Ossifrage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public EthContext ethContext() {
return ethContext;
}

public BlockBroadcaster getBlockBroadcaster() {
return blockBroadcaster;
}

@Override
public String getSupportedProtocol() {
return EthProtocol.NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,34 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BlockBroadcaster {
private static final Logger LOG = LogManager.getLogger();

private final EthContext ethContext;
private final Subscribers<Consumer<Block>> blockPropagatedSubscribers = new Subscribers<>();

public BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}

public long subscribePropagateNewBlocks(final Consumer<Block> callback) {
return blockPropagatedSubscribers.subscribe(callback);
}

public void unsubscribePropagateNewBlocks(final long id) {
blockPropagatedSubscribers.unsubscribe(id);
}

public void propagate(final Block block, final UInt256 totalDifficulty) {
blockPropagatedSubscribers.forEach(listener -> listener.accept(block));
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
ethContext
.getEthPeers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public DefaultSynchronizer(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final WorldStateStorage worldStateStorage,
final BlockBroadcaster blockBroadcaster,
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
Expand All @@ -78,7 +79,7 @@ public DefaultSynchronizer(
syncState,
new PendingBlocks(),
metricsSystem,
new BlockBroadcaster(ethContext));
blockBroadcaster);

this.fullSyncDownloader =
new FullSyncDownloader<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@
import tech.pegasys.pantheon.metrics.prometheus.PrometheusMetricsSystem;
import tech.pegasys.pantheon.metrics.vertx.VertxMetricsAdapterFactory;
import tech.pegasys.pantheon.plugins.internal.PantheonPluginContextImpl;
import tech.pegasys.pantheon.plugins.services.PantheonEvents;
import tech.pegasys.pantheon.plugins.services.PicoCLIOptions;
import tech.pegasys.pantheon.services.PantheonEventsImpl;
import tech.pegasys.pantheon.services.PicoCLIOptionsImpl;
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration;
import tech.pegasys.pantheon.util.BlockImporter;
import tech.pegasys.pantheon.util.InvalidConfigurationException;
Expand Down Expand Up @@ -644,9 +647,7 @@ public void parse(
"Ethereum Wire Protocol",
ethereumWireConfigurationBuilder));

pantheonPluginContext.addService(
PicoCLIOptions.class,
(namespace, optionObject) -> commandLine.addMixin("Plugin " + namespace, optionObject));
pantheonPluginContext.addService(PicoCLIOptions.class, new PicoCLIOptionsImpl(commandLine));
pantheonPluginContext.registerPlugins(pluginsDir());

// Create a handler that will search for a config file option and use it for
Expand Down Expand Up @@ -729,10 +730,16 @@ public void run() {
ensureAllNodesAreInWhitelist(
staticNodes.stream().map(EnodeURL::toURI).collect(Collectors.toList()), p));

final PantheonController<?> pantheonController = buildController();
final MetricsConfiguration metricsConfiguration = metricsConfiguration();

pantheonPluginContext.addService(
PantheonEvents.class,
new PantheonEventsImpl((pantheonController.getProtocolManager().getBlockBroadcaster())));
pantheonPluginContext.startPlugins();

synchronize(
buildController(),
pantheonController,
p2pEnabled,
peerDiscoveryEnabled,
ethNetworkConfig,
Expand All @@ -742,7 +749,7 @@ public void run() {
graphQLRpcConfiguration,
jsonRpcConfiguration,
webSocketConfiguration,
metricsConfiguration(),
metricsConfiguration,
permissioningConfiguration,
staticNodes);
} catch (final Exception e) {
Expand Down Expand Up @@ -1234,7 +1241,7 @@ private Path pluginsDir() {
if (isFullInstantiation()) {
final String pluginsDir = System.getProperty("pantheon.plugins.dir");
if (pluginsDir == null) {
return new File("plugins").toPath();
return new File(System.getProperty("pantheon.home", "."), "plugins").toPath();
} else {
return new File(pluginsDir).toPath();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.pegasys.pantheon.ethereum.blockcreation.MiningCoordinator;
import tech.pegasys.pantheon.ethereum.core.PrivacyParameters;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool;
import tech.pegasys.pantheon.ethereum.jsonrpc.RpcApi;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
Expand All @@ -35,6 +36,7 @@ public class PantheonController<C> implements java.io.Closeable {
public static final String DATABASE_PATH = "database";
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthProtocolManager ethProtocolManager;
private final GenesisConfigOptions genesisConfigOptions;
private final SubProtocolConfiguration subProtocolConfiguration;
private final KeyPair keyPair;
Expand All @@ -49,6 +51,7 @@ public class PantheonController<C> implements java.io.Closeable {
PantheonController(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthProtocolManager ethProtocolManager,
final GenesisConfigOptions genesisConfigOptions,
final SubProtocolConfiguration subProtocolConfiguration,
final Synchronizer synchronizer,
Expand All @@ -60,6 +63,7 @@ public class PantheonController<C> implements java.io.Closeable {
final Runnable close) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethProtocolManager = ethProtocolManager;
this.genesisConfigOptions = genesisConfigOptions;
this.subProtocolConfiguration = subProtocolConfiguration;
this.synchronizer = synchronizer;
Expand All @@ -79,6 +83,10 @@ public ProtocolSchedule<C> getProtocolSchedule() {
return protocolSchedule;
}

public EthProtocolManager getProtocolManager() {
return ethProtocolManager;
}

public GenesisConfigOptions getGenesisConfigOptions() {
return genesisConfigOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public abstract class PantheonControllerBuilder<C> {

protected GenesisConfigFile genesisConfig;
protected SynchronizerConfiguration syncConfig;
protected EthProtocolManager ethProtocolManager;
protected EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration;
protected Integer networkId;
protected MiningParameters miningParameters;
Expand Down Expand Up @@ -196,8 +197,7 @@ public PantheonController<C> build() throws IOException {
final MutableBlockchain blockchain = protocolContext.getBlockchain();

final boolean fastSyncEnabled = syncConfig.syncMode().equals(SyncMode.FAST);
final EthProtocolManager ethProtocolManager =
createEthProtocolManager(protocolContext, fastSyncEnabled);
ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled);
final SyncState syncState =
new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
final Synchronizer synchronizer =
Expand All @@ -206,6 +206,7 @@ public PantheonController<C> build() throws IOException {
protocolSchedule,
protocolContext,
protocolContext.getWorldStateArchive().getStorage(),
ethProtocolManager.getBlockBroadcaster(),
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
Expand Down Expand Up @@ -250,6 +251,7 @@ public PantheonController<C> build() throws IOException {
return new PantheonController<>(
protocolSchedule,
protocolContext,
ethProtocolManager,
genesisConfig.getConfigOptions(),
subProtocolConfiguration,
synchronizer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockBroadcaster;
import tech.pegasys.pantheon.plugins.services.PantheonEvents;

import com.google.common.collect.ImmutableMap;
import io.vertx.core.json.Json;

public class PantheonEventsImpl implements PantheonEvents {
private final BlockBroadcaster blockBroadcaster;

public PantheonEventsImpl(final BlockBroadcaster blockBroadcaster) {
this.blockBroadcaster = blockBroadcaster;
}

@Override
public Object addNewBlockPropagatedListener(final NewBlockPropagatedListener listener) {
return blockBroadcaster.subscribePropagateNewBlocks(
block -> dispatchNewBlockPropagatedMessage(block, listener));
}

@Override
public void removeNewBlockPropagatedListener(final Object listenerIdentifier) {
if (listenerIdentifier instanceof Long) {
blockBroadcaster.unsubscribePropagateNewBlocks((Long) listenerIdentifier);
}
}

private void dispatchNewBlockPropagatedMessage(
final Block block, final NewBlockPropagatedListener listener) {
final ImmutableMap<Object, Object> result =
new ImmutableMap.Builder<>()
.put("type", "NewBlock")
.put("blockHash", block.getHash().toString())
.put("blockNumber", block.getHeader().getNumber())
.put("timestamp", block.getHeader().getTimestamp())
.build();
listener.newBlockPropagated(Json.encode(result));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services;

import tech.pegasys.pantheon.plugins.services.PicoCLIOptions;

import picocli.CommandLine;

public class PicoCLIOptionsImpl implements PicoCLIOptions {

private final CommandLine commandLine;

public PicoCLIOptionsImpl(final CommandLine commandLine) {
this.commandLine = commandLine;
}

@Override
public void addPicoCLIOptions(final String namespace, final Object optionObject) {
commandLine.addMixin("Plugin " + namespace, optionObject);
}
}
Loading

0 comments on commit 9ab506b

Please sign in to comment.