
Conversation

@lynnagara (Member) commented Dec 23, 2025

What this does:

  • Introduces the executor. This module splits the request, calls upstreams, enforces timeouts, and merges the responses back for the caller.

  • Wires the executor into the ingest router service, so the service now returns real results from upstreams.

  • Adds a test that spins up the mock relay API to demonstrate the project config endpoint working end to end.

  • Proper timeout handling still needs to be implemented (different timeouts for the first and subsequent results); these are noted as TODOs in the code.

This PR is built on top of #92 -- please review that one first
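The split step described above can be sketched roughly as follows. This is a minimal std-only sketch under stated assumptions: `split_by_cell`, `key_to_cell`, and the pending-keys handling are illustrative names, not the actual ingest-router API.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the executor's split step: group incoming project
// keys by the cell that owns them, so each upstream receives only its share.
// Keys the locator cannot place are returned separately as "pending".
fn split_by_cell(
    keys: &[String],
    key_to_cell: &HashMap<String, String>,
) -> (HashMap<String, Vec<String>>, Vec<String>) {
    let mut per_cell: HashMap<String, Vec<String>> = HashMap::new();
    let mut pending = Vec::new();
    for key in keys {
        match key_to_cell.get(key) {
            Some(cell) => per_cell.entry(cell.clone()).or_default().push(key.clone()),
            None => pending.push(key.clone()),
        }
    }
    (per_cell, pending)
}

fn main() {
    let key_to_cell = HashMap::from([
        ("k1".to_string(), "us1".to_string()),
        ("k2".to_string(), "us2".to_string()),
    ]);
    let keys = vec!["k1".to_string(), "k2".to_string(), "k3".to_string()];
    let (per_cell, pending) = split_by_cell(&keys, &key_to_cell);
    println!("{} cells, {} pending", per_cell.len(), pending.len());
}
```

After the split, one upstream call is issued per cell and the per-cell responses are merged back into a single response for the caller.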

@lynnagara lynnagara requested a review from a team as a code owner December 23, 2025 22:15
  request_data = self._read_json_body()
  query_params = parse_qs(parsed_path.query)
- version = query_params.get("version", ["2"])[0]
+ version = query_params.get("version", ["3"])[0]
Member Author:

Version 2 doesn't exist anymore; it shouldn't be the default.

Comment on lines +29 to +76
// Mock backup provider for testing
struct MockBackupProvider {
    data: RouteData,
}

#[async_trait::async_trait]
impl BackupRouteProvider for MockBackupProvider {
    async fn load(&self) -> Result<RouteData, locator::backup_routes::BackupError> {
        Ok(self.data.clone())
    }

    async fn store(&self, _data: &RouteData) -> Result<(), locator::backup_routes::BackupError> {
        Ok(())
    }
}

pub async fn create_test_locator(key_to_cell: HashMap<String, String>) -> Locator {
    let route_data = RouteData::from(
        key_to_cell,
        "cursor".to_string(),
        HashMap::from([
            ("us1".to_string(), "us".to_string()),
            ("us2".to_string(), "us".to_string()),
        ]),
    );

    let provider = Arc::new(MockBackupProvider { data: route_data });

    let service = locator::locator::Locator::new(
        locator::config::LocatorDataType::ProjectKey,
        "http://invalid-control-plane:9000".to_string(),
        provider,
        None,
    );

    let locator = Locator::from_in_process_service(service);

    // Wait for locator to be ready
    for _ in 0..50 {
        if locator.is_ready() {
            break;
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
    assert!(locator.is_ready(), "Locator should be ready");

    locator
}
Member Author:

This code is not new; it was just moved as-is from the project-config/handler code so it can be reused more easily.

Comment on lines +109 to +112
fn spawn() -> std::io::Result<Self> {
    let child = Command::new("python")
        .arg("../scripts/mock_relay_api.py")
        .spawn()?;

This comment was marked as outdated.

Comment on lines 76 to 86
tokio::select! {
    _ = &mut timeout => break,
    join_result = join_set.join_next() => {
        match join_result {
            Some(Ok(result)) => results.push(result),
            Some(Err(e)) => tracing::error!("Task panicked: {}", e),
            None => break,
        }
    }
}
}

This comment was marked as outdated.

Comment on lines +82 to +83
// TODO: panicked task should be added to the results vec as error
Some(Err(e)) => tracing::error!("Task panicked: {}", e),

This comment was marked as outdated.
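The TODO above could be addressed along these lines. This is a hypothetical std-only sketch: `CellResult`, `record_outcome`, and the error formatting are illustrative placeholders, not the project's actual types.

```rust
// Sketch of the TODO: instead of only logging a panicked task, record it as
// an error entry in the results vec so downstream merge logic can see the
// failed cell rather than silently losing it.
type CellResult = Result<String, String>;

fn record_outcome(results: &mut Vec<CellResult>, cell: &str, outcome: Result<String, String>) {
    match outcome {
        Ok(body) => results.push(Ok(body)),
        Err(panic_msg) => {
            // Keep the failure visible to merge logic instead of dropping it.
            results.push(Err(format!("cell {cell} task failed: {panic_msg}")));
        }
    }
}

fn main() {
    let mut results: Vec<CellResult> = Vec::new();
    record_outcome(&mut results, "us1", Ok("response".to_string()));
    record_outcome(&mut results, "us2", Err("panic".to_string()));
    println!("{} results recorded", results.len());
}
```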

use crate::testutils::get_mock_provider;
use std::sync::Arc;
use crate::project_config::protocol::ProjectConfigsResponse;
use crate::testutils::create_test_locator;
Bug: When execute_parallel times out, it returns partial results. Downstream logic then fails to add keys from timed-out cells to the pending_keys list, causing silent data loss.
Severity: CRITICAL | Confidence: High

🔍 Detailed Analysis

The execute_parallel function can return incomplete results when a timeout occurs. The tokio::select! loop breaks on timeout, returning only the results collected so far. Consequently, the merge_responses function, which iterates over these results, never processes the cells that timed out. Because these timed-out cells are missing from the response vector, their associated keys are never added to pending_keys for a retry attempt. This leads to silent data loss, as keys that should be retried are dropped instead.

💡 Suggested Fix

Before breaking from the loop in execute_parallel due to a timeout, identify which cells have not yet responded. For each of these missing cells, add an error result to the results vector. This ensures the downstream merge_responses function is aware of the timeout and can correctly add the associated keys to pending_keys.
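The suggested fix could look roughly like this std-only sketch (the tuple result type, `fill_timed_out`, and the error text are illustrative assumptions, not the actual `execute_parallel` types):

```rust
use std::collections::HashSet;

// Sketch of the suggested fix: when the timeout fires, synthesize an error
// result for every expected cell that has not responded yet, so the
// downstream merge step can route those cells' keys back into pending_keys.
fn fill_timed_out(
    expected_cells: &[String],
    results: &mut Vec<(String, Result<String, String>)>,
) {
    let responded: HashSet<&String> = results.iter().map(|(cell, _)| cell).collect();
    let missing: Vec<String> = expected_cells
        .iter()
        .filter(|c| !responded.contains(c))
        .cloned()
        .collect();
    for cell in missing {
        results.push((cell, Err("timed out".to_string())));
    }
}

fn main() {
    let expected = vec!["us1".to_string(), "us2".to_string()];
    let mut results: Vec<(String, Result<String, String>)> =
        vec![("us1".to_string(), Ok("config payload".to_string()))];
    fill_timed_out(&expected, &mut results);
    println!("{} results after fill", results.len());
}
```

Calling this just before breaking out of the `tokio::select!` loop on timeout would make every expected cell appear in the results vector, either with a real response or an explicit error.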

Location: ingest-router/src/lib.rs#L101
