diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 5a189ea66f03..6f8249159a3e 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -9,6 +9,7 @@ import pytest import ray +from ray.exceptions import RuntimeEnvSetupError from ray.runtime_env import RuntimeEnv, RuntimeEnvConfig @@ -167,5 +168,33 @@ def run(runtime_env): run(runtime_env) +@pytest.mark.skipif( + sys.platform != "linux", + reason="The process spawning and error code passing behavior is Linux-specific", +) +def test_large_runtime_env_fails_fast(start_cluster_shared): + """ + Tests that a task with a runtime_env that is too large fails quickly + instead of hanging. This is a regression test for GitHub issue #47432. + """ + cluster, address = start_cluster_shared + ray.init(address) + + # Create a runtime_env with a very large environment variable to trigger + # a E2BIG error. + large_env_vars = {"MY_HUGE_VAR": "X" * 4096 * 100} + runtime_env = {"env_vars": large_env_vars} + + @ray.remote + def f(): + # This code should not be reached. + return 1 + + # The E2BIG error from the raylet is propagated to the + # driver, which should raise a RuntimeEnvSetupError. + with pytest.raises(RuntimeEnvSetupError, match="Worker command arguments too long"): + ray.get(f.options(runtime_env=runtime_env).remote()) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 44be7658d3d9..4e0894e909a2 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -606,11 +606,11 @@ bool LocalTaskManager::PoppedWorkerHandler( << "This node has available resources, but no worker processes " "to grant the lease: status " << status; - if (status == PopWorkerStatus::RuntimeEnvCreationFailed) { - // In case of runtime env creation failed, we cancel this task - // directly and raise a `RuntimeEnvSetupError` exception to user - // eventually. The task will be removed from dispatch queue in - // `CancelTask`. + if (status == PopWorkerStatus::RuntimeEnvCreationFailed || + status == PopWorkerStatus::ArgumentListTooLong) { + // In case of runtime env creation or worker startup failure, we cancel this task + // directly and raise an exception to user eventually. The task will be removed + // from the dispatch queue in `CancelTask`. CancelTasks( [task_id](const auto &work) { return task_id == work->task.GetTaskSpecification().TaskId(); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index c8e539f265e0..35d7c1843177 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -210,26 +210,31 @@ void WorkerPool::SetRuntimeEnvAgentClient( runtime_env_agent_client_ = std::move(runtime_env_agent_client); } -void WorkerPool::PopWorkerCallbackAsync(PopWorkerCallback callback, - std::shared_ptr worker, - PopWorkerStatus status) { - // This method shouldn't be invoked when runtime env creation has failed because - // when runtime env is failed to be created, they are all - // invoking the callback immediately. - RAY_CHECK(status != PopWorkerStatus::RuntimeEnvCreationFailed); +void WorkerPool::PopWorkerCallbackAsync( + PopWorkerCallback callback, + std::shared_ptr worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) { // Call back this function asynchronously to make sure executed in different stack. io_service_->post( - [this, callback = std::move(callback), worker = std::move(worker), status]() { - PopWorkerCallbackInternal(callback, worker, status); + [this, + callback = std::move(callback), + worker = std::move(worker), + status, + runtime_env_setup_error_message]() { + PopWorkerCallbackInternal( + callback, worker, status, runtime_env_setup_error_message); }, "WorkerPool.PopWorkerCallback"); } -void WorkerPool::PopWorkerCallbackInternal(const PopWorkerCallback &callback, - std::shared_ptr worker, - PopWorkerStatus status) { +void WorkerPool::PopWorkerCallbackInternal( + const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) { RAY_CHECK(callback); - auto used = callback(worker, status, /*runtime_env_setup_error_message=*/""); + auto used = callback(worker, status, runtime_env_setup_error_message); if (worker && !used) { // The invalid worker not used, restore it to worker pool. PushWorker(worker); @@ -521,8 +526,16 @@ std::tuple WorkerPool::StartWorkerProcess( state); auto start = std::chrono::high_resolution_clock::now(); + std::error_code ec; // Start a process and measure the startup time. - Process proc = StartProcess(worker_command_args, env); + Process proc = StartProcess(worker_command_args, env, ec); + if (ec) { + RAY_CHECK(ec.value() == E2BIG); + RAY_LOG(WARNING) << "E2BIG error occurred when starting worker process. Worker " + "command arguments likely too long."; + *status = PopWorkerStatus::ArgumentListTooLong; + return {Process(), (StartupToken)-1}; + } stats::NumWorkersStarted.Record(1); RAY_LOG(INFO) << "Started worker process with pid " << proc.GetId() << ", the token is " << worker_startup_token_counter_; @@ -634,7 +647,8 @@ void WorkerPool::MonitorPopWorkerRequestForRegistration( } Process WorkerPool::StartProcess(const std::vector &worker_command_args, - const ProcessEnvironment &env) { + const ProcessEnvironment &env, + std::error_code &ec) { if (RAY_LOG_ENABLED(DEBUG)) { std::string debug_info; debug_info.append("Starting worker process with command:"); @@ -658,7 +672,6 @@ Process WorkerPool::StartProcess(const std::vector &worker_command_ } // Launch the process to create the worker. - std::error_code ec; std::vector argv; for (const std::string &arg : worker_command_args) { argv.push_back(arg.c_str()); @@ -667,8 +680,10 @@ Process WorkerPool::StartProcess(const std::vector &worker_command_ Process child(argv.data(), io_service_, ec, /*decouple=*/false, env); if (!child.IsValid() || ec) { - // errorcode 24: Too many files. This is caused by ulimit. - if (ec.value() == 24) { + if (ec.value() == E2BIG) { + // Do nothing here; the error code `ec` will be propagated to the caller. + } else if (ec.value() == 24) { + // errorcode 24: Too many files. This is caused by ulimit. RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting " << "`ulimit -n ` then restart Ray."; } else { @@ -1323,7 +1338,13 @@ void WorkerPool::StartNewWorker( state.pending_start_requests.emplace_back(std::move(pop_worker_request)); } else { DeleteRuntimeEnvIfPossible(serialized_runtime_env); - PopWorkerCallbackAsync(std::move(pop_worker_request->callback), nullptr, status); + // If we failed due to E2BIG, we provide a more specific error message. + const std::string error_msg = (status == PopWorkerStatus::ArgumentListTooLong) + ? "Worker command arguments too long. This can " + "be caused by a large runtime environment." + : ""; + PopWorkerCallbackAsync( + std::move(pop_worker_request->callback), nullptr, status, error_msg); } }; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 3b394e3e4b88..a1913973c577 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -51,24 +51,23 @@ using WorkerCommandMap = enum PopWorkerStatus { // OK. - // A registered worker will be returned with callback. OK = 0, // Job config is not found. - // A nullptr worker will be returned with callback. JobConfigMissing = 1, // Worker process startup rate is limited. - // A nullptr worker will be returned with callback. TooManyStartingWorkerProcesses = 2, // Worker process has been started, but the worker did not register at the raylet within // the timeout. - // A nullptr worker will be returned with callback. WorkerPendingRegistration = 3, // Any fails of runtime env creation. - // A nullptr worker will be returned with callback. RuntimeEnvCreationFailed = 4, // The task's job has finished. - // A nullptr worker will be returned with callback. JobFinished = 5, + // The worker process failed to launch because the OS returned an `E2BIG` + // (Argument list too long) error. This typically occurs when a `runtime_env` + // is so large that its serialized context exceeds the kernel's command-line + // argument size limit. + ArgumentListTooLong = 6, }; /// \param[in] worker The started worker instance. Nullptr if worker is not started. @@ -597,7 +596,8 @@ class WorkerPool : public WorkerPoolInterface { /// the environment variables of the parent process. /// \return An object representing the started worker process. virtual Process StartProcess(const std::vector &worker_command_args, - const ProcessEnvironment &env); + const ProcessEnvironment &env, + std::error_code &ec); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); @@ -605,7 +605,8 @@ class WorkerPool : public WorkerPoolInterface { /// Make this synchronized function for unit test. void PopWorkerCallbackInternal(const PopWorkerCallback &callback, std::shared_ptr worker, - PopWorkerStatus status); + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message); /// Look up worker's dynamic options by startup token. /// TODO(scv119): replace dynamic options by runtime_env. @@ -772,9 +773,11 @@ class WorkerPool : public WorkerPoolInterface { /// Call the `PopWorkerCallback` function asynchronously to make sure executed in /// different stack. - virtual void PopWorkerCallbackAsync(PopWorkerCallback callback, - std::shared_ptr worker, - PopWorkerStatus status); + virtual void PopWorkerCallbackAsync( + PopWorkerCallback callback, + std::shared_ptr worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message = ""); /// We manage all runtime env resources locally by the two methods: /// `GetOrCreateRuntimeEnv` and `DeleteRuntimeEnvIfPossible`. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 4590780c8cd4..76455f6287d1 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -169,14 +169,17 @@ class WorkerPoolMock : public WorkerPool { using WorkerPool::PopWorkerCallbackInternal; // Mock `PopWorkerCallbackAsync` to synchronized function. - void PopWorkerCallbackAsync(PopWorkerCallback callback, - std::shared_ptr worker, - PopWorkerStatus status = PopWorkerStatus::OK) override { - PopWorkerCallbackInternal(callback, worker, status); + void PopWorkerCallbackAsync( + PopWorkerCallback callback, + std::shared_ptr worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message = "") override { + PopWorkerCallbackInternal(callback, worker, status, runtime_env_setup_error_message); } Process StartProcess(const std::vector &worker_command_args, - const ProcessEnvironment &env) override { + const ProcessEnvironment &env, + std::error_code &ec) override { // Use a bogus process ID that won't conflict with those in the system auto pid = static_cast(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size()); last_worker_process_ = Process::FromPid(pid); diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 3412b2d5f902..2fb7792a152c 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -80,6 +80,30 @@ void SetFdCloseOnExec(int fd) { RAY_CHECK_NE(ret, -1) << "fcntl error: errno = " << errno << ", fd = " << fd; RAY_LOG(DEBUG) << "set FD_CLOEXEC to fd " << fd; } + +// A helper function to robustly read a specific number of bytes from a file descriptor. +// This handles partial reads and interruptions by signals. +static inline ssize_t ReadBytesFromFd(int fd, void *buffer, size_t count) { + ssize_t total_bytes_read = 0; + while (total_bytes_read < (ssize_t)count) { + ssize_t bytes_read = read(fd, + reinterpret_cast(buffer) + total_bytes_read, + count - total_bytes_read); + if (bytes_read == 0) { + // EOF reached before all bytes were read. + return total_bytes_read; + } + if (bytes_read == -1) { + if (errno == EINTR) { + continue; // Interrupted by signal, retry. + } else { + return -1; // A real read error occurred. + } + } + total_bytes_read += bytes_read; + } + return total_bytes_read; +} #endif bool EnvironmentVariableLess::operator()(char a, char b) const { @@ -124,7 +148,6 @@ class ProcessFD { const ProcessEnvironment &env, bool pipe_to_stdin) { ec = std::error_code(); - intptr_t fd; pid_t pid; ProcessEnvironment new_env; for (char *const *e = environ; *e; ++e) { @@ -142,6 +165,7 @@ class ProcessFD { } #ifdef _WIN32 + intptr_t fd; (void)decouple; // Windows doesn't require anything particular for decoupling. std::vector args; for (size_t i = 0; argv[i]; ++i) { @@ -189,96 +213,161 @@ class ProcessFD { new_env_ptrs.push_back(static_cast(NULL)); char **envp = &new_env_ptrs[0]; - // TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating - // file descriptors into the child process, as that can be problematic. - int pipefds[2]; // Create pipe to get PID & track lifetime - int parent_lifetime_pipe[2]; - - // Create pipes to health check parent <> child. - // pipefds is used for parent to check child's health. - if (pipe(pipefds) == -1) { - pipefds[0] = pipefds[1] = -1; + intptr_t fd = -1; + // Pipe for getting startup status (PID and potential errno) from the child. + int status_pipe[2]; + if (pipe(status_pipe) == -1) { + ec = std::error_code(errno, std::system_category()); + return ProcessFD(-1, -1); } - // parent_lifetime_pipe is used for child to check parent's health. + + // Pipe for parent lifetime tracking, connected to child's stdin. + int parent_lifetime_pipe[2] = {-1, -1}; if (pipe_to_stdin) { if (pipe(parent_lifetime_pipe) == -1) { - parent_lifetime_pipe[0] = parent_lifetime_pipe[1] = -1; + close(status_pipe[0]); + close(status_pipe[1]); + ec = std::error_code(errno, std::system_category()); + return ProcessFD(-1, -1); } } - pid = pipefds[1] != -1 ? fork() : -1; + pid = fork(); - // If we don't pipe to stdin close pipes that are not needed. - if (pid <= 0 && pipefds[0] != -1) { - close(pipefds[0]); // not the parent, so close the read end of the pipe - pipefds[0] = -1; - } - if (pid != 0 && pipefds[1] != -1) { - close(pipefds[1]); // not the child, so close the write end of the pipe - pipefds[1] = -1; - // make sure the read end of the pipe is closed on exec - SetFdCloseOnExec(pipefds[0]); - } + if (pid == 0) { + // --- Child Process (or Intermediate Process if decoupled) --- + close(status_pipe[0]); // Child only writes to the status pipe. + if (pipe_to_stdin) { + close(parent_lifetime_pipe[1]); // Child only reads from the lifetime pipe. + } - // Create a pipe and redirect the read pipe to a child's stdin. - // Child can use it to detect the parent's lifetime. - // See the below link for details. - // https://stackoverflow.com/questions/12193581/detect-death-of-parent-process - if (pipe_to_stdin) { - if (pid <= 0 && parent_lifetime_pipe[1] != -1) { - // Child. Close sthe write end of the pipe from child. - close(parent_lifetime_pipe[1]); - parent_lifetime_pipe[1] = -1; - SetFdCloseOnExec(parent_lifetime_pipe[0]); + signal(SIGCHLD, SIG_DFL); + + if (decouple) { + if (fork() != 0) { + // --- Intermediate Parent --- + // This process must close ALL inherited pipe FDs before exiting + // to prevent leaking them to the grandchild or holding pipes open. + close(status_pipe[1]); + if (pipe_to_stdin) { + close(parent_lifetime_pipe[0]); + } + _exit(0); + } } - if (pid != 0 && parent_lifetime_pipe[0] != -1) { - // Parent. Close the read end of the pipe. + + // --- Grandchild (if decoupled) or Direct Child (if not) --- + if (pipe_to_stdin) { + if (dup2(parent_lifetime_pipe[0], STDIN_FILENO) == -1) { + _exit(errno); + } + // After dup2, this original FD is no longer needed. close(parent_lifetime_pipe[0]); - parent_lifetime_pipe[0] = -1; - // Make sure the write end of the pipe is closed on exec. - SetFdCloseOnExec(parent_lifetime_pipe[1]); } - } else { - // parent_lifetime_pipe pipes are not used. - parent_lifetime_pipe[0] = -1; - parent_lifetime_pipe[1] = -1; - } - if (pid == 0) { - // Child process case. Reset the SIGCHLD handler. - signal(SIGCHLD, SIG_DFL); - // If process needs to be decoupled, double-fork to avoid zombies. - if (pid_t pid2 = decouple ? fork() : 0) { - _exit(pid2 == -1 ? errno : 0); // Parent of grandchild; must exit + // If execve succeeds, this FD will be closed automatically. + if (!decouple) { + // Only set FD_CLOEXEC in the non-decouple case + SetFdCloseOnExec(status_pipe[1]); } - // Redirect the read pipe to stdin so that child can track the - // parent lifetime. - if (parent_lifetime_pipe[0] != -1) { - dup2(parent_lifetime_pipe[0], STDIN_FILENO); + if (decouple) { + pid_t my_pid = getpid(); + if (write(status_pipe[1], &my_pid, sizeof(my_pid)) != sizeof(my_pid)) { + _exit(errno); + } } - // This is the spawned process. Any intermediate parent is now dead. - pid_t my_pid = getpid(); - if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) { - execvpe( - argv[0], const_cast(argv), const_cast(envp)); + execvpe(argv[0], const_cast(argv), const_cast(envp)); + + // If execvpe returns, an error occurred. Write errno to the pipe. + int err = errno; + (void)!write(status_pipe[1], &err, sizeof(err)); + _exit(err); + + } else if (pid > 0) { + // --- Parent Process --- + close(status_pipe[1]); // Parent only reads from the status pipe. + if (pipe_to_stdin) { + close(parent_lifetime_pipe[0]); // Parent only writes to the lifetime pipe. } - _exit(errno); // fork() succeeded and exec() failed, so abort the child - } - if (pid > 0) { - // Parent process case - if (decouple) { - int s; - (void)waitpid(pid, &s, 0); // can't do much if this fails, so ignore return value - int r = read(pipefds[0], &pid, sizeof(pid)); - (void)r; // can't do much if this fails, so ignore return value + + if (!decouple) { + // Simple case for non-decoupled process + int err_from_child; + ssize_t bytes_read = + ReadBytesFromFd(status_pipe[0], &err_from_child, sizeof(err_from_child)); + if (bytes_read == 0) { + // Success: exec'd, pipe closed by CLOEXEC. + ec = std::error_code(); + } else { + // Failure: got an error from child or pipe broke. + if (bytes_read == sizeof(err_from_child)) { + // We received a full error code from the child. + ec = std::error_code(err_from_child, std::system_category()); + } else { + // The pipe was closed before we could read the full error. + // This can happen if the child crashes. + // If read() returned an error, use that errno. Otherwise, use EPIPE. + ec = std::error_code(bytes_read < 0 ? errno : EPIPE, std::system_category()); + } + while (waitpid(pid, NULL, 0) == -1 && errno == EINTR) { + continue; + } + pid = -1; + } + close(status_pipe[0]); + } else { + while (waitpid(pid, NULL, 0) == -1 && errno == EINTR) { + continue; + } + + // Read the grandchild's PID from the pipe. + ssize_t bytes_read_pid = ReadBytesFromFd(status_pipe[0], &pid, sizeof(pid)); + if (bytes_read_pid != sizeof(pid)) { + // If we can't get the PID, it's a startup failure. + ec = std::error_code(ECHILD, std::system_category()); + pid = -1; + close(status_pipe[0]); + } else { + // We got the PID. Now, do a NON-BLOCKING read to check for an exec error. + int flags = fcntl(status_pipe[0], F_GETFL, 0); + fcntl(status_pipe[0], F_SETFL, flags | O_NONBLOCK); + int exec_errno = 0; + ssize_t bytes_read_errno = + read(status_pipe[0], &exec_errno, sizeof(exec_errno)); + fcntl(status_pipe[0], F_SETFL, flags); // Restore original flags. + + if (bytes_read_errno == sizeof(exec_errno)) { + // We got an error code back. Launch failed. + ec = std::error_code(exec_errno, std::system_category()); + pid = -1; + close(status_pipe[0]); + } else { + // No error code was present. Launch was successful. + // For backward compatibility with tests, we need to keep the pipe + // open but NOT mark it with FD_CLOEXEC, so that child processes + // inherit it and it stays open until all descendants exit. + ec = std::error_code(); + fd = status_pipe[0]; + + // Remove the FD_CLOEXEC flag that was set earlier + flags = fcntl(fd, F_GETFD, 0); + if (flags != -1) { + fcntl(fd, F_SETFD, flags & ~FD_CLOEXEC); + } + } + } } - } - // Use pipe to track process lifetime. (The pipe closes when process terminates.) - fd = pipefds[0]; - if (pid == -1) { + } else { + // --- Fork Failed --- ec = std::error_code(errno, std::system_category()); + close(status_pipe[0]); + close(status_pipe[1]); + if (pipe_to_stdin) { + close(parent_lifetime_pipe[0]); + close(parent_lifetime_pipe[1]); + } } #endif return ProcessFD(pid, fd);