Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

remove QueueError #10852

Merged
merged 1 commit into from
Jul 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 34 additions & 39 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use engines::{MAX_UNCLE_AGE, Engine, EpochTransition, ForkChoice, EngineError, S
use engines::epoch::PendingTransition;
use error::{
ImportError, ExecutionError, CallError, BlockError,
QueueError, Error as EthcoreError, EthcoreResult,
Error as EthcoreError, EthcoreResult,
};
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
Expand Down Expand Up @@ -2604,6 +2604,39 @@ fn transaction_receipt(
}
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}

impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}

pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> EthcoreResult<()> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
if queue_size >= self.limit {
return Err(EthcoreError::FullQueue(self.limit))
};

let currently_queued = self.currently_queued.clone();
let _ok = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}))?;

self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
}
}

#[cfg(test)]
mod tests {
use ethereum_types::{H256, Address};
Expand Down Expand Up @@ -2761,41 +2794,3 @@ mod tests {
});
}
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}

impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}

pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
if queue_size >= self.limit {
return Err(QueueError::Full(self.limit))
};

let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}));

match result {
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => return Err(QueueError::Channel(e)),
}
}
}
24 changes: 2 additions & 22 deletions ethcore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,6 @@ impl error::Error for BlockError {
}
}

/// Queue error
#[derive(Debug, Display, From)]
pub enum QueueError {
/// Queue is full
#[display(fmt = "Queue is full ({})", _0)]
Full(usize),
/// Io channel error
#[display(fmt = "Io channel error: {}", _0)]
Channel(::io::IoError)
}

impl error::Error for QueueError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
QueueError::Channel(e) => Some(e),
_ => None,
}
}
}

/// Block import Error
#[derive(Debug, Display)]
pub enum ImportError {
Expand All @@ -196,8 +176,8 @@ pub enum Error {
#[display(fmt = "Import error: {}", _0)]
Import(ImportError),
/// Io channel queue error
#[display(fmt = "Queue error: {}", _0)]
Queue(QueueError),
#[display(fmt = "Queue is full: {}", _0)]
FullQueue(usize),
/// Io create error
#[display(fmt = "Io error: {}", _0)]
Io(::io::IoError),
Expand Down
4 changes: 2 additions & 2 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ethereum_types::H256;
use rlp::{self, Rlp};
use types::BlockNumber;
use ethcore::client::{BlockStatus, BlockId};
use ethcore::error::{ImportError, QueueError, BlockError, Error as EthcoreError};
use ethcore::error::{ImportError, BlockError, Error as EthcoreError};
use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader};
use chain::BlockSet;
Expand Down Expand Up @@ -582,7 +582,7 @@ impl BlockDownloader {
debug_sync!(self, "Block temporarily invalid: {:?}, restarting sync", h);
break;
},
Err(EthcoreError::Queue(QueueError::Full(limit))) => {
Err(EthcoreError::FullQueue(limit)) => {
debug_sync!(self, "Block import queue full ({}), restarting sync", limit);
download_action = DownloadAction::Reset;
break;
Expand Down