Skip to content

Commit 8b3513f

Browse files
committed
修改为断线重联模式
1 parent c98c4cb commit 8b3513f

File tree

2 files changed

+249
-35
lines changed

2 files changed

+249
-35
lines changed

core/src/client/fee.rs

+241-27
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::{Arc, RwLockReadGuard};
2+
13
use anyhow::{anyhow, Result};
24

35
use tokio::{
@@ -7,7 +9,7 @@ use tokio::{
79
sync::RwLockWriteGuard,
810
};
911

10-
use crate::{protocol::ethjson::EthClientObject, proxy::Job};
12+
use crate::{protocol::ethjson::EthClientObject, proxy::{Job, Proxy}, util::config::Settings};
1113

1214
use crate::{
1315
client::lines_unwrap,
@@ -21,35 +23,247 @@ use super::write_to_socket_byte;
2123

2224
use tracing::{debug, info};
2325

24-
// pub async fn fee_ssl<R: 'static>(
25-
// rx: Receiver<Vec<String>>, chan: Sender<Vec<String>>,
26-
// proxy_lines: Lines<BufReader<tokio::io::ReadHalf<R>>>,
27-
// w: tokio::io::WriteHalf<tokio_native_tls::TlsStream<tokio::net::TcpStream>>,
28-
// worker_name: String,
29-
// ) -> Result<()>
30-
// where
31-
// R: AsyncRead + Send,
32-
// {
33-
// let worker_name_write = worker_name.clone();
3426

35-
// let worker_write = tokio::spawn(async move {
36-
// match async_write(rx, worker_name_write, w).await {
37-
// Ok(()) => todo!(),
38-
// Err(e) => std::panic::panic_any(e),
39-
// }
40-
// });
27+
pub async fn develop_fee_ssl(
28+
mut rx: Receiver<Vec<String>>,
29+
job: Job,
30+
mut proxy_lines: Lines<BufReader<tokio::io::ReadHalf<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>,
31+
mut w: tokio::io::WriteHalf<tokio_native_tls::TlsStream<tokio::net::TcpStream>>,
32+
worker_name: String,
33+
proxy: Arc<Proxy>
34+
) -> Result<()>
35+
{
36+
let mut config: Settings;
37+
{
38+
let rconfig = proxy.config.read().await;
39+
config = rconfig.clone();
40+
}
41+
42+
let mut get_work = EthClientRootObject {
43+
id: 6,
44+
method: "eth_getWork".into(),
45+
params: vec![],
46+
};
4147

42-
// let worker_reader = tokio::spawn(async move {
43-
// match worker_reader(proxy_lines, chan, worker_name).await {
44-
// Ok(()) => todo!(),
45-
// Err(e) => std::panic::panic_any(e),
46-
// }
47-
// });
48+
let mut json_rpc = EthClientWorkerObject {
49+
id: 40,
50+
method: "eth_submitWork".into(),
51+
params: vec![],
52+
worker: worker_name.clone(),
53+
};
4854

49-
// let (_, _) = tokio::join!(worker_write, worker_reader);
55+
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(20));
56+
tokio::pin!(sleep);
57+
let mut share_job_idx: u64 = 0;
5058

51-
// Ok(())
52-
// }
59+
loop {
60+
select! {
61+
res = proxy_lines.next_line() => {
62+
let buffer = match lines_unwrap(res,&worker_name,"矿池").await {
63+
Ok(buf) => buf,
64+
Err(_) => {
65+
66+
let (dev_lines, dev_w) =
67+
crate::client::dev_pool_ssl_login(worker_name.clone())
68+
.await?;
69+
70+
//同时加2个值
71+
w = dev_w;
72+
proxy_lines = dev_lines;
73+
info!(worker_name = ?worker_name,"重新登录成功!!");
74+
75+
continue;
76+
},
77+
};
78+
#[cfg(debug_assertions)]
79+
debug!("1 : 矿池 -> 矿机 {} #{:?}",worker_name, buffer);
80+
if let Ok(job_rpc) = serde_json::from_str::<EthServerRootObject>(&buffer) {
81+
let job_res = job_rpc.get_job_result().unwrap();
82+
{
83+
let mut j = RwLockWriteGuard::map(job.write().await, |f| f);
84+
j.push_back(job_res)
85+
}
86+
} else if let Ok(result_rpc) = serde_json::from_str::<EthServer>(&buffer) {
87+
if result_rpc.result == false {
88+
tracing::debug!(worker_name = ?worker_name,rpc = ?buffer,"线程获得操作结果 {:?}",result_rpc.result);
89+
}
90+
}
91+
},
92+
Some(params) = rx.recv() => {
93+
share_job_idx+=1;
94+
json_rpc.id = share_job_idx;
95+
json_rpc.params = params;
96+
write_to_socket_byte(&mut w, json_rpc.to_vec()?, &worker_name).await?;
97+
},
98+
() = &mut sleep => {
99+
write_to_socket_byte(&mut w, get_work.to_vec()?, &worker_name).await?;
100+
sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(10));
101+
},
102+
}
103+
}
104+
105+
Ok(())
106+
}
107+
108+
109+
pub async fn fee_ssl(
110+
mut rx: Receiver<Vec<String>>,
111+
job: Job,
112+
mut proxy_lines: Lines<BufReader<tokio::io::ReadHalf<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>,
113+
mut w: tokio::io::WriteHalf<tokio_native_tls::TlsStream<tokio::net::TcpStream>>,
114+
worker_name: String,
115+
proxy: Arc<Proxy>
116+
) -> Result<()>
117+
{
118+
let mut config: Settings;
119+
{
120+
let rconfig = proxy.config.read().await;
121+
config = rconfig.clone();
122+
}
123+
124+
let mut get_work = EthClientRootObject {
125+
id: 6,
126+
method: "eth_getWork".into(),
127+
params: vec![],
128+
};
129+
130+
let mut json_rpc = EthClientWorkerObject {
131+
id: 40,
132+
method: "eth_submitWork".into(),
133+
params: vec![],
134+
worker: worker_name.clone(),
135+
};
136+
137+
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(20));
138+
tokio::pin!(sleep);
139+
let mut share_job_idx: u64 = 0;
140+
141+
loop {
142+
select! {
143+
res = proxy_lines.next_line() => {
144+
let buffer = match lines_unwrap(res,&worker_name,"矿池").await {
145+
Ok(buf) => buf,
146+
Err(_) => {
147+
//return Err(anyhow!("抽水旷工掉线了a"));
148+
// info!(worker_name =
149+
// ?worker_name,"退出了。重新登录到池!!");
150+
let (new_lines, dev_w) = crate::client::proxy_pool_login_with_ssl(&config,config.share_name.clone()).await?;
151+
152+
//同时加2个值
153+
w = dev_w;
154+
proxy_lines = new_lines;
155+
info!(worker_name = ?worker_name,"重新登录成功!!");
156+
157+
continue;
158+
},
159+
};
160+
#[cfg(debug_assertions)]
161+
debug!("1 : 矿池 -> 矿机 {} #{:?}",worker_name, buffer);
162+
if let Ok(job_rpc) = serde_json::from_str::<EthServerRootObject>(&buffer) {
163+
let job_res = job_rpc.get_job_result().unwrap();
164+
{
165+
let mut j = RwLockWriteGuard::map(job.write().await, |f| f);
166+
j.push_back(job_res)
167+
}
168+
} else if let Ok(result_rpc) = serde_json::from_str::<EthServer>(&buffer) {
169+
if result_rpc.result == false {
170+
tracing::debug!(worker_name = ?worker_name,rpc = ?buffer,"线程获得操作结果 {:?}",result_rpc.result);
171+
}
172+
}
173+
},
174+
Some(params) = rx.recv() => {
175+
share_job_idx+=1;
176+
json_rpc.id = share_job_idx;
177+
json_rpc.params = params;
178+
write_to_socket_byte(&mut w, json_rpc.to_vec()?, &worker_name).await?;
179+
},
180+
() = &mut sleep => {
181+
write_to_socket_byte(&mut w, get_work.to_vec()?, &worker_name).await?;
182+
sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(10));
183+
},
184+
}
185+
}
186+
187+
Ok(())
188+
}
189+
pub async fn fee_tcp(
190+
mut rx: Receiver<Vec<String>>,
191+
job: Job,
192+
mut proxy_lines: Lines<BufReader<tokio::io::ReadHalf<tokio::net::TcpStream>>>,
193+
mut w: tokio::io::WriteHalf<tokio::net::TcpStream>,
194+
worker_name: String,
195+
proxy: Arc<Proxy>
196+
) -> Result<()>
197+
{
198+
let mut config: Settings;
199+
{
200+
let rconfig = proxy.config.read().await;
201+
config = rconfig.clone();
202+
}
203+
204+
let mut get_work = EthClientRootObject {
205+
id: 6,
206+
method: "eth_getWork".into(),
207+
params: vec![],
208+
};
209+
210+
let mut json_rpc = EthClientWorkerObject {
211+
id: 40,
212+
method: "eth_submitWork".into(),
213+
params: vec![],
214+
worker: worker_name.clone(),
215+
};
216+
217+
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(20));
218+
tokio::pin!(sleep);
219+
let mut share_job_idx: u64 = 0;
220+
221+
loop {
222+
select! {
223+
res = proxy_lines.next_line() => {
224+
let buffer = match lines_unwrap(res,&worker_name,"矿池").await {
225+
Ok(buf) => buf,
226+
Err(_) => {
227+
228+
let (new_lines, dev_w) = crate::client::proxy_pool_login(&config,config.share_name.clone()).await?;
229+
230+
//同时加2个值
231+
w = dev_w;
232+
proxy_lines = new_lines;
233+
info!(worker_name = ?worker_name,"重新登录成功!!");
234+
235+
continue;
236+
},
237+
};
238+
#[cfg(debug_assertions)]
239+
debug!("1 : 矿池 -> 矿机 {} #{:?}",worker_name, buffer);
240+
if let Ok(job_rpc) = serde_json::from_str::<EthServerRootObject>(&buffer) {
241+
let job_res = job_rpc.get_job_result().unwrap();
242+
{
243+
let mut j = RwLockWriteGuard::map(job.write().await, |f| f);
244+
j.push_back(job_res)
245+
}
246+
} else if let Ok(result_rpc) = serde_json::from_str::<EthServer>(&buffer) {
247+
if result_rpc.result == false {
248+
tracing::debug!(worker_name = ?worker_name,rpc = ?buffer,"线程获得操作结果 {:?}",result_rpc.result);
249+
}
250+
}
251+
},
252+
Some(params) = rx.recv() => {
253+
share_job_idx+=1;
254+
json_rpc.id = share_job_idx;
255+
json_rpc.params = params;
256+
write_to_socket_byte(&mut w, json_rpc.to_vec()?, &worker_name).await?;
257+
},
258+
() = &mut sleep => {
259+
write_to_socket_byte(&mut w, get_work.to_vec()?, &worker_name).await?;
260+
sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(10));
261+
},
262+
}
263+
}
264+
265+
Ok(())
266+
}
53267

54268
pub async fn fee<W: 'static, R: 'static>(
55269
rx: Receiver<Vec<String>>,
@@ -107,7 +321,7 @@ where W: AsyncWrite + Send {
107321
let mut share_job_idx: u64 = 0;
108322

109323
loop {
110-
select! {
324+
select! {
111325
Some(params) = rx.recv() => {
112326
share_job_idx+=1;
113327
json_rpc.id = share_job_idx;

mining_proxy/src/main.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -395,21 +395,21 @@ async fn tokio_run(matches: &ArgMatches<'_>) -> Result<()> {
395395
accept_en_tcp(Arc::clone(&proxy)),
396396
accept_tcp_with_tls(Arc::clone(&proxy), cert_config),
397397
send_to_parent(worker_rx, &mconfig),
398-
core::client::fee::fee(
398+
core::client::fee::fee_tcp(
399399
rx,
400-
//chan_tx,
401400
fee_job,
402401
proxy_lines,
403402
proxy_w,
404403
worker_name.clone(),
404+
proxy.clone(),
405405
),
406-
core::client::fee::fee(
406+
core::client::fee::develop_fee_ssl(
407407
dev_rx,
408-
// dev_chan_tx,
409408
develop_job,
410409
dev_lines,
411410
dev_w,
412411
core::DEVELOP_WORKER_NAME.to_string(),
412+
proxy,
413413
),
414414
);
415415

@@ -428,21 +428,21 @@ async fn tokio_run(matches: &ArgMatches<'_>) -> Result<()> {
428428
accept_en_tcp(Arc::clone(&proxy)),
429429
accept_tcp_with_tls(Arc::clone(&proxy), cert_config),
430430
send_to_parent(worker_rx, &mconfig),
431-
core::client::fee::fee(
431+
core::client::fee::fee_ssl(
432432
rx,
433-
//chan_tx,
434433
fee_job,
435434
proxy_lines,
436435
proxy_w,
437436
worker_name.clone(),
437+
proxy.clone(),
438438
),
439-
core::client::fee::fee(
439+
core::client::fee::develop_fee_ssl(
440440
dev_rx,
441-
//dev_chan_tx,
442441
develop_job,
443442
dev_lines,
444443
dev_w,
445444
core::DEVELOP_WORKER_NAME.to_string(),
445+
proxy,
446446
),
447447
);
448448

0 commit comments

Comments
 (0)