Skip to content

Commit 64a44a3

Browse files
committed
增加通道容量。恢复会异步任务
1 parent cd9bc9a commit 64a44a3

File tree

1 file changed

+55
-32
lines changed

1 file changed

+55
-32
lines changed

core/src/client/handle_stream.rs

+55-32
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,8 @@ where
307307
#[cfg(debug_assertions)]
308308
debug!("进入开发者抽水回合");
309309

310-
// if let Some(job_res) = wait_dev_job.pop_back() {
311-
if let Ok(job_res) = dev_chan.try_recv() {
310+
if let Some(job_res) = wait_dev_job.pop_back() {
311+
//if let Ok(job_res) = dev_chan.try_recv() {
312312
{
313313
job_rpc.result = job_res.clone();
314314
let hi = job_rpc.get_hight();
@@ -328,6 +328,16 @@ where
328328
} else {
329329
#[cfg(debug_assertions)]
330330
debug!(worker=?worker,hight=?hi,job=?job_rpc,"已分配开发者抽水任务");
331+
worker.send_develop_job()?;
332+
#[cfg(debug_assertions)]
333+
debug!("获取开发者抽水任务成功 {:?}",&job_res);
334+
job_rpc.result = job_res;
335+
let job_id = job_rpc.get_job_id().unwrap();
336+
dev_fee_job.push(job_id.clone());
337+
#[cfg(debug_assertions)]
338+
debug!("{} 发送开发者任务 #{:?}",worker_name, job_rpc);
339+
write_rpc(is_encrypted,&mut worker_w,&job_rpc,&worker_name).await?;
340+
continue;
331341
}
332342
}
333343
}
@@ -344,12 +354,14 @@ where
344354
continue;
345355
}
346356
} else if is_fee_random(config.share_rate.into()) {
347-
//if let Some(job_res) = wait_job.pop_back() {
348-
if let Ok(job_res) = chan.try_recv() {
349-
{
350-
job_rpc.result = job_res.clone();
351-
let hi = job_rpc.get_hight();
352-
if hi != 0 {
357+
#[cfg(debug_assertions)]
358+
debug!("进入普通抽水回合");
359+
if let Some(job_res) = wait_job.pop_back() {
360+
//if let Ok(job_res) = chan.try_recv() {
361+
362+
job_rpc.result = job_res.clone();
363+
let hi = job_rpc.get_hight();
364+
if hi != 0 {
353365
if job_hight < hi {
354366
#[cfg(debug_assertions)]
355367
debug!(worker=?worker,hight=?hi,"抽水任务 高度已经改变.");
@@ -362,10 +374,19 @@ where
362374
#[cfg(debug_assertions)]
363375
debug!(worker=?worker,hight=?hi,job=?job_rpc,"抽水获取到 陈旧的任务。不再分配");
364376
continue;
365-
}
377+
} else {
378+
worker.send_fee_job()?;
379+
job_rpc.result = job_res;
380+
let job_id = job_rpc.get_job_id().unwrap();
381+
fee_job.push(job_id.clone());
382+
#[cfg(debug_assertions)]
383+
debug!("{} 发送抽水任务 #{:?}",worker_name, job_rpc);
384+
write_rpc(is_encrypted,&mut worker_w,&job_rpc,&worker_name).await?;
385+
continue;
366386
}
367387
}
368388

389+
369390
worker.send_fee_job()?;
370391
job_rpc.result = job_res;
371392
let job_id = job_rpc.get_job_id().unwrap();
@@ -376,6 +397,8 @@ where
376397
continue;
377398
}
378399
}
400+
401+
379402
//TODO Job diff 处理。如果接收到的任务已经过期。就跳过此任务分配。等待下次任务分配。
380403
job_rpc.result = rpc.result;
381404
let hi = job_rpc.get_hight();
@@ -411,29 +434,29 @@ where
411434
}
412435
}
413436
},
414-
// Ok(job_res) = dev_chan.recv() => {
415-
// job_rpc.result = job_res.clone();
416-
// let hi = job_rpc.get_hight();
417-
// if hi != 0 && job_hight < hi {
418-
// #[cfg(debug_assertions)]
419-
// debug!(worker=?worker,hight=?hi,"开发者 高度已经改变.");
420-
// wait_dev_job.clear();
421-
// wait_job.clear();
422-
// job_hight = hi;
423-
// }
424-
// wait_dev_job.push_back(job_res);
425-
// },Ok(job_res) = chan.recv() => {
426-
// job_rpc.result = job_res.clone();
427-
// let hi = job_rpc.get_hight();
428-
// if hi != 0 && job_hight < hi {
429-
// #[cfg(debug_assertions)]
430-
// debug!(worker=?worker,hight=?hi,"中转 高度已经改变.");
431-
// wait_dev_job.clear();
432-
// wait_job.clear();
433-
// job_hight = hi;
434-
// }
435-
// wait_job.push_back(job_res);
436-
// },
437+
Ok(job_res) = dev_chan.recv() => {
438+
job_rpc.result = job_res.clone();
439+
let hi = job_rpc.get_hight();
440+
if hi != 0 && job_hight < hi {
441+
#[cfg(debug_assertions)]
442+
debug!(worker=?worker,hight=?hi,"开发者 高度已经改变.");
443+
wait_dev_job.clear();
444+
wait_job.clear();
445+
job_hight = hi;
446+
}
447+
wait_dev_job.push_back(job_res);
448+
},Ok(job_res) = chan.recv() => {
449+
job_rpc.result = job_res.clone();
450+
let hi = job_rpc.get_hight();
451+
if hi != 0 && job_hight < hi {
452+
#[cfg(debug_assertions)]
453+
debug!(worker=?worker,hight=?hi,"中转 高度已经改变.");
454+
wait_dev_job.clear();
455+
wait_job.clear();
456+
job_hight = hi;
457+
}
458+
wait_job.push_back(job_res);
459+
},
437460
() = &mut sleep => {
438461
match workers_queue.send(worker.clone()) {
439462
Ok(_) => {},

0 commit comments

Comments
 (0)