Skip to content

[core][bug] Fix infinite loop on E2BIG error when spawning a process #54838

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 47 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e2a335f
Fix E2BIG problem
mjacar Jul 22, 2025
dbd6813
Update src/ray/util/process.cc
mjacar Jul 22, 2025
c329b6f
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
e26f6fe
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
41a23f1
Remove debugging line
mjacar Jul 22, 2025
7c986cf
Add more logging for potential zombie processes
mjacar Jul 22, 2025
221abf0
[core] combine python protobuf compiles rules (#54810)
aslonnie Jul 22, 2025
15dfd9c
[core] Skip `test_object_assign_owner` in client mode (#54826)
edoakes Jul 22, 2025
a08d4ad
[Data] schema warning change (#54630)
iamjustinhsu Jul 22, 2025
4989f33
Fix fragments usage in ray.data.read_lance (#54707)
dshepelev15 Jul 22, 2025
73ec845
[serve] Avoid errant cancelation of LongPollClient method (#54832)
edoakes Jul 22, 2025
1474511
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
680192b
Update src/ray/raylet/worker_pool.h
mjacar Jul 22, 2025
34b6a70
Update src/ray/raylet/worker_pool.h
mjacar Jul 22, 2025
ec669af
Update src/ray/raylet/worker_pool.h
mjacar Jul 22, 2025
c088f34
Try hybrid approach
mjacar Jul 22, 2025
5aa46e8
Add dup2 call
mjacar Jul 22, 2025
ec7c609
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
f78b8ad
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
2c5c0c2
Code review
mjacar Jul 22, 2025
2864ac3
Code review
mjacar Jul 22, 2025
3f0e17f
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
57f64d5
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
79bc480
Update src/ray/raylet/worker_pool.cc
mjacar Jul 22, 2025
cbe635f
get rid of extra brace
mjacar Jul 22, 2025
f363ad0
Update src/ray/util/process.cc
mjacar Jul 22, 2025
87e8070
More robust reads
mjacar Jul 22, 2025
e01f3ec
Linter
mjacar Jul 22, 2025
587a56b
[serve.llm] Skip batching logic if `batch_interval_ms` == 0 (#54751)
lk-chen Jul 22, 2025
d85e0ad
[dashboard] fix typos (#54550)
yantarou Jul 22, 2025
b6fc13c
Potential fixes
mjacar Jul 23, 2025
dc4c614
[core][autoscaler] add release tests on RAY_UP_enable_autoscaler_v2=1…
rueian Jul 22, 2025
1053dfc
Potential fix
mjacar Jul 23, 2025
e424d43
Potential fix
mjacar Jul 23, 2025
d09aed0
More robust
mjacar Jul 23, 2025
76d8c00
Merge branch 'master' into fix-e2big-problem
mjacar Jul 23, 2025
a1530f5
Merge branch 'master' into fix-e2big-problem
mjacar Jul 23, 2025
d786a88
Update src/ray/raylet/local_task_manager.cc
mjacar Jul 23, 2025
131b55a
Code review
mjacar Jul 23, 2025
d038bea
Comments
mjacar Jul 24, 2025
9652d44
Merge branch 'master' into fix-e2big-problem
mjacar Jul 24, 2025
f25fc3e
Code review
mjacar Jul 24, 2025
ef3dc94
Annotate test
mjacar Jul 24, 2025
e347cdc
Potental test fix
mjacar Jul 25, 2025
18b7faf
Merge branch 'master' into fix-e2big-problem
mjacar Jul 25, 2025
b5e413d
Cleanup
mjacar Jul 25, 2025
3a71943
Merge branch 'master' into fix-e2big-problem
mjacar Jul 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions python/ray/tests/test_runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest

import ray
from ray.exceptions import RuntimeEnvSetupError
from ray.runtime_env import RuntimeEnv, RuntimeEnvConfig


Expand Down Expand Up @@ -167,5 +168,33 @@ def run(runtime_env):
run(runtime_env)


@pytest.mark.skipif(
sys.platform != "linux",
reason="The process spawning and error code passing behavior is Linux-specific",
)
def test_large_runtime_env_fails_fast(start_cluster_shared):
"""
Tests that a task with a runtime_env that is too large fails quickly
instead of hanging. This is a regression test for GitHub issue #47432.
"""
cluster, address = start_cluster_shared
ray.init(address)

# Create a runtime_env with a very large environment variable to trigger
# a E2BIG error.
large_env_vars = {"MY_HUGE_VAR": "X" * 4096 * 100}
runtime_env = {"env_vars": large_env_vars}

@ray.remote
def f():
# This code should not be reached.
return 1

# The E2BIG error from the raylet is propagated to the
# driver, which should raise a RuntimeEnvSetupError.
with pytest.raises(RuntimeEnvSetupError, match="Worker command arguments too long"):
ray.get(f.options(runtime_env=runtime_env).remote())


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
10 changes: 5 additions & 5 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,11 @@ bool LocalTaskManager::PoppedWorkerHandler(
<< "This node has available resources, but no worker processes "
"to grant the lease: status "
<< status;
if (status == PopWorkerStatus::RuntimeEnvCreationFailed) {
// In case of runtime env creation failed, we cancel this task
// directly and raise a `RuntimeEnvSetupError` exception to user
// eventually. The task will be removed from dispatch queue in
// `CancelTask`.
if (status == PopWorkerStatus::RuntimeEnvCreationFailed ||
status == PopWorkerStatus::ArgumentListTooLong) {
// In case of runtime env creation or worker startup failure, we cancel this task
// directly and raise an exception to user eventually. The task will be removed
// from the dispatch queue in `CancelTask`.
CancelTasks(
[task_id](const auto &work) {
return task_id == work->task.GetTaskSpecification().TaskId();
Expand Down
59 changes: 40 additions & 19 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,26 +210,31 @@ void WorkerPool::SetRuntimeEnvAgentClient(
runtime_env_agent_client_ = std::move(runtime_env_agent_client);
}

void WorkerPool::PopWorkerCallbackAsync(PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) {
// This method shouldn't be invoked when runtime env creation has failed because
// when runtime env is failed to be created, they are all
// invoking the callback immediately.
RAY_CHECK(status != PopWorkerStatus::RuntimeEnvCreationFailed);
void WorkerPool::PopWorkerCallbackAsync(
PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) {
// Call back this function asynchronously to make sure executed in different stack.
io_service_->post(
[this, callback = std::move(callback), worker = std::move(worker), status]() {
PopWorkerCallbackInternal(callback, worker, status);
[this,
callback = std::move(callback),
worker = std::move(worker),
status,
runtime_env_setup_error_message]() {
PopWorkerCallbackInternal(
callback, worker, status, runtime_env_setup_error_message);
},
"WorkerPool.PopWorkerCallback");
}

void WorkerPool::PopWorkerCallbackInternal(const PopWorkerCallback &callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) {
void WorkerPool::PopWorkerCallbackInternal(
const PopWorkerCallback &callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) {
RAY_CHECK(callback);
auto used = callback(worker, status, /*runtime_env_setup_error_message=*/"");
auto used = callback(worker, status, runtime_env_setup_error_message);
if (worker && !used) {
// The invalid worker not used, restore it to worker pool.
PushWorker(worker);
Expand Down Expand Up @@ -521,8 +526,16 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
state);

auto start = std::chrono::high_resolution_clock::now();
std::error_code ec;
// Start a process and measure the startup time.
Process proc = StartProcess(worker_command_args, env);
Process proc = StartProcess(worker_command_args, env, ec);
if (ec) {
RAY_CHECK(ec.value() == E2BIG);
RAY_LOG(WARNING) << "E2BIG error occurred when starting worker process. Worker "
"command arguments likely too long.";
*status = PopWorkerStatus::ArgumentListTooLong;
return {Process(), (StartupToken)-1};
}
stats::NumWorkersStarted.Record(1);
RAY_LOG(INFO) << "Started worker process with pid " << proc.GetId() << ", the token is "
<< worker_startup_token_counter_;
Expand Down Expand Up @@ -634,7 +647,8 @@ void WorkerPool::MonitorPopWorkerRequestForRegistration(
}

Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env) {
const ProcessEnvironment &env,
std::error_code &ec) {
if (RAY_LOG_ENABLED(DEBUG)) {
std::string debug_info;
debug_info.append("Starting worker process with command:");
Expand All @@ -658,7 +672,6 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
}

// Launch the process to create the worker.
std::error_code ec;
std::vector<const char *> argv;
for (const std::string &arg : worker_command_args) {
argv.push_back(arg.c_str());
Expand All @@ -667,8 +680,10 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_

Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
if (!child.IsValid() || ec) {
// errorcode 24: Too many files. This is caused by ulimit.
if (ec.value() == 24) {
if (ec.value() == E2BIG) {
// Do nothing here; the error code `ec` will be propagated to the caller.
} else if (ec.value() == 24) {
// errorcode 24: Too many files. This is caused by ulimit.
RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting "
<< "`ulimit -n <num_files>` then restart Ray.";
} else {
Expand Down Expand Up @@ -1323,7 +1338,13 @@ void WorkerPool::StartNewWorker(
state.pending_start_requests.emplace_back(std::move(pop_worker_request));
} else {
DeleteRuntimeEnvIfPossible(serialized_runtime_env);
PopWorkerCallbackAsync(std::move(pop_worker_request->callback), nullptr, status);
// If we failed due to E2BIG, we provide a more specific error message.
const std::string error_msg = (status == PopWorkerStatus::ArgumentListTooLong)
? "Worker command arguments too long. This can "
"be caused by a large runtime environment."
: "";
PopWorkerCallbackAsync(
std::move(pop_worker_request->callback), nullptr, status, error_msg);
}
};

Expand Down
25 changes: 14 additions & 11 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,23 @@ using WorkerCommandMap =

enum PopWorkerStatus {
// OK.
// A registered worker will be returned with callback.
OK = 0,
// Job config is not found.
// A nullptr worker will be returned with callback.
JobConfigMissing = 1,
// Worker process startup rate is limited.
// A nullptr worker will be returned with callback.
TooManyStartingWorkerProcesses = 2,
// Worker process has been started, but the worker did not register at the raylet within
// the timeout.
// A nullptr worker will be returned with callback.
WorkerPendingRegistration = 3,
// Any fails of runtime env creation.
// A nullptr worker will be returned with callback.
RuntimeEnvCreationFailed = 4,
// The task's job has finished.
// A nullptr worker will be returned with callback.
JobFinished = 5,
// The worker process failed to launch because the OS returned an `E2BIG`
// (Argument list too long) error. This typically occurs when a `runtime_env`
// is so large that its serialized context exceeds the kernel's command-line
// argument size limit.
ArgumentListTooLong = 6,
};

/// \param[in] worker The started worker instance. Nullptr if worker is not started.
Expand Down Expand Up @@ -597,15 +596,17 @@ class WorkerPool : public WorkerPoolInterface {
/// the environment variables of the parent process.
/// \return An object representing the started worker process.
virtual Process StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env);
const ProcessEnvironment &env,
std::error_code &ec);

/// Push an warning message to user if worker pool is getting to big.
virtual void WarnAboutSize();

/// Make this synchronized function for unit test.
void PopWorkerCallbackInternal(const PopWorkerCallback &callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status);
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message);

/// Look up worker's dynamic options by startup token.
/// TODO(scv119): replace dynamic options by runtime_env.
Expand Down Expand Up @@ -772,9 +773,11 @@ class WorkerPool : public WorkerPoolInterface {

/// Call the `PopWorkerCallback` function asynchronously to make sure executed in
/// different stack.
virtual void PopWorkerCallbackAsync(PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status);
virtual void PopWorkerCallbackAsync(
PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message = "");

/// We manage all runtime env resources locally by the two methods:
/// `GetOrCreateRuntimeEnv` and `DeleteRuntimeEnvIfPossible`.
Expand Down
13 changes: 8 additions & 5 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,17 @@ class WorkerPoolMock : public WorkerPool {
using WorkerPool::PopWorkerCallbackInternal;

// Mock `PopWorkerCallbackAsync` to synchronized function.
void PopWorkerCallbackAsync(PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status = PopWorkerStatus::OK) override {
PopWorkerCallbackInternal(callback, worker, status);
void PopWorkerCallbackAsync(
PopWorkerCallback callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message = "") override {
PopWorkerCallbackInternal(callback, worker, status, runtime_env_setup_error_message);
}

Process StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env) override {
const ProcessEnvironment &env,
std::error_code &ec) override {
// Use a bogus process ID that won't conflict with those in the system
auto pid = static_cast<pid_t>(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size());
last_worker_process_ = Process::FromPid(pid);
Expand Down
Loading