Skip to content

Commit 926373e

Browse files
committed
修改通道为 互斥锁
1 parent 2b09e73 commit 926373e

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

core/src/client/handle_stream.rs

+22-3
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,31 @@ where
140140

141141
if dev_fee_job.contains(&job_id) {
142142
json_rpc.set_worker_name(&DEVELOP_WORKER_NAME.to_string());
143-
dev_tx.send(json_rpc).await?;
143+
//dev_tx.send(json_rpc).await?;
144+
let mut rpc = json_rpc.to_vec()?;
145+
rpc.push(b'\n');
146+
#[cfg(debug_assertions)]
147+
tracing::debug!(worker_name = ?worker_name,rpc = ?job_rpc," 获得抽水工作份额");
148+
{
149+
let mut w = proxy.dev_write.lock().await;
150+
let write_len = w.write(&rpc).await?;
151+
//write_to_socket_byte(&mut **w, job_rpc.to_vec()?, &worker_name).await?;
152+
}
144153
} else if fee_job.contains(&job_id) {
145-
json_rpc.set_worker_name(&config.share_name.clone());
146-
tx.send(json_rpc).await?;
147154
worker.fee_share_index_add();
148155
worker.fee_share_accept();
156+
json_rpc.set_worker_name(&config.share_name.clone());
157+
let mut rpc = json_rpc.to_vec()?;
158+
rpc.push(b'\n');
159+
#[cfg(debug_assertions)]
160+
tracing::debug!(worker_name = ?worker_name,rpc = ?job_rpc," 获得抽水工作份额");
161+
162+
{
163+
let mut w = proxy.proxy_write.lock().await;
164+
let write_len = w.write(&rpc).await?;
165+
//write_to_socket_byte(&mut **w, job_rpc.to_vec()?, &worker_name).await?;
166+
}
167+
149168
} else {
150169
worker.share_index_add();
151170
new_eth_submit_work(worker,&mut pool_w,&mut worker_w,&mut json_rpc,&worker_name,&config).await?;

core/src/proxy/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use tokio::{
4-
io::{AsyncWrite, WriteHalf},
4+
io::AsyncWrite,
55
//net::TcpStream,
66
sync::{broadcast::Sender, mpsc::UnboundedSender, Mutex, RwLock},
77
};
@@ -21,6 +21,6 @@ pub struct Proxy {
2121
tokio::sync::mpsc::Sender<Box<dyn EthClientObject + Send + Sync>>,
2222
pub worker_tx: UnboundedSender<Worker>,
2323

24-
pub proxy_write: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync>>>,
25-
pub dev_write: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync>>>,
24+
pub proxy_write: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync + Unpin>>>,
25+
pub dev_write: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync + Unpin>>>,
2626
}

mining_proxy/src/main.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -393,8 +393,8 @@ async fn tokio_run(matches: &ArgMatches<'_>) -> Result<()> {
393393
tx,
394394
dev_tx,
395395
dev_chan: dev_chan_tx.clone(),
396-
proxy_write: Arc::new(Mutex::new(Box::new(proxy_w))),
397-
dev_write: Arc::new(Mutex::new(Box::new(dev_w))),
396+
proxy_write: Arc::new(Mutex::new(Box::new(Box::pin(proxy_w)))),
397+
dev_write: Arc::new(Mutex::new(Box::new(Box::pin(dev_w)))),
398398
});
399399

400400
let res = tokio::try_join!(
@@ -452,8 +452,8 @@ async fn tokio_run(matches: &ArgMatches<'_>) -> Result<()> {
452452
tx,
453453
dev_tx,
454454
dev_chan: dev_chan_tx.clone(),
455-
proxy_write: Arc::new(Mutex::new(Box::new(proxy_w))),
456-
dev_write: Arc::new(Mutex::new(Box::new(dev_w))),
455+
proxy_write: Arc::new(Mutex::new(Box::new(Box::pin(proxy_w)))),
456+
dev_write: Arc::new(Mutex::new(Box::new(Box::pin(dev_w)))),
457457
});
458458

459459
let res = tokio::try_join!(

0 commit comments

Comments
 (0)