Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Commit

Permalink
Merge pull request #218 from Cerebellum-Network/feature/integrate-dac…
Browse files Browse the repository at this point in the history
…-validator-rebase

Feature/integrate dac validator rebase
  • Loading branch information
MRamanenkau authored May 18, 2023
2 parents 74029a2 + 1b5a218 commit dbeb513
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 88 deletions.
38 changes: 24 additions & 14 deletions frame/ddc-validator/src/dac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,21 +273,24 @@ fn get_timestamps_with_ack(file_requests: &Requests) -> Vec<TimestampInSec> {
timestamps
}

pub fn get_proved_delivered_bytes_sum(file_requests: &Requests) -> u64 {
pub fn get_served_bytes_sum(file_requests: &Requests) -> (u64, u64) {
let ack_timestamps = get_timestamps_with_ack(file_requests);
let mut total_bytes_received = 0u64;
let mut total_bytes_sent = 0u64;

for (_, file_request) in file_requests {
for (_, chunk) in &file_request.chunks {
total_bytes_sent += chunk.log.bytes_sent;

if let Some(ack) = &chunk.ack {
total_bytes_received += &chunk.log.bytes_sent;
total_bytes_received += ack.bytes_received;
} else {
total_bytes_received += get_proved_delivered_bytes(chunk, &ack_timestamps);
}
}
}

total_bytes_received
(total_bytes_sent, total_bytes_received)
}

fn get_proved_delivered_bytes(chunk: &Chunk, ack_timestamps: &Vec<TimestampInSec>) -> u64 {
Expand Down Expand Up @@ -342,9 +345,12 @@ fn get_file_request_url(data_provider_url: &String) -> String {
}

pub(crate) fn fetch_file_request(url: &String) -> Requests {
log::info!("fetch_file_request | url: {:?}", url);
let response: FileRequestWrapper = http_get_json(&url).unwrap();
let value: Value = serde_json::from_str(response.json.as_str()).unwrap();
let map: Requests = serde_json::from_value(value).unwrap();

log::info!("response.json: {:?}", response.json);

map
}
Expand Down Expand Up @@ -409,7 +415,7 @@ fn get_bytes_received_query_url(data_provider_url: &String, era: EraIndex) -> St
format!("{}/FT.AGGREGATE/ddc:dac:searchCommonIndex/@era:[{}%20{}]/GROUPBY/2/@nodePublicKey/@era/REDUCE/SUM/1/@bytesReceived/AS/bytesReceivedSum", data_provider_url, era, era)
}

fn http_get_json<OUT: DeserializeOwned>(url: &str) -> crate::ResultStr<OUT> {
pub(crate) fn http_get_json<OUT: DeserializeOwned>(url: &str) -> crate::ResultStr<OUT> {
let body = http_get_request(url).map_err(|err| {
log::error!("[DAC Validator] Error while getting {}: {:?}", url, err);
"HTTP GET error"
Expand Down Expand Up @@ -540,16 +546,19 @@ pub(crate) fn post_final_decision(
res
}

pub(crate) fn get_final_decision(decisions: Vec<ValidationResult>) -> ValidationDecision {
pub(crate) fn get_final_decision(decisions: Vec<ValidationDecision>) -> ValidationDecision {
let common_decisions = find_largest_group(decisions).unwrap();
let decision_example = common_decisions.get(0).unwrap();

let serialized_decisions = serde_json::to_string(&common_decisions).unwrap();

let final_decision = ValidationDecision {
edge: decision_example.edge.clone(),
result: decision_example.result,
payload: utils::get_hashed(&common_decisions),
payload: utils::hash(&serialized_decisions),
totals: DacTotalAggregates {
received: decision_example.received,
sent: decision_example.sent,
received: decision_example.totals.received,
sent: decision_example.totals.sent,
failed_by_client: 0,
failure_rate: 0,
},
Expand All @@ -558,7 +567,7 @@ pub(crate) fn get_final_decision(decisions: Vec<ValidationResult>) -> Validation
final_decision
}

pub(crate) fn get_validation_results(
pub(crate) fn fetch_validation_results(
data_provider_url: &String,
era: EraIndex,
edge: &String,
Expand All @@ -570,8 +579,8 @@ pub(crate) fn get_validation_results(
Ok(results)
}

fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<ValidationResult>> {
let mut groups: Vec<Vec<ValidationResult>> = Vec::new();
fn find_largest_group(decisions: Vec<ValidationDecision>) -> Option<Vec<ValidationDecision>> {
let mut groups: Vec<Vec<ValidationDecision>> = Vec::new();
let half = decisions.len() / 2;

for decision in decisions {
Expand All @@ -580,12 +589,12 @@ fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<Validation
for group in &mut groups {
if group.iter().all(|x| {
x.result == decision.result &&
x.received == decision.received &&
x.sent == decision.sent
x.totals.received == decision.totals.received &&
x.totals.sent == decision.totals.sent
}) {
group.push(decision.clone());
found_group = true;
break
break;
}
}

Expand All @@ -602,3 +611,4 @@ fn find_largest_group(decisions: Vec<ValidationResult>) -> Option<Vec<Validation
None
}
}

Loading

0 comments on commit dbeb513

Please sign in to comment.