Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/pubsub/src/publisher/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tokio::task::JoinSet;

use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
use crate::publisher::worker::BundledMessage;
use std::sync::Arc;
Expand Down Expand Up @@ -70,14 +72,15 @@ impl Batch {
&mut self,
client: GapicPublisher,
topic: String,
) -> tokio::task::JoinHandle<()> {
inflight: &mut JoinSet<()>,
) {
let batch_to_send = Self {
initial_size: self.initial_size,
messages: self.messages.drain(..).collect(),
messages_byte_size: self.messages_byte_size,
};
self.messages_byte_size = self.initial_size;
tokio::spawn(batch_to_send.send(client, topic))
inflight.spawn(batch_to_send.send(client, topic));
}

/// Send the batch to the service and process the results.
Expand Down Expand Up @@ -122,6 +125,7 @@ mod tests {
publisher::batch::Batch,
publisher::worker::BundledMessage,
};
use tokio::task::JoinSet;

mockall::mock! {
#[derive(Debug)]
Expand Down Expand Up @@ -157,8 +161,8 @@ mod tests {
}
});
let client = GapicPublisher::from_stub(mock);
// Also apply pressure on inflight to validate the mock.
batch.flush(client, "topic".to_string());
let mut inflight = JoinSet::new();
batch.flush(client, "topic".to_string(), &mut inflight);
assert_eq!(batch.len(), 0);
}

Expand Down Expand Up @@ -200,7 +204,8 @@ mod tests {
}
});
let client = GapicPublisher::from_stub(mock);
batch.flush(client, "topic".to_string());
let mut inflight = JoinSet::new();
batch.flush(client, "topic".to_string(), &mut inflight);
assert_eq!(batch.size(), "topic".len() as u32);
}

Expand Down
95 changes: 94 additions & 1 deletion src/pubsub/src/publisher/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ mod tests {
generated::gapic_dataplane::client::Publisher as GapicPublisher,
model::{PublishResponse, PubsubMessage},
};
use mockall::Sequence;

mockall::mock! {
#[derive(Debug)]
Expand All @@ -262,6 +263,20 @@ mod tests {
}
}

// Similar to GapicPublisher but returns impl Future instead.
// This is useful for mocking a response with delays/timeouts.
// See https://github.com/asomers/mockall/issues/189 for more
// detail on why this is needed.
// While this can used inplace of GapicPublisher, it makes the
// normal usage without async closure much more cumbersome.
mockall::mock! {
#[derive(Debug)]
GapicPublisherWithFuture {}
impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> impl Future<Output=gax::Result<gax::response::Response<crate::model::PublishResponse>>> + Send;
}
}

#[tokio::test]
async fn test_worker_success() {
let mut mock = MockGapicPublisher::new();
Expand Down Expand Up @@ -662,7 +677,7 @@ mod tests {

#[tokio::test(start_paused = true)]
#[allow(clippy::get_first)]
async fn test_batching_ordering_key() {
async fn test_batching_on_ordering_key() {
// Publish messages with different ordering key and validate that they are in different batches.
let mut mock = MockGapicPublisher::new();
mock.expect_publish().returning({
Expand Down Expand Up @@ -774,6 +789,84 @@ mod tests {
}
}

#[tokio::test(start_paused = true)]
#[allow(clippy::get_first)]
async fn test_ordering_key_only_one_outstanding_batch() {
// Verify that Publisher must only have 1 outstanding batch inflight at a time.
// This is done by validating that the 2 expected publish calls are done in sequence
// with a sleep delay in the first Publish reply.
let mut seq = Sequence::new();
let mut mock = MockGapicPublisherWithFuture::new();
mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.returning({
|r, _| {
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(r.topic, "my-topic");
assert_eq!(r.messages.len(), 1);
let ids = r
.messages
.iter()
.map(|m| String::from_utf8(m.data.to_vec()).unwrap());
Ok(gax::response::Response::from(
PublishResponse::new().set_message_ids(ids),
))
})
}
});

mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.returning({
|r, _| {
Box::pin(async move {
assert_eq!(r.topic, "my-topic");
assert_eq!(r.messages.len(), 1);
let ids = r
.messages
.iter()
.map(|m| String::from_utf8(m.data.to_vec()).unwrap());
Ok(gax::response::Response::from(
PublishResponse::new().set_message_ids(ids),
))
})
}
});

let client = GapicPublisher::from_stub(mock);
// Use a low message count to trigger batch sends.
let publisher = PublisherBuilder::new(client, "my-topic".to_string())
.set_message_count_threshold(1_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();

let messages = [
PubsubMessage::new()
.set_data("hello 1".to_string())
.set_ordering_key("ordering key"),
PubsubMessage::new()
.set_data("hello 2".to_string())
.set_ordering_key("ordering key"),
];

let start = tokio::time::Instant::now();
let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
assert_eq!(msg2_handle.await.expect("expected message id"), "hello 2");
assert_eq!(
start.elapsed(),
Duration::from_millis(10),
"the second batch of messages should have sent after the first which is has been delayed by {:?}",
Duration::from_millis(10)
);
// Also validate the content of the first publish.
assert_eq!(msg1_handle.await.expect("expected message id"), "hello 1");
}

#[tokio::test]
async fn builder() -> anyhow::Result<()> {
let client = Client::builder().build().await?;
Expand Down
Loading