Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Events API: Transaction dropped, sync status, and renames #1919

Merged
merged 24 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename Sync related stuff and remove logs for now
  • Loading branch information
RatanRSur committed Sep 11, 2019
commit 2f7edd6bdf33abd40d7a7693820f0c71be39d6af
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void startNode(final PantheonNode node) {
pantheonPluginContext.addService(
PantheonEvents.class,
new PantheonEventsImpl(
pantheonController.getProtocolManager().getBlockBroadcaster(),
pantheonController.getProtocolManager().getBlockBroadcaster(),
pantheonController.getTransactionPool(),
pantheonController.getSyncState()));
pantheonPluginContext.startPlugins();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

long observeSyncStatus(final PantheonEvents.SynchronizerStatusListener listener);
long observeSyncStatus(final PantheonEvents.SyncStatusListener listener);

boolean removeObserver(long observerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.MetricsSystem;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SynchronizerStatusListener;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.Subscribers;

Expand All @@ -49,7 +49,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Subscribers<SynchronizerStatusListener> syncStatusListeners = Subscribers.create();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final BlockPropagationManager<C> blockPropagationManager;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final FullSyncDownloader<C> fullSyncDownloader;
Expand Down Expand Up @@ -185,7 +185,7 @@ public Optional<SyncStatus> getSyncStatus() {
}

@Override
public long observeSyncStatus(final SynchronizerStatusListener listener) {
public long observeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
return syncStatusListeners.subscribe(listener);
}
Expand All @@ -196,6 +196,6 @@ public boolean removeObserver(final long observerId) {
}

private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSynchronizerStatusChanged(status));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SynchronizerStatusListener;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.Subscribers;

import java.util.Optional;
Expand All @@ -32,7 +32,7 @@ public class SyncState {
private final long startingBlock;
private boolean lastInSync = true;
private final Subscribers<InSyncListener> inSyncListeners = Subscribers.create();
private final Subscribers<SynchronizerStatusListener> syncStatusListeners = Subscribers.create();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private Optional<SyncTarget> syncTarget = Optional.empty();
private long chainHeightListenerId;

Expand All @@ -51,14 +51,14 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {

public void publishSyncStatus() {
final SyncStatus syncStatus = syncStatus();
syncStatusListeners.forEach(c -> c.onSynchronizerStatusChanged(syncStatus));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
}

public long addSyncStatusListener(final SynchronizerStatusListener observer) {
public long addSyncStatusListener(final SyncStatusListener observer) {
return syncStatusListeners.subscribe(observer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ private PantheonCommand startPlugins() {
pantheonPluginContext.addService(
PantheonEvents.class,
new PantheonEventsImpl(
(pantheonController.getProtocolManager().getBlockBroadcaster()),
pantheonController.getProtocolManager().getBlockBroadcaster(),
pantheonController.getTransactionPool(),
pantheonController.getSyncState()));
pantheonPluginContext.startPlugins();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,13 @@ public void removeTransactionDroppedListener(final long listenerIdentifier) {
}

@Override
public long addLogWithMetadataListener(final LogWithMetadataListener logWithMetaDataListener) {
return 0;
public long addSyncStatusListener(
final SyncStatusListener syncStatusListener) {
return syncState.addSyncStatusListener(syncStatusListener);
}

@Override
public void removeLogWithMetadataListener(final long listenerIdentifier) {}

@Override
public long addSynchronizerStatusListener(
final SynchronizerStatusListener synchronizerStatusListener) {
return syncState.addSyncStatusListener(synchronizerStatusListener);
}

@Override
public void removeSynchronizerStatusListener(final long listenerIdentifier) {
public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void setUp() {
@Test
public void newSyncStatusEventFiresAfterSubscribe() {
final AtomicReference<SyncStatus> result = new AtomicReference<>();
serviceImpl.addSynchronizerStatusListener(result::set);
serviceImpl.addSyncStatusListener(result::set);

assertThat(result.get()).isNull();
syncState.publishSyncStatus();
Expand All @@ -131,8 +131,8 @@ public void newSyncStatusEventFiresAfterSubscribe() {
@Test
public void newSyncStatusEventDoesNotFireAfterUnsubscribe() {
final AtomicReference<SyncStatus> result = new AtomicReference<>();
final long id = serviceImpl.addSynchronizerStatusListener(result::set);
serviceImpl.removeSynchronizerStatusListener(id);
final long id = serviceImpl.addSyncStatusListener(result::set);
serviceImpl.removeSyncStatusListener(id);
syncState.publishSyncStatus();
assertThat(result.get()).isNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@

import tech.pegasys.pantheon.plugin.Unstable;
import tech.pegasys.pantheon.plugin.data.BlockHeader;
import tech.pegasys.pantheon.plugin.data.LogWithMetadata;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.data.Transaction;

import java.util.List;

/**
* This service allows plugins to attach to various events during the normal operation of Pantheon.
*
Expand Down Expand Up @@ -85,38 +82,21 @@ public interface PantheonEvents {
*/
void removeTransactionDroppedListener(long listenerIdentifier);

/**
* Add a listener watching logs (plus metadata about them) included in new blocks.
*
* @param logWithMetadataListener The listener that will accept the LogWithMetadata object as the
* event.
* @return an object to be used as an identifier when de-registering the event.
*/
long addLogWithMetadataListener(
List<String> address, List<String> topics, LogWithMetadataListener logWithMetadataListener);

/**
* Remove the logs listener from pantheon notifications.
*
* @param listenerIdentifier The instance that was returned from addTransactionDroppedListener;
*/
void removeLogWithMetadataListener(long listenerIdentifier);

/**
* Add a listener watching the synchronizer status.
*
* @param synchronizerStatusListener The listener that will accept the SyncStatus object as the
* @param syncStatusListener The listener that will accept the SyncStatus object as the
* event.
* @return an object to be used as an identifier when de-registering the event.
*/
long addSynchronizerStatusListener(SynchronizerStatusListener synchronizerStatusListener);
long addSyncStatusListener(SyncStatusListener syncStatusListener);

/**
* Remove the logs listener from pantheon notifications.
*
* @param listenerIdentifier The instance that was returned from addTransactionDroppedListener;
*/
void removeSynchronizerStatusListener(long listenerIdentifier);
void removeSyncStatusListener(long listenerIdentifier);

/** The listener interface for receiving new block propagated events. */
interface BlockPropagatedListener {
Expand Down Expand Up @@ -155,25 +135,14 @@ interface TransactionDroppedListener {
void onTransactionDropped(Transaction transaction);
}

/** The listener interface for receiving logs from new blocks. */
interface LogWithMetadataListener {

/**
* Invoked when a new block is added.
*
* @param logs the new logs from the block added event
*/
void onLogWithMetadatasAdded(List<LogWithMetadata> logs);
}

/** The listener interface for receiving sync status events. */
interface SynchronizerStatusListener {
interface SyncStatusListener {

/**
* Invoked when the synchronizer status changes
*
* @param syncStatus the sync status
*/
void onSynchronizerStatusChanged(SyncStatus syncStatus);
void onSyncStatusChanged(SyncStatus syncStatus);
}
}