Skip to content

Commit 564f956

Browse files
committed
chore: misc bug fixes, add logs for outbound req
1 parent 6758e7e commit 564f956

File tree

7 files changed

+23
-16
lines changed

7 files changed

+23
-16
lines changed

engine/packages/gasoline/src/ctx/operation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct OperationCtx {
2929
pools: rivet_pools::Pools,
3030
cache: rivet_cache::Cache,
3131
msg_ctx: MessageCtx,
32-
from_workflow: bool,
32+
pub(crate) from_workflow: bool,
3333
}
3434

3535
impl OperationCtx {

engine/packages/gasoline/src/ctx/standalone.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl StandaloneCtx {
8585

8686
#[tracing::instrument(skip_all)]
8787
pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult<Self> {
88-
let mut ctx = StandaloneCtx::new(
88+
let mut new_ctx = StandaloneCtx::new(
8989
ctx.db().clone(),
9090
ctx.config().clone(),
9191
ctx.pools().clone(),
@@ -95,9 +95,9 @@ impl StandaloneCtx {
9595
req_id,
9696
)?;
9797

98-
ctx.from_workflow = ctx.from_workflow;
98+
new_ctx.from_workflow = ctx.from_workflow;
9999

100-
Ok(ctx)
100+
Ok(new_ctx)
101101
}
102102
}
103103

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ impl CustomServeTrait for PegboardGateway {
308308
) => {
309309
tracing::debug!(?close, "server closed websocket");
310310

311-
312311
if open_msg.can_hibernate && close.retry {
313312
// Successful closure
314313
return Err(WebSocketServiceRetry.build());

engine/packages/pegboard-serverless/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,16 @@ async fn outbound_handler(
387387

388388
return Ok(());
389389
}
390+
Err(sse::Error::InvalidStatusCode(code, res)) => {
391+
let body = res
392+
.text()
393+
.await
394+
.unwrap_or_else(|_| "<could not read body>".to_string());
395+
bail!(
396+
"invalid status code ({code}):\n{}",
397+
util::safe_slice(&body, 0, 512)
398+
);
399+
}
390400
Err(err) => return Err(err.into()),
391401
}
392402
}

engine/packages/pegboard/src/workflows/actor/actor_keys.rs renamed to engine/packages/pegboard/src/workflows/actor/keys.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,21 +241,20 @@ pub async fn reserve_actor_key(
241241
input.name.clone(),
242242
input.key.clone(),
243243
));
244-
let (start, end) = actor_key_subspace.range();
245244

246245
let mut stream = tx.get_ranges_keyvalues(
247246
universaldb::RangeOption {
248247
mode: StreamingMode::Iterator,
249-
..(start, end).into()
248+
..(&actor_key_subspace).into()
250249
},
251250
Serializable,
252251
);
253252

254253
while let Some(entry) = stream.try_next().await? {
255-
let (_idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
254+
let (idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
256255
if !data.is_destroyed {
257256
return Ok(ReserveActorKeyOutput::ExistingActor {
258-
existing_actor_id: _idx_key.actor_id,
257+
existing_actor_id: idx_key.actor_id,
259258
});
260259
}
261260
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use rivet_types::actors::CrashPolicy;
55

66
use crate::{errors, workflows::runner::AllocatePendingActorsInput};
77

8-
mod actor_keys;
98
mod destroy;
9+
mod keys;
1010
mod runtime;
1111
mod setup;
1212

@@ -147,7 +147,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
147147
.await?;
148148

149149
if let Some(key) = &input.key {
150-
match actor_keys::reserve_key(
150+
match keys::reserve_key(
151151
ctx,
152152
input.namespace_id,
153153
input.name.clone(),
@@ -156,8 +156,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
156156
)
157157
.await?
158158
{
159-
actor_keys::ReserveKeyOutput::Success => {}
160-
actor_keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
159+
keys::ReserveKeyOutput::Success => {}
160+
keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
161161
ctx.msg(Failed {
162162
error: errors::Actor::KeyReservedInDifferentDatacenter {
163163
datacenter_label: dc_label,
@@ -181,7 +181,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
181181

182182
return Ok(());
183183
}
184-
actor_keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
184+
keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
185185
ctx.msg(Failed {
186186
error: errors::Actor::DuplicateKey {
187187
key: key.clone(),
@@ -696,6 +696,7 @@ pub struct Lost {
696696
/// Immediately reschedules the actor regardless of its crash policy.
697697
pub force_reschedule: bool,
698698
/// Resets the rescheduling retry count to 0.
699+
#[serde(default)]
699700
pub reset_rescheduling: bool,
700701
}
701702

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,8 +1022,6 @@ pub(crate) async fn allocate_pending_actors(
10221022
let mut stream = tx.get_ranges_keyvalues(
10231023
universaldb::RangeOption {
10241024
mode: StreamingMode::Iterator,
1025-
// Containers bin pack so we reverse the order
1026-
reverse: true,
10271025
..(&runner_alloc_subspace).into()
10281026
},
10291027
// NOTE: This is not Serializable because we don't want to conflict with all of the

0 commit comments

Comments
 (0)