ingest-router: add executor and actually return results to client #93
base: handler-types
Conversation
What this does:
- Introduces the executor. This module splits the request, calls upstreams, enforces timeouts, and merges the responses back for the caller.
- Wires the executor up to the ingest-router service, so the service now returns real results from upstreams.
- Adds a test that spins up the mock relay API to demonstrate the project-config endpoint working end to end.

Proper timeout handling still needs to be implemented (different timeouts for the first and subsequent results); these are noted as TODOs in the code.

This PR is built on top of #92 -- please review that one first.
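For orientation before the diff hunks below, here is a minimal, self-contained sketch of that split → call upstreams → merge flow. The names `execute_parallel` and `merge_responses` appear in the review discussion further down; `split_request`, the concrete types, and the stub upstream call are illustrative assumptions, and the deadline here is a single `tokio::time::timeout` rather than the PR's `tokio::select!` loop:

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinSet;

// Illustrative stand-ins for the real ingest-router types.
type Key = String;
type Cell = String;

struct CellResult {
    configs: HashMap<Key, String>,
}

// Step 1: split the requested keys by the cell that owns them.
fn split_request(keys: Vec<Key>, route: &HashMap<Key, Cell>) -> HashMap<Cell, Vec<Key>> {
    let mut per_cell: HashMap<Cell, Vec<Key>> = HashMap::new();
    for key in keys {
        if let Some(cell) = route.get(&key) {
            per_cell.entry(cell.clone()).or_default().push(key);
        }
    }
    per_cell
}

// Step 2: call every cell's upstream in parallel under one deadline.
async fn execute_parallel(
    per_cell: HashMap<Cell, Vec<Key>>,
    deadline: Duration,
) -> Vec<CellResult> {
    let mut join_set = JoinSet::new();
    for (cell, keys) in per_cell {
        // Stub upstream call; the real executor performs HTTP requests here.
        join_set.spawn(async move {
            CellResult {
                configs: keys
                    .into_iter()
                    .map(|k| (k, format!("config-from-{cell}")))
                    .collect(),
            }
        });
    }

    let mut results = Vec::new();
    // Collect whatever finishes before the deadline; late cells are dropped,
    // which is exactly the partial-result behavior discussed further down.
    let _ = tokio::time::timeout(deadline, async {
        while let Some(joined) = join_set.join_next().await {
            if let Ok(result) = joined {
                results.push(result);
            }
        }
    })
    .await;
    results
}

// Step 3: merge the per-cell answers into one response for the caller.
fn merge_responses(results: Vec<CellResult>) -> HashMap<Key, String> {
    let mut merged = HashMap::new();
    for result in results {
        merged.extend(result.configs);
    }
    merged
}

#[tokio::main]
async fn main() {
    let route = HashMap::from([
        ("k1".to_string(), "us1".to_string()),
        ("k2".to_string(), "us2".to_string()),
    ]);
    let per_cell = split_request(vec!["k1".into(), "k2".into()], &route);
    let merged = merge_responses(execute_parallel(per_cell, Duration::from_millis(500)).await);
    println!("{merged:?}");
}
```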
```diff
 request_data = self._read_json_body()
 query_params = parse_qs(parsed_path.query)
-version = query_params.get("version", ["2"])[0]
+version = query_params.get("version", ["3"])[0]
```
Version 2 doesn't exist anymore, so it shouldn't be the default.
```rust
// Mock backup provider for testing
struct MockBackupProvider {
    data: RouteData,
}

#[async_trait::async_trait]
impl BackupRouteProvider for MockBackupProvider {
    async fn load(&self) -> Result<RouteData, locator::backup_routes::BackupError> {
        Ok(self.data.clone())
    }

    async fn store(&self, _data: &RouteData) -> Result<(), locator::backup_routes::BackupError> {
        Ok(())
    }
}

pub async fn create_test_locator(key_to_cell: HashMap<String, String>) -> Locator {
    let route_data = RouteData::from(
        key_to_cell,
        "cursor".to_string(),
        HashMap::from([
            ("us1".to_string(), "us".to_string()),
            ("us2".to_string(), "us".to_string()),
        ]),
    );

    let provider = Arc::new(MockBackupProvider { data: route_data });

    let service = locator::locator::Locator::new(
        locator::config::LocatorDataType::ProjectKey,
        "http://invalid-control-plane:9000".to_string(),
        provider,
        None,
    );

    let locator = Locator::from_in_process_service(service);

    // Wait for locator to be ready
    for _ in 0..50 {
        if locator.is_ready() {
            break;
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
    assert!(locator.is_ready(), "Locator should be ready");

    locator
}
```
This code is not new; it was just moved as-is from the project-config/handler code so it can be reused more easily.
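For reference, a call site would look something like this (the test name and the key/cell values are illustrative; only `create_test_locator` and `is_ready` come from the code above):

```rust
use std::collections::HashMap;

#[tokio::test]
async fn routes_keys_with_test_locator() {
    // Map two hypothetical project keys to the cells defined above.
    let locator = create_test_locator(HashMap::from([
        ("key-a".to_string(), "us1".to_string()),
        ("key-b".to_string(), "us2".to_string()),
    ]))
    .await;
    assert!(locator.is_ready());
}
```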
```rust
fn spawn() -> std::io::Result<Self> {
    let child = Command::new("python")
        .arg("../scripts/mock_relay_api.py")
        .spawn()?;
```
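The guard type that owns the child isn't shown in this hunk. Presumably it kills the process when the test is done; a sketch of that shape, assuming the guard wraps a `std::process::Child` (the name `MockRelayApi` is hypothetical):

```rust
// Hypothetical guard holding the spawned mock server; killing the child on
// drop keeps the Python process from outliving the test.
struct MockRelayApi {
    child: std::process::Child,
}

impl Drop for MockRelayApi {
    fn drop(&mut self) {
        let _ = self.child.kill();
        let _ = self.child.wait(); // reap to avoid a zombie process
    }
}
```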
```rust
    // Drain completed tasks until the overall timeout fires or all tasks finish.
    tokio::select! {
        _ = &mut timeout => break, // deadline reached: return partial results
        join_result = join_set.join_next() => {
            match join_result {
                Some(Ok(result)) => results.push(result),
                Some(Err(e)) => tracing::error!("Task panicked: {}", e),
                None => break, // all spawned tasks have completed
            }
        }
    }
} // closes the surrounding loop
```
```rust
use crate::testutils::get_mock_provider;
use std::sync::Arc;
use crate::project_config::protocol::ProjectConfigsResponse;
use crate::testutils::create_test_locator;
```
Bug: When `execute_parallel` times out, it returns partial results. Downstream logic then fails to add keys from timed-out cells to the `pending_keys` list, causing silent data loss.
Severity: CRITICAL | Confidence: High | Location: `ingest-router/src/lib.rs#L101`
🔍 Detailed Analysis
The `execute_parallel` function can return incomplete results when a timeout occurs. The `tokio::select!` loop breaks on timeout, returning only the results collected so far. Consequently, the `merge_responses` function, which iterates over these results, never processes the cells that timed out. Because these timed-out cells are missing from the response vector, their associated keys are never added to `pending_keys` for a retry attempt. This leads to silent data loss: keys that should be retried are dropped instead.
💡 Suggested Fix
Before breaking from the loop in `execute_parallel` due to a timeout, identify which cells have not yet responded. For each of these missing cells, add an error result to the `results` vector. This ensures the downstream `merge_responses` function is aware of the timeout and can correctly add the associated keys to `pending_keys`.
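One possible shape for that fix, as a self-contained sketch. The `CellOutcome` type and the convention that each spawned task returns the name of the cell it served are assumptions here, not the PR's actual types:

```rust
use std::collections::HashSet;
use std::time::Duration;
use tokio::task::JoinSet;

// Illustrative outcome type: a cell either answered or hit the deadline.
enum CellOutcome {
    Answered { cell: String /* plus the response payload */ },
    TimedOut { cell: String },
}

// On timeout, every cell still outstanding is pushed as an explicit
// TimedOut result, so merge_responses can route its keys into
// pending_keys instead of dropping them silently.
async fn execute_parallel(
    mut join_set: JoinSet<String>, // each task resolves to the cell it served
    cells: HashSet<String>,
    deadline: Duration,
) -> Vec<CellOutcome> {
    let timeout = tokio::time::sleep(deadline);
    tokio::pin!(timeout);

    let mut outstanding = cells;
    let mut results = Vec::new();
    loop {
        tokio::select! {
            _ = &mut timeout => {
                // Surface timed-out cells rather than losing them.
                for cell in outstanding.drain() {
                    results.push(CellOutcome::TimedOut { cell });
                }
                break;
            }
            joined = join_set.join_next() => match joined {
                Some(Ok(cell)) => {
                    outstanding.remove(&cell);
                    results.push(CellOutcome::Answered { cell });
                }
                Some(Err(e)) => eprintln!("task panicked: {e}"),
                None => break, // every cell answered before the deadline
            },
        }
    }
    results
}
```

Tracking outstanding cells this way would also give the executor a natural place to implement the first-vs-subsequent-result timeouts left as TODOs in the PR description.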