Merge branch 'master' into support_s3_presigned_url
yantzu authored Nov 6, 2024
2 parents c5f898d + 3df5c1b commit a5c07ac
Showing 7 changed files with 90 additions and 132 deletions.
2 changes: 2 additions & 0 deletions doc/source/cluster/kubernetes/user-guides.md
@@ -25,6 +25,7 @@ user-guides/helm-chart-rbac
user-guides/tls
user-guides/k8s-autoscaler
user-guides/static-ray-cluster-without-kuberay
user-guides/kubectl-plugin
```


@@ -53,3 +54,4 @@ at the {ref}`introductory guide <kuberay-quickstart>` first.
* {ref}`kuberay-gke-bucket`
* {ref}`ray-k8s-autoscaler-comparison`
* {ref}`deploy-a-static-ray-cluster-without-kuberay`
* {ref}`kubectl-plugin`
68 changes: 68 additions & 0 deletions doc/source/cluster/kubernetes/user-guides/kubectl-plugin.md
@@ -0,0 +1,68 @@
(kubectl-plugin)=

# Use kubectl Plugin (alpha)

Starting from KubeRay v1.2.2, you can use the `kubectl ray` plugin to simplify common workflows when deploying Ray on Kubernetes. Even if you aren't familiar with Kubernetes, the plugin makes it easier to run Ray on a Kubernetes cluster.

## Installation

See [KubeRay kubectl Plugin](https://github.com/ray-project/kuberay/tree/master/kubectl-plugin) to install the plugin.

Install the KubeRay kubectl plugin using one of the following methods:

- Install using the Krew kubectl plugin manager (recommended)
- Download from GitHub releases

### Install using the Krew kubectl plugin manager (recommended)

1. Install [Krew](https://krew.sigs.k8s.io/docs/user-guide/setup/install/).
2. Download the plugin list by running `kubectl krew update`.
3. Install the plugin by running `kubectl krew install ray`.
4. Run `kubectl ray --help` to verify the installation.
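
In one place, the steps above look like this (a sketch assuming Krew is already installed and on your `PATH`):

```bash
# Refresh the Krew plugin index, install the Ray plugin, and confirm it's available.
kubectl krew update
kubectl krew install ray
kubectl ray --help
```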

### Download from GitHub releases

Go to the [releases page](https://github.com/ray-project/kuberay/releases) and download the binary for your platform.

For example, to install version 1.2.2 of the plugin on Linux amd64:

```bash
curl -LO https://github.com/ray-project/kuberay/releases/download/v1.2.2/kubectl-ray_v1.2.2_linux_amd64.tar.gz
tar -xvf kubectl-ray_v1.2.2_linux_amd64.tar.gz
cp kubectl-ray ~/.local/bin
```

Replace `~/.local/bin` with a directory that's in your `PATH`.
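
If that directory isn't already on your `PATH`, a minimal sketch for a bash-like shell (adjust the path and shell profile to your setup):

```bash
# Hypothetical example: make ~/.local/bin discoverable, then verify the plugin is picked up.
export PATH="$HOME/.local/bin:$PATH"
kubectl ray --help
```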

## Usage

After installing the plugin, you can use `kubectl ray --help` to see the available commands and options.

## Example

This example assumes you have a Ray cluster running on Kubernetes. If you don't, see {ref}`RayCluster Quickstart <kuberay-raycluster-quickstart>` to set one up.
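
Before continuing, you can confirm the cluster is up by listing the RayCluster resources and their pods (a sketch assuming the KubeRay CRDs are installed; the cluster name `ray-cluster-kuberay` matches the example below):

```shell
$ kubectl get rayclusters
$ kubectl get pods
```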

### Forward local ports to Ray cluster

```shell
$ kubectl ray session ray-cluster-kuberay

Forwarding ports to service ray-cluster-kuberay-head-svc
Ray Dashboard: http://localhost:8265
Ray Interactive Client: http://localhost:10001

Forwarding from 127.0.0.1:8265 -> 8265
Forwarding from [::1]:8265 -> 8265
Forwarding from 127.0.0.1:10001 -> 10001
Forwarding from [::1]:10001 -> 10001
```
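
With these ports forwarded, you can reach the cluster from another terminal. For example, here's a sketch of submitting a job through the forwarded dashboard port with the Ray Job CLI, where `my_script.py` is a hypothetical local script:

```shell
$ ray job submit --address http://127.0.0.1:8265 --working-dir . -- python my_script.py
```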

### Get Ray cluster logs

```shell
$ kubectl ray logs rayjob-sample-raycluster-kfhl6
No output directory specified, creating dir under current directory using cluster name.
Command set to retrieve both head and worker node logs.
Downloading log for Ray Node rayjob-sample-raycluster-kfhl6-head-87xpb
Downloading log for Ray Node rayjob-sample-raycluster-kfhl6-small-group-worker-54qfm
```
9 changes: 0 additions & 9 deletions python/ray/data/tests/test_parquet.py
@@ -9,11 +9,9 @@
import pyarrow.dataset as pds
import pyarrow.parquet as pq
import pytest
from pkg_resources import parse_version
from pytest_lazyfixture import lazy_fixture

import ray
from ray._private.utils import _get_pyarrow_version
from ray.air.util.tensor_extensions.arrow import ArrowTensorType, ArrowTensorTypeV2
from ray.data import Schema
from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource
@@ -571,13 +569,6 @@ def test_parquet_read_partitioned_with_columns(ray_start_regular_shared, fs, dat
]


# Skip this test if pyarrow is below version 7. As the old
# pyarrow does not support single path with partitioning,
# this issue cannot be resolved by Ray data itself.
@pytest.mark.skipif(
parse_version(_get_pyarrow_version()) < parse_version("7.0.0"),
reason="Old pyarrow behavior cannot be fixed.",
)
@pytest.mark.parametrize(
"fs,data_path",
[
1 change: 1 addition & 0 deletions python/ray/tests/kuberay/README.md
@@ -24,6 +24,7 @@ together run these things for you.
3. Finally, make sure that the `Dockerfile` is using the same python version as
what you're using to run the test. By default, this dockerfile is built using
the `rayproject/ray:nightly-py310` build.
4. Modify `EXAMPLE_CLUSTER_PATH` in `test_autoscaling_e2e.py`.

Now you're ready to run the test.

9 changes: 0 additions & 9 deletions python/ray/tests/kuberay/scripts/scale_up_custom.py
@@ -1,9 +1,4 @@
import os

import pytest

import ray

import time


@@ -13,10 +8,6 @@ def main():
Also, validates runtime env data submitted with the Ray Job that executes
this script.
"""
# The next two lines validate the runtime env in which this code runs.
# (See the function ray_job_submit() in tests/kuberay/utils.py)
assert pytest.__version__ == "6.0.0"
assert os.getenv("key_foo") == "value_bar"

# Workers and head are annotated as having 5 "Custom2" capacity each,
# so this should trigger upscaling of two workers.
48 changes: 19 additions & 29 deletions python/ray/tests/kuberay/test_autoscaling_e2e.py
@@ -16,8 +16,6 @@
get_pod,
get_pod_names,
get_raycluster,
ray_client_port_forward,
ray_job_submit,
switch_to_ray_parent_dir,
kubectl_exec_python_script,
kubectl_logs,
@@ -27,12 +25,6 @@
wait_for_ray_health,
)

from ray.tests.kuberay.scripts import (
gpu_actor_placement,
gpu_actor_validation,
non_terminated_nodes_count,
)

logger = logging.getLogger(__name__)

# This image will be used for both the Ray nodes and the autoscaler.
@@ -190,10 +182,6 @@ def _apply_ray_cr(
stderr=sys.stderr,
)

def _non_terminated_nodes_count(self) -> int:
with ray_client_port_forward(head_service=HEAD_SERVICE):
return non_terminated_nodes_count.main()

def testAutoscaling(self):
"""Test the following behaviors:
@@ -206,11 +194,6 @@ def testAutoscaling(self):
7. Autoscaler logs work. Autoscaler events are piped to the driver.
8. Ray utils show correct resource limits in the head container.
Tests the following modes of interaction with a Ray cluster on K8s:
1. kubectl exec
2. Ray Client
3. Ray Job Submission
TODO (Dmitri): Split up the test logic.
Too much is stuffed into this one test case.
@@ -303,11 +286,13 @@
)
# 2. Trigger GPU upscaling by requesting placement of a GPU actor.
logger.info("Scheduling an Actor with GPU demands.")
# Use Ray Client to validate that it works against KubeRay.
with ray_client_port_forward( # Interaction mode #2: Ray Client
head_service=HEAD_SERVICE, ray_namespace="gpu-test"
):
gpu_actor_placement.main()
kubectl_exec_python_script(
script_name="gpu_actor_placement.py",
pod=head_pod,
container="ray-head",
namespace="default",
)

# 3. Confirm new pod number and presence of fake GPU worker.
logger.info("Confirming fake GPU worker up-scaling.")
wait_for_pods(goal_num_pods=4, namespace=RAY_CLUSTER_NAMESPACE)
@@ -321,10 +306,13 @@
# 4. Confirm that the GPU actor is up and that Ray believes
# the node the actor is on has a GPU.
logger.info("Confirming GPU actor placement.")
with ray_client_port_forward(
head_service=HEAD_SERVICE, ray_namespace="gpu-test"
):
out = gpu_actor_validation.main()
out = kubectl_exec_python_script(
script_name="gpu_actor_validation.py",
pod=head_pod,
container="ray-head",
namespace="default",
)

# Confirms the actor was placed on a GPU-annotated node.
# (See gpu_actor_validation.py for details.)
assert "on-a-gpu-node" in out
@@ -358,11 +346,13 @@
# Submit two {"Custom2": 3} bundles to upscale two workers with 5
# Custom2 capacity each.
logger.info("Scaling up workers with request for custom resources.")
job_logs = ray_job_submit( # Interaction mode #3: Ray Job Submission
out = kubectl_exec_python_script(
script_name="scale_up_custom.py",
head_service=HEAD_SERVICE,
pod=head_pod,
container="ray-head",
namespace="default",
)
assert "Submitted custom scale request!" in job_logs, job_logs
assert "Submitted custom scale request!" in out, out

logger.info("Confirming two workers have scaled up.")
wait_for_pods(goal_num_pods=3, namespace=RAY_CLUSTER_NAMESPACE)
85 changes: 0 additions & 85 deletions python/ray/tests/kuberay/utils.py
@@ -12,9 +12,6 @@
import yaml
import os

import ray
from ray.job_submission import JobStatus, JobSubmissionClient


logger = logging.getLogger(__name__)

@@ -428,88 +425,6 @@ def terminate_process():
terminate_process()


@contextlib.contextmanager
def ray_client_port_forward(
head_service: str,
k8s_namespace: str = "default",
ray_namespace: Optional[str] = None,
ray_client_port: int = 10001,
):
"""Context manager which manages a Ray client connection using kubectl port-forward.
Args:
head_service: The name of the Ray head K8s service.
k8s_namespace: K8s namespace the Ray cluster belongs to.
ray_namespace: The Ray namespace to connect to.
ray_client_port: The port on which the Ray head is running the Ray client
server.
"""
with _kubectl_port_forward(
service=head_service, namespace=k8s_namespace, target_port=ray_client_port
) as local_port:
with ray.init(f"ray://127.0.0.1:{local_port}", namespace=ray_namespace):
yield


def ray_job_submit(
script_name: str,
head_service: str,
k8s_namespace: str = "default",
ray_dashboard_port: int = 8265,
) -> str:
"""Submits a Python script via the Ray Job Submission API, using the Python SDK.
Waits for successful completion of the job and returns the job logs as a string.
Uses `kubectl port-forward` to access the Ray head's dashboard port.
Scripts live in `tests/kuberay/scripts`. This directory is used as the working
dir for the job.
Args:
script_name: The name of the script to submit.
head_service: The name of the Ray head K8s service.
k8s_namespace: K8s namespace the Ray cluster belongs to.
ray_dashboard_port: The port on which the Ray head is running the Ray dashboard.
"""
with _kubectl_port_forward(
service=head_service, namespace=k8s_namespace, target_port=ray_dashboard_port
) as local_port:
# It takes a bit of time to establish the connection.
# Try a few times to instantiate the JobSubmissionClient, as the client's
# instantiation does not retry on connection errors.
for trie in range(1, 7):
time.sleep(5)
try:
client = JobSubmissionClient(f"http://127.0.0.1:{local_port}")
except ConnectionError as e:
if trie < 6:
logger.info("Job client connection failed. Retrying in 5 seconds.")
else:
raise e from None
job_id = client.submit_job(
entrypoint=f"python {script_name}",
runtime_env={
"working_dir": SCRIPTS_DIR,
# Throw in some extra data for fun, to validate runtime envs.
"pip": ["pytest==6.0.0"],
"env_vars": {"key_foo": "value_bar"},
},
)
# Wait for the job to complete successfully.
# This logic is copied from the Job Submission docs.
start = time.time()
timeout = 60
while time.time() - start <= timeout:
status = client.get_job_status(job_id)
print(f"status: {status}")
if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
break
time.sleep(5)

assert status == JobStatus.SUCCEEDED
return client.get_job_logs(job_id)


def kubectl_patch(
kind: str,
name: str,