Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example_config_ingest_router.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ ingest_router:
base_dir: target/cache
filename: backup.bin
compression: zstd1
localities:
- us
- de
locality_to_default_cell:
us: us1
de: de1
Expand Down
4 changes: 4 additions & 0 deletions example_config_locator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ locator:
base_dir: target/cache
filename: backup.bin
compression: zstd1
# Optional list of localities to restrict the locator to. If not specified,
# control plane data for all localities will be loaded.
localities:
- us
locality_to_default_cell:
us: us1
# data type must be organization or project_key
Expand Down
3 changes: 3 additions & 0 deletions example_config_proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ proxy:
base_dir: target/cache
filename: backup.bin
compression: zstd1
localities:
- us
- de
locality_to_default_cell:
us: us1
control_plane:
Expand Down
3 changes: 3 additions & 0 deletions ingest-router/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub enum LocatorType {
InProcess {
control_plane: ControlPlane,
backup_route_store: BackupRouteStore,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
},
}
Expand All @@ -153,10 +154,12 @@ impl Locator {
LocatorType::InProcess {
control_plane,
backup_route_store,
localities,
locality_to_default_cell,
} => ClientLocatorType::InProcess {
control_plane_url: control_plane.url,
backup_route_store_type: backup_route_store.r#type,
localities,
locality_to_default_cell,
},
LocatorType::Url { url } => ClientLocatorType::Url { url },
Expand Down
1 change: 1 addition & 0 deletions ingest-router/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ mod tests {
"http://control-plane-url".to_string(),
Arc::new(provider),
None,
None,
);
let locator = Locator::from_in_process_service(locator_service);

Expand Down
1 change: 1 addition & 0 deletions ingest-router/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn create_test_locator(key_to_cell: HashMap<String, String>) -> Locato
"http://invalid-control-plane:9000".to_string(),
provider,
None,
None,
);

let locator = Locator::from_in_process_service(service);
Expand Down
2 changes: 2 additions & 0 deletions locator/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ pub async fn serve(
listener: ListenerConfig,
control_plane: ControlPlaneConfig,
provider: Arc<dyn BackupRouteProvider + 'static>,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
) -> Result<(), LocatorApiError> {
let locator = Locator::new(
data_type,
control_plane.url,
provider,
localities,
locality_to_default_cell,
);
let app = Router::new()
Expand Down
3 changes: 3 additions & 0 deletions locator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum LocatorType {
InProcess {
control_plane_url: String,
backup_route_store_type: BackupRouteStoreType,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
},
Url {
Expand All @@ -44,13 +45,15 @@ impl Locator {
LocatorType::InProcess {
control_plane_url,
backup_route_store_type,
localities,
locality_to_default_cell,
} => {
let provider = get_provider(backup_route_store_type).await?;
Ok(Locator(LocatorInner::InProcess(LocatorService::new(
config.data_type,
control_plane_url,
provider,
localities,
locality_to_default_cell,
))))
}
Expand Down
1 change: 1 addition & 0 deletions locator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct Config {
pub listener: Listener,
pub control_plane: ControlPlane,
pub backup_route_store: BackupRouteStore,
pub localities: Option<Vec<String>>,
pub locality_to_default_cell: Option<HashMap<String, String>>,
pub data_type: LocatorDataType,
}
36 changes: 35 additions & 1 deletion locator/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ pub enum ControlPlaneError {
pub struct ControlPlane {
client: reqwest::Client,
full_url: String,
localities: Option<Vec<String>>,
hmac_secret: Option<String>,
}

impl ControlPlane {
pub fn new(data_type: LocatorDataType, base_url: String) -> Self {
pub fn new(
data_type: LocatorDataType,
base_url: String,
localities: Option<Vec<String>>,
) -> Self {
let path = match data_type {
LocatorDataType::Organization => "api/0/internal/org-cell-mappings",
LocatorDataType::ProjectKey => "api/0/internal/projectkey-cell-mappings",
Expand All @@ -87,6 +92,7 @@ impl ControlPlane {
ControlPlane {
client: reqwest::Client::new(),
full_url,
localities,
hmac_secret,
}
}
Expand Down Expand Up @@ -161,6 +167,13 @@ impl ControlPlane {
url.query_pairs_mut().append_pair("cursor", c);
}

// Add locality query parameters if configured
if let Some(ref localities) = self.localities {
for locality in localities {
url.query_pairs_mut().append_pair("locality", locality);
}
}

// Build request with optional HMAC authentication
let mut request = self.client.get(url.clone());

Expand Down Expand Up @@ -229,6 +242,7 @@ mod tests {
let control_plane = ControlPlane::new(
LocatorDataType::Organization,
"http://127.0.0.1:9000/".to_string(),
None,
);
let response = control_plane.load_mappings(None).await;

Expand All @@ -239,6 +253,26 @@ mod tests {
assert_eq!(mapping.get("sentry0").unwrap(), "us1");
}

#[tokio::test]
async fn test_control_plane_with_localities() {
let _server = TestControlPlaneServer::spawn("127.0.0.1", 9002).unwrap();
let control_plane = ControlPlane::new(
LocatorDataType::Organization,
"http://127.0.0.1:9002/".to_string(),
Some(vec!["de".into()]),
);
let response = control_plane.load_mappings(None).await;

let mapping = response.unwrap().id_to_cell;

// Only the 3 "de" orgs (i=4,9,14) should be returned, each with id + slug = 6 entries
assert_eq!(mapping.len(), 6);
assert_eq!(mapping.get("4").unwrap(), "de1");
assert_eq!(mapping.get("sentry4").unwrap(), "de1");
assert_eq!(mapping.get("9").unwrap(), "de1");
assert_eq!(mapping.get("14").unwrap(), "de1");
}

#[test]
fn test_compute_hmac_signature() {
let secret = "test_secret";
Expand Down
1 change: 1 addition & 0 deletions locator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn run(config: config::Config) -> Result<(), api::LocatorApiError> {
config.listener,
config.control_plane,
provider,
config.localities,
config.locality_to_default_cell,
)
.await
Expand Down
15 changes: 8 additions & 7 deletions locator/src/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl Locator {
data_type: LocatorDataType,
control_plane_url: String,
backup_provider: Arc<dyn BackupRouteProvider + 'static>,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
) -> Self {
// Channel to send commands to the worker thread.
Expand All @@ -37,10 +38,9 @@ impl Locator {
data_type,
control_plane_url,
backup_provider,
localities,
locality_to_default_cell,
tx.clone(),
Duration::from_secs(60),
Duration::from_secs(1),
));

// Spawn the loader thread. All loading should happen from this thread.
Expand Down Expand Up @@ -153,10 +153,9 @@ impl IdToCell {
data_type: LocatorDataType,
control_plane_url: String,
backup_routes: Arc<dyn BackupRouteProvider + Send + Sync>,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
tx: mpsc::Sender<Command>,
refresh_interval: std::time::Duration,
min_refresh_interval: std::time::Duration,
) -> Self {
let data = RouteDataWithTimestamp {
data: RouteData {
Expand All @@ -168,15 +167,15 @@ impl IdToCell {
};

IdToCell {
control_plane: ControlPlane::new(data_type, control_plane_url),
control_plane: ControlPlane::new(data_type, control_plane_url, localities),
locality_to_default_cell: locality_to_default_cell.unwrap_or_default(),
data: RwLock::new(data),
negative_cache: NegativeCache::new(),
update_lock: Semaphore::new(1),
ready: AtomicBool::new(false),
backup_routes,
refresh_interval,
min_refresh_interval,
refresh_interval: Duration::from_secs(60),
min_refresh_interval: Duration::from_secs(1),
tx,
}
}
Expand Down Expand Up @@ -434,6 +433,7 @@ mod tests {
LocatorDataType::Organization,
format!("http://{host}:{port}").to_string(),
provider.clone(),
None,
Some(HashMap::from([("de".into(), "de".into())])),
);

Expand Down Expand Up @@ -469,6 +469,7 @@ mod tests {
LocatorDataType::Organization,
"http://invalid-control-plane:9000".to_string(),
provider,
None,
Some(HashMap::from([("de".into(), "de".into())])),
);

Expand Down
3 changes: 3 additions & 0 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub enum LocatorType {
InProcess {
control_plane: ControlPlane,
backup_route_store: BackupRouteStore,
localities: Option<Vec<String>>,
locality_to_default_cell: Option<HashMap<String, String>>,
},
}
Expand All @@ -114,10 +115,12 @@ impl Locator {
LocatorType::InProcess {
control_plane,
backup_route_store,
localities,
locality_to_default_cell,
} => ClientLocatorType::InProcess {
control_plane_url: control_plane.url,
backup_route_store_type: backup_route_store.r#type,
localities,
locality_to_default_cell,
},
LocatorType::Url { url } => ClientLocatorType::Url { url },
Expand Down
1 change: 1 addition & 0 deletions proxy/src/resolvers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ mod tests {
"http://control-plane-url".to_string(),
Arc::new(provider),
None,
None,
);
let locator = Locator::from_in_process_service(service);

Expand Down
41 changes: 27 additions & 14 deletions scripts/mock_control_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@
HasMore = bool


CELL_TO_LOCALITY = {
"us1": "us",
"us2": "us",
"de1": "de",
}


def org_cell(i: int) -> str:
if i % 5 == 4:
return "de1"
return f"us{i % 2 + 1}"


ALL_ORG_RESULTS = [
{
"id": str(i),
"slug": f"sentry{i}",
"cell": f"us{i % 2 + 1}",
"cell": org_cell(i),
"updated_at": START_TIME + i
}
for i in range(TOTAL_RESULTS)
Expand All @@ -53,7 +66,7 @@ class EntityType(Enum):


def get_results(
entity: EntityType, cursor: Optional[str]
entity: EntityType, cursor: Optional[str], requested_localities: Optional[list[str]] = None
) -> tuple[Results, Cursor, HasMore]:

all_results = (
Expand All @@ -62,6 +75,9 @@ def get_results(
else ALL_ORG_RESULTS
)

if requested_localities:
all_results = [r for r in all_results if CELL_TO_LOCALITY.get(r["cell"]) in requested_localities]

from_idx: Optional[int] = None

if cursor is None:
Expand Down Expand Up @@ -92,9 +108,10 @@ def get_results(
return [], None, False
assert from_idx is not None

to_idx = min(from_idx + DEFAULT_PAGE_SIZE - 1, TOTAL_RESULTS - 1)
total = len(all_results)
to_idx = min(from_idx + DEFAULT_PAGE_SIZE - 1, total - 1)

has_more = to_idx < TOTAL_RESULTS - 1
has_more = to_idx < total - 1

if has_more:
next_result = all_results[to_idx + 1]
Expand Down Expand Up @@ -130,17 +147,15 @@ def do_GET(self):

if base_path == "/api/0/internal/org-cell-mappings/":
cursor = query_params.get("cursor", [None])[0]
(data, next_cursor, has_more) = get_results(EntityType.ORG, cursor)
localities = query_params.get("locality") or None
(data, next_cursor, has_more) = get_results(EntityType.ORG, cursor, localities)

response = {
"data": data,
"metadata": {
"cursor": next_cursor,
"has_more": has_more,
"cell_to_locality": {
"us1": "us",
"us2": "us",
},
"cell_to_locality": CELL_TO_LOCALITY,
},
}

Expand All @@ -151,17 +166,15 @@ def do_GET(self):

elif base_path == "/api/0/internal/projectkey-cell-mappings/":
cursor = query_params.get("cursor", [None])[0]
(data, next_cursor, has_more) = get_results(EntityType.PROJECT_KEY, cursor)
localities = query_params.get("locality") or None
(data, next_cursor, has_more) = get_results(EntityType.PROJECT_KEY, cursor, localities)

response = {
"data": data,
"metadata": {
"cursor": next_cursor,
"has_more": has_more,
"cell_to_locality": {
"us1": "us",
"us2": "us",
},
"cell_to_locality": CELL_TO_LOCALITY,
},
}

Expand Down