Update Task for DAP-02. (#616)

* Rename min_batch_duration to time_precision.
* Rename max_batch_lifetime to max_batch_query_count.
* Add task_expiration to Task configuration.
* Add query_type to Task.

Also, clean up Task to be immutable with getters, as in other cross-module data structures.
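
As a rough sketch of the resulting API, based solely on the call sites in this diff (signatures abridged and illustrative, not authoritative):

// Before this change: Task exposed public, mutable fields.
let precision = task.min_batch_duration.as_seconds();
let id = task.id;

// After: Task is immutable and read through getters, with renamed fields.
let precision = task.time_precision().as_seconds();
let id = *task.id();
let role = *task.role();
let endpoint = task.aggregator_url(task.role()).unwrap();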
branlwyd authored Oct 7, 2022
1 parent d8bc70b commit a9eb40e
Showing 25 changed files with 2,133 additions and 1,742 deletions.
6 changes: 4 additions & 2 deletions db/schema.sql
@@ -15,10 +15,12 @@ CREATE TABLE tasks(
task_id BYTEA UNIQUE NOT NULL, -- 32-byte TaskID as defined by the DAP specification
aggregator_role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator for this task
aggregator_endpoints TEXT[] NOT NULL, -- aggregator HTTPS endpoints, leader first
query_type JSON NOT NULL, -- the query type in use for this task, along with its parameters
vdaf JSON NOT NULL, -- the VDAF instance in use for this task, along with its parameters
max_batch_lifetime BIGINT NOT NULL, -- the maximum number of times a given batch may be collected
max_batch_query_count BIGINT NOT NULL, -- the maximum number of times a given batch may be collected
task_expiration TIMESTAMP NOT NULL, -- the time after which client reports are no longer accepted
min_batch_size BIGINT NOT NULL, -- the minimum number of reports in a batch to allow it to be collected
min_batch_duration BIGINT NOT NULL, -- the minimum duration in seconds of a single batch interval
time_precision BIGINT NOT NULL, -- the duration to which clients are expected to round their report timestamps, in seconds
tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds
collector_hpke_config BYTEA NOT NULL -- the HPKE config of the collector (encoded HpkeConfig message)
);
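
The new query_type column stores the task's DAP query type, with any parameters, as JSON. A hypothetical serde representation that would fit this column (only TimeInterval appears elsewhere in this diff; the FixedSize variant and its max_batch_size parameter are taken from the DAP-02 draft, not from this change):

use serde::{Deserialize, Serialize};

// Sketch of a type that could back the `query_type` JSON column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueryType {
    // Batches are fixed windows of client-report timestamps.
    TimeInterval,
    // Batches are assembled on demand by the leader, subject to a size cap.
    FixedSize { max_batch_size: u64 },
}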
32 changes: 16 additions & 16 deletions integration_tests/src/daphne.rs
@@ -60,7 +60,7 @@ impl<'a> Daphne<'a> {
// Aes128Gcm); this is checked in `DaphneHpkeConfig::from`.
let dap_hpke_receiver_config_list = serde_json::to_string(
&task
.hpke_keys
.hpke_keys()
.values()
.map(|(hpke_config, private_key)| DaphneHpkeReceiverConfig {
config: DaphneHpkeConfig::from(hpke_config.clone()),
@@ -80,28 +80,28 @@ impl<'a> Daphne<'a> {
let dap_collect_id_key: [u8; 16] = random();

let dap_task_list = serde_json::to_string(&HashMap::from([(
hex::encode(task.id.as_ref()),
hex::encode(task.id().as_ref()),
DaphneDapTaskConfig {
version: "v01".to_string(),
leader_url: task.aggregator_url(Role::Leader).unwrap().clone(),
helper_url: task.aggregator_url(Role::Helper).unwrap().clone(),
min_batch_duration: task.min_batch_duration.as_seconds(),
min_batch_size: task.min_batch_size,
vdaf: daphne_vdaf_config_from_janus_vdaf(&task.vdaf),
vdaf_verify_key: hex::encode(task.vdaf_verify_keys().first().unwrap().as_bytes()),
collector_hpke_config: DaphneHpkeConfig::from(task.collector_hpke_config.clone()),
leader_url: task.aggregator_url(&Role::Leader).unwrap().clone(),
helper_url: task.aggregator_url(&Role::Helper).unwrap().clone(),
min_batch_duration: task.time_precision().as_seconds(), // TODO(#493): this field will likely need to be renamed
min_batch_size: task.min_batch_size(),
vdaf: daphne_vdaf_config_from_janus_vdaf(task.vdaf()),
vdaf_verify_key: hex::encode(task.vdaf_verify_keys().first().unwrap().as_ref()),
collector_hpke_config: DaphneHpkeConfig::from(task.collector_hpke_config().clone()),
},
)]))
.unwrap();

// Daphne currently only supports one auth token per task. Janus supports multiple tokens
// per task to allow rotation; we supply Daphne with the "primary" token.
let aggregator_bearer_token_list = json!({
hex::encode(task.id.as_ref()): String::from_utf8(task.primary_aggregator_auth_token().as_bytes().to_vec()).unwrap()
hex::encode(task.id().as_ref()): String::from_utf8(task.primary_aggregator_auth_token().as_bytes().to_vec()).unwrap()
}).to_string();
let collector_bearer_token_list = if task.role == Role::Leader {
let collector_bearer_token_list = if task.role() == &Role::Leader {
json!({
hex::encode(task.id.as_ref()): String::from_utf8(task.primary_collector_auth_token().as_bytes().to_vec()).unwrap()
hex::encode(task.id().as_ref()): String::from_utf8(task.primary_collector_auth_token().as_bytes().to_vec()).unwrap()
}).to_string()
} else {
String::new()
@@ -160,7 +160,7 @@ impl<'a> Daphne<'a> {

// Start the Daphne test container running.
let port = pick_unused_port().expect("Couldn't pick unused port");
let endpoint = task.aggregator_url(task.role).unwrap();
let endpoint = task.aggregator_url(task.role()).unwrap();

let args = [
(
Expand All @@ -179,7 +179,7 @@ impl<'a> Daphne<'a> {
),
(
"DAP_AGGREGATOR_ROLE".to_string(),
task.role.as_str().to_string(),
task.role().as_str().to_string(),
),
(
"DAP_GLOBAL_CONFIG".to_string(),
@@ -233,7 +233,7 @@ impl<'a> Daphne<'a> {
task::spawn({
let http_client = reqwest::Client::default();
let mut request_url = task
.aggregator_url(task.role)
.aggregator_url(task.role())
.unwrap()
.join("/internal/process")
.unwrap();
@@ -266,7 +266,7 @@ impl<'a> Daphne<'a> {

Self {
daphne_container,
role: task.role,
role: *task.role(),
start_shutdown_sender: Some(start_shutdown_sender),
shutdown_complete_receiver: Some(shutdown_complete_receiver),
}
4 changes: 2 additions & 2 deletions integration_tests/src/janus.rs
@@ -53,7 +53,7 @@ impl<'a> Janus<'a> {
task: &Task,
) -> Janus<'a> {
// Start the Janus interop aggregator container running.
let endpoint = task.aggregator_url(task.role).unwrap();
let endpoint = task.aggregator_url(task.role()).unwrap();
let container = container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
@@ -77,7 +77,7 @@ impl<'a> Janus<'a> {
assert_eq!(resp.get("status"), Some(&Some("success".to_string())));

Self::Container {
role: task.role,
role: *task.role(),
container,
}
}
105 changes: 33 additions & 72 deletions integration_tests/tests/common/mod.rs
@@ -7,69 +7,36 @@ use janus_collector::{
use janus_core::{
hpke::{test_util::generate_test_hpke_config_and_private_key, HpkePrivateKey},
retries::test_http_request_exponential_backoff,
task::{AuthenticationToken, VdafInstance},
task::VdafInstance,
time::{Clock, RealClock, TimeExt},
};
use janus_messages::{Duration, HpkeConfig, Interval, Role};
use janus_server::{
messages::DurationExt,
task::{test_util::generate_auth_token, Task, PRIO3_AES128_VERIFY_KEY_LENGTH},
SecretBytes,
};
use janus_messages::{Duration, Interval, Role};
use janus_server::task::{test_util::TaskBuilder, QueryType, Task};
use prio::vdaf::prio3::Prio3;
use rand::random;
use reqwest::Url;
use std::iter;
use tokio::time;

// Returns (leader_task, helper_task).
pub fn create_test_tasks(collector_hpke_config: &HpkeConfig) -> (Task, Task) {
// Generate parameters.
let task_id = random();
let buf: [u8; 4] = random();
let endpoints = Vec::from([
Url::parse(&format!("http://leader-{}:8080/", hex::encode(buf))).unwrap(),
Url::parse(&format!("http://helper-{}:8080/", hex::encode(buf))).unwrap(),
]);
let vdaf_verify_key: [u8; PRIO3_AES128_VERIFY_KEY_LENGTH] = random();
let vdaf_verify_keys = Vec::from([SecretBytes::new(vdaf_verify_key.to_vec())]);
let aggregator_auth_tokens = Vec::from([generate_auth_token()]);

// Create tasks & return.
let leader_task = Task::new(
task_id,
endpoints.clone(),
// Returns (collector_private_key, leader_task, helper_task).
pub fn test_task_builders() -> (HpkePrivateKey, TaskBuilder, TaskBuilder) {
let endpoint_random_value = hex::encode(random::<[u8; 4]>());
let (collector_hpke_config, collector_private_key) =
generate_test_hpke_config_and_private_key();
let leader_task = TaskBuilder::new(
QueryType::TimeInterval,
VdafInstance::Prio3Aes128Count.into(),
Role::Leader,
vdaf_verify_keys.clone(),
1,
46,
Duration::from_hours(8).unwrap(),
Duration::from_minutes(10).unwrap(),
collector_hpke_config.clone(),
aggregator_auth_tokens.clone(),
Vec::from([generate_auth_token()]),
Vec::from([generate_test_hpke_config_and_private_key()]),
)
.unwrap();
let helper_task = Task::new(
task_id,
endpoints,
VdafInstance::Prio3Aes128Count.into(),
Role::Helper,
vdaf_verify_keys,
1,
46,
Duration::from_hours(8).unwrap(),
Duration::from_minutes(10).unwrap(),
collector_hpke_config.clone(),
aggregator_auth_tokens,
Vec::new(),
Vec::from([generate_test_hpke_config_and_private_key()]),
)
.unwrap();
.with_aggregator_endpoints(Vec::from([
Url::parse(&format!("http://leader-{endpoint_random_value}:8080/")).unwrap(),
Url::parse(&format!("http://helper-{endpoint_random_value}:8080/")).unwrap(),
]))
.with_min_batch_size(46)
.with_collector_hpke_config(collector_hpke_config);
let helper_task = leader_task.clone().with_role(Role::Helper);

(leader_task, helper_task)
(collector_private_key, leader_task, helper_task)
}
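
A minimal usage sketch for the helper above, assuming TaskBuilder::build() consumes the builder and yields an immutable Task (build() is used this way by the Daphne tests later in this diff):

// Hypothetical test setup: customize the builders further with the
// with_* methods if needed, then build immutable Task values.
let (collector_private_key, leader_builder, helper_builder) = test_task_builders();
let leader_task = leader_builder.build();
let helper_task = helper_builder.build();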

pub fn translate_url_for_external_access(url: &Url, external_port: u16) -> Url {
@@ -86,33 +53,32 @@ pub async fn submit_measurements_and_verify_aggregate(
) {
// Translate aggregator endpoints for our perspective outside the container network.
let aggregator_endpoints: Vec<_> = leader_task
.aggregator_endpoints
.aggregator_endpoints()
.iter()
.zip([leader_port, helper_port])
.map(|(url, port)| translate_url_for_external_access(url, port))
.collect();

// Create client.
let task_id = leader_task.id;
let vdaf = Prio3::new_aes128_count(2).unwrap();
let client_parameters = ClientParameters::new(
task_id,
*leader_task.id(),
aggregator_endpoints.clone(),
leader_task.min_batch_duration,
*leader_task.time_precision(),
);
let http_client = janus_client::default_http_client().unwrap();
let leader_report_config = janus_client::aggregator_hpke_config(
&client_parameters,
Role::Leader,
task_id,
&Role::Leader,
leader_task.id(),
&http_client,
)
.await
.unwrap();
let helper_report_config = janus_client::aggregator_hpke_config(
&client_parameters,
Role::Helper,
task_id,
&Role::Helper,
leader_task.id(),
&http_client,
)
.await
@@ -132,7 +98,7 @@ pub async fn submit_measurements_and_verify_aggregate(
// We generate exactly one batch's worth of measurement uploads to work around an issue in
// Daphne at time of writing.
let clock = RealClock::default();
let total_measurements: usize = leader_task.min_batch_size.try_into().unwrap();
let total_measurements: usize = leader_task.min_batch_size().try_into().unwrap();
let num_nonzero_measurements = total_measurements / 2;
let num_zero_measurements = total_measurements - num_nonzero_measurements;
assert!(num_nonzero_measurements > 0 && num_zero_measurements > 0);
@@ -147,23 +113,18 @@ pub async fn submit_measurements_and_verify_aggregate(
// Send a collect request.
let batch_interval = Interval::new(
before_timestamp
.to_batch_unit_interval_start(leader_task.min_batch_duration)
.to_batch_unit_interval_start(leader_task.time_precision())
.unwrap(),
// Use two minimum batch durations as the interval duration in order to avoid a race
// condition if this test happens to run very close to the end of a batch window.
Duration::from_seconds(2 * leader_task.min_batch_duration.as_seconds()),
// Use two time precisions as the interval duration in order to avoid a race condition if
// this test happens to run very close to the end of a batch window.
Duration::from_seconds(2 * leader_task.time_precision().as_seconds()),
)
.unwrap();
let collector_params = CollectorParameters::new(
task_id,
*leader_task.id(),
aggregator_endpoints[Role::Leader.index().unwrap()].clone(),
AuthenticationToken::from(
leader_task
.primary_collector_auth_token()
.as_bytes()
.to_vec(),
),
leader_task.collector_hpke_config.clone(),
leader_task.primary_collector_auth_token().clone(),
leader_task.collector_hpke_config().clone(),
collector_private_key.clone(),
)
.with_http_request_backoff(test_http_request_exponential_backoff())
48 changes: 25 additions & 23 deletions integration_tests/tests/daphne.rs
@@ -1,13 +1,11 @@
#![cfg(feature = "daphne")]

use common::{create_test_tasks, submit_measurements_and_verify_aggregate};
use common::{submit_measurements_and_verify_aggregate, test_task_builders};
use integration_tests::{daphne::Daphne, janus::Janus};
use interop_binaries::test_util::generate_network_name;
use janus_core::{
hpke::test_util::generate_test_hpke_config_and_private_key,
test_util::{install_test_trace_subscriber, testcontainers::container_client},
};
use janus_core::test_util::{install_test_trace_subscriber, testcontainers::container_client};
use janus_messages::Role;
use janus_server::task::Task;

mod common;

@@ -18,17 +16,19 @@ async fn daphne_janus() {

// Start servers.
let network = generate_network_name();
let (collector_hpke_config, collector_private_key) =
generate_test_hpke_config_and_private_key();
let (mut leader_task, mut helper_task) = create_test_tasks(&collector_hpke_config);
let (collector_private_key, leader_task, helper_task) = test_task_builders();

// Daphne is hardcoded to serve from a path starting with /v01/.
for task in [&mut leader_task, &mut helper_task] {
task.aggregator_endpoints
.get_mut(Role::Leader.index().unwrap())
.unwrap()
.set_path("/v01/");
}
let [leader_task, helper_task]: [Task; 2] = [leader_task, helper_task]
.into_iter()
.map(|task| {
let mut endpoints = task.aggregator_endpoints().to_vec();
endpoints[Role::Leader.index().unwrap()].set_path("/v01/");
task.with_aggregator_endpoints(endpoints).build()
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let container_client = container_client();
let leader = Daphne::new(&container_client, &network, &leader_task).await;
@@ -50,17 +50,19 @@ async fn janus_daphne() {

// Start servers.
let network = generate_network_name();
let (collector_hpke_config, collector_private_key) =
generate_test_hpke_config_and_private_key();
let (mut leader_task, mut helper_task) = create_test_tasks(&collector_hpke_config);
let (collector_private_key, leader_task, helper_task) = test_task_builders();

// Daphne is hardcoded to serve from a path starting with /v01/.
for task in [&mut leader_task, &mut helper_task] {
task.aggregator_endpoints
.get_mut(Role::Helper.index().unwrap())
.unwrap()
.set_path("/v01/");
}
let [leader_task, helper_task]: [Task; 2] = [leader_task, helper_task]
.into_iter()
.map(|task| {
let mut endpoints = task.aggregator_endpoints().to_vec();
endpoints[Role::Helper.index().unwrap()].set_path("/v01/");
task.with_aggregator_endpoints(endpoints).build()
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

let container_client = container_client();
let leader = Janus::new_in_container(&container_client, &network, &leader_task).await;