diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 117d0a506615f..f350b610ca683 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -253,6 +253,8 @@ void WorkerPool::RemoveWorkerProcess(State &state, state.worker_processes.erase(proc_startup_token); } +// Intuition is, job id only decides which parameters to use for the executable, reduce a +// few of them. std::pair, ProcessEnvironment> WorkerPool::BuildProcessCommandArgs(const Language &language, rpc::JobConfig *job_config, @@ -357,6 +359,10 @@ WorkerPool::BuildProcessCommandArgs(const Language &language, worker_command_args.push_back("--worker-launch-time-ms=" + std::to_string(current_sys_time_ms())); worker_command_args.push_back("--node-id=" + node_id_.Hex()); + if (!is_prefetch) { + worker_command_args.push_back("--runtime-env-hash=" + + std::to_string(runtime_env_hash)); + } worker_command_args.push_back("--runtime-env-hash=" + std::to_string(runtime_env_hash)); } else if (language == Language::CPP) { @@ -368,8 +374,10 @@ WorkerPool::BuildProcessCommandArgs(const Language &language, if (serialized_runtime_env_context != "{}" && !serialized_runtime_env_context.empty()) { worker_command_args.push_back("--language=" + Language_Name(language)); - worker_command_args.push_back("--serialized-runtime-env-context=" + - serialized_runtime_env_context); + if (!is_prefetch) { + worker_command_args.push_back("--serialized-runtime-env-context=" + + serialized_runtime_env_context); + } } else if (language == Language::PYTHON && worker_command_args.size() >= 2 && worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) { // Check that the arg really is the path to the setup worker before erasing it, to @@ -384,6 +392,8 @@ WorkerPool::BuildProcessCommandArgs(const Language &language, worker_command_args.push_back("--ray-debugger-external"); } + is_prefetch = false; + ProcessEnvironment env; if (!IsIOWorkerType(worker_type)) { // We pass the job ID to worker processes via an environment variable, so we don't @@ -1482,6 +1492,8 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, } } +thread_local bool is_prefetch = false; + void WorkerPool::PrestartWorkersInternal(const TaskSpecification &task_spec, int64_t num_needed) { RAY_LOG(DEBUG) << "PrestartWorkers " << num_needed; @@ -1507,6 +1519,7 @@ void WorkerPool::PrestartWorkersInternal(const TaskSpecification &task_spec, << setup_error_message; return; } + is_prefetch = true; PopWorkerStatus status; StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,