Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[observability][export-api] Env var config to write export events of specific source types #49541

Merged
merged 14 commits into from
Jan 22, 2025
26 changes: 26 additions & 0 deletions python/ray/_private/event/export_event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,29 @@ def get_export_event_logger(
_export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger)

return _export_event_logger[source_name]


def check_export_api_enabled(
source: ExportEvent.SourceType,
) -> bool:
"""
Check RAY_ENABLE_EXPORT_API_WRITE and RAY_ENABLE_EXPORT_API_WRITE_CONFIG environment
variables to verify if export events should be written for the given source type.

Args:
source: The source of the export event.
"""
if ray_constants.RAY_ENABLE_EXPORT_API_WRITE:
return True
source_name = ExportEvent.SourceType.Name(source)
try:
ray_export_api_config_json = json.loads(
ray_constants.RAY_ENABLE_EXPORT_API_WRITE_CONFIG
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to parse this every time, or just once on Ray launch / the first time this config is read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, updated to move the JSON parsing to where the env var is read

except Exception:
global_logger.exception(
"Error parsing JSON for RAY_enable_export_api_write_config. "
f"No export events of source {source_name} will be written."
)
return False
return source_name in ray_export_api_config_json
14 changes: 14 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,22 @@ def gcs_actor_scheduling_enabled():

RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON"

# Write export API event of all resource types to file if enabled.
# RAY_enable_export_api_write_config will not be considered if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have this flag always be considered. It could then be used as a hard shut-off when it is not set or is set to False.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change wouldn't be backward compatible if RAY_enable_export_api_write_config is always considered, right? I was thinking we can eventually deprecate RAY_enable_export_api_write, which should have the same effect.

# this is enabled.
RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False)

# JSON list of the individual export source types to write export API
# events for. Each entry is the name of a value of the
# ExportEvent.SourceType enum defined in
# src/ray/protobuf/export_api/export_event.proto.
# This configuration is only used if RAY_enable_export_api_write is not
# enabled. The value is kept here as the raw, unparsed string; when the
# environment variable is unset it is "[]", i.e. no source type is listed.
# Example config:
# `export RAY_enable_export_api_write_config='["EXPORT_SUBMISSION_JOB"]'`
RAY_ENABLE_EXPORT_API_WRITE_CONFIG = os.environ.get(
"RAY_enable_export_api_write_config", "[]"
)

# Size limit, in bytes, read from the RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES
# environment variable. The value is parsed as an integer rather than through
# env_bool(), which is meant for boolean flags and cannot carry a byte count.
# A missing or non-numeric value falls back to the default of 100 * 10**6.
RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = (
    int(os.environ["RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES"])
    if os.environ.get("RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", "").isdecimal()
    else 100 * 10**6
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import pytest

# RAY_enable_export_api_write env var must be set before importing
# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE
# RAY_enable_export_api_write_config env var must be set before importing
# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE_CONFIG
# even outside a Ray driver.
os.environ["RAY_enable_export_api_write"] = "true"
os.environ["RAY_enable_export_api_write_config"] = '["EXPORT_SUBMISSION_JOB"]'

import ray
from ray._private.gcs_utils import GcsAioClient
Expand All @@ -32,6 +32,44 @@ async def check_job_succeeded(job_manager, job_id):
return status == JobStatus.SUCCEEDED


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "call_ray_start",
    [
        {
            "env": {
                "RAY_enable_export_api_write_config": '["EXPORT_SUBMISSION_JOB"]',
            },
            "cmd": "ray start --head",
        }
    ],
    indirect=True,
)
async def test_check_export_api_enabled(call_ray_start, tmp_path):  # noqa: F811
    """
    Test check_export_api_enabled is True for EXPORT_SUBMISSION_JOB but
    not for EXPORT_ACTOR because RAY_enable_export_api_write_config
    is set to '["EXPORT_SUBMISSION_JOB"]'.
    """

    @ray.remote
    def test_check_export_api_enabled_remote():
        # Imported inside the task so the check runs in the worker process.
        from ray._private.event.export_event_logger import check_export_api_enabled
        from ray.core.generated.export_event_pb2 import ExportEvent

        # Return each result separately so a failure identifies the
        # source type that was wrong.
        return (
            check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB),
            check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR),
        )

    submission_job_enabled, actor_enabled = ray.get(
        test_check_export_api_enabled_remote.remote()
    )
    assert submission_job_enabled
    assert not actor_enabled


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
Expand All @@ -45,6 +83,43 @@ async def check_job_succeeded(job_manager, job_id):
],
indirect=True,
)
async def test_check_export_api_enabled_global(call_ray_start, tmp_path):  # noqa: F811
    """
    Test check_export_api_enabled always returns True because RAY_enable_export_api_write
    is set to True.
    """

    @ray.remote
    def test_check_export_api_enabled_remote():
        # Imported inside the task so the check runs in the worker process.
        from ray._private.event.export_event_logger import check_export_api_enabled
        from ray.core.generated.export_event_pb2 import ExportEvent

        # Return each result separately so a failure identifies the
        # source type that was wrong.
        return (
            check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB),
            check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR),
        )

    submission_job_enabled, actor_enabled = ray.get(
        test_check_export_api_enabled_remote.remote()
    )
    assert submission_job_enabled
    assert actor_enabled


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
[
{
"env": {
"RAY_enable_export_api_write_config": '["EXPORT_SUBMISSION_JOB"]',
},
"cmd": "ray start --head",
}
],
indirect=True,
)
async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811
"""
Test submission job events are correctly generated and written to file
Expand Down
7 changes: 5 additions & 2 deletions python/ray/dashboard/modules/job/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from typing import Any, Dict, Optional, Tuple, Union

from ray._private import ray_constants
from ray._private.event.export_event_logger import get_export_event_logger
from ray._private.event.export_event_logger import (
check_export_api_enabled,
get_export_event_logger,
)
from ray._private.gcs_utils import GcsAioClient
from ray._private.runtime_env.packaging import parse_uri
from ray.core.generated.export_event_pb2 import ExportEvent
Expand Down Expand Up @@ -213,7 +216,7 @@ def __init__(
self._export_submission_job_event_logger: logging.Logger = None
try:
if (
ray_constants.RAY_ENABLE_EXPORT_API_WRITE
check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB)
and export_event_log_dir_root is not None
):
self._export_submission_job_event_logger = get_export_event_logger(
Expand Down
12 changes: 11 additions & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -927,5 +927,15 @@ RAY_CONFIG(int, object_manager_client_connection_num, 4)
// Update this to overwrite it.
RAY_CONFIG(int, object_manager_rpc_threads_num, 0)

// Write export API events to file if enabled
// Write export API event of all resource types to file if enabled.
// RAY_enable_export_api_write_config will not be considered if
// this is enabled.
RAY_CONFIG(bool, enable_export_api_write, false)

// JSON configuration that lists individual resource types to write
// export API events for. This configuration is only used if
// RAY_enable_export_api_write is not enabled. Full list of valid
// resource types in ExportEvent.SourceType enum in
// src/ray/protobuf/export_api/export_event.proto
// Example config: `export RAY_enable_export_api_write_config='["EXPORT_ACTOR"]'`
RAY_CONFIG(std::string, enable_export_api_write_config, "[]")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the config will always be a list, I'm wondering if it can be converted to a vector here instead of using std::string type. In this case, the conversion will only be done once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, updated to use std::vector<std::string> and have the config be a comma separated string instead of JSON

5 changes: 1 addition & 4 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {

Expand Down Expand Up @@ -239,7 +236,7 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
export_event_write_enabled_ = TaskEventBufferImpl::IsExportAPIEnabledTask();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/task_event_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/gcs/pb_util.h"
#include "ray/util/counter_map.h"
#include "ray/util/event.h"
#include "src/ray/protobuf/export_api/export_task_event.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

Expand Down Expand Up @@ -378,6 +380,14 @@ class TaskEventBufferImpl : public TaskEventBuffer {
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send);

/// Whether export events should be written for the EXPORT_TASK source type.
///
/// Forwards the current enable_export_api_write and
/// enable_export_api_write_config settings to IsExportAPIEnabledSourceType.
bool IsExportAPIEnabledTask() const {
  const bool write_all_sources = ::RayConfig::instance().enable_export_api_write();
  const auto &write_config = ::RayConfig::instance().enable_export_api_write_config();
  return IsExportAPIEnabledSourceType("EXPORT_TASK", write_all_sources, write_config);
}

/// Reset the counters during flushing data to GCS.
void ResetCountersForFlush();

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {
rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }

void GcsActor::WriteActorExportEvent() const {
/// Write actor_table_data_ as a export actor event if
/// enable_export_api_write() is enabled.
if (!RayConfig::instance().enable_export_api_write()) {
/// Verify actor export events should be written to file
/// and then write actor_table_data_ as an export event.
if (!GcsActor::IsExportAPIEnabledActor()) {
return;
}
std::shared_ptr<rpc::ExportActorData> export_actor_data_ptr =
Expand Down
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ class GcsActor {
/// Write an event containing this actor's ActorTableData
/// to file for the Export API.
void WriteActorExportEvent() const;
/// Whether export events should be written for the EXPORT_ACTOR source type.
///
/// Forwards the current enable_export_api_write and
/// enable_export_api_write_config settings to IsExportAPIEnabledSourceType.
bool IsExportAPIEnabledActor() const {
  const bool write_all_sources = RayConfig::instance().enable_export_api_write();
  const auto &write_config = RayConfig::instance().enable_export_api_write_config();
  return IsExportAPIEnabledSourceType("EXPORT_ACTOR", write_all_sources, write_config);
}

const ResourceRequest &GetAcquiredResources() const;
void SetAcquiredResources(ResourceRequest &&resource_request);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void GcsJobManager::WriteDriverJobExportEvent(rpc::JobTableData job_data) const
/// Write job_data as an export driver job event if export events are
/// enabled for the EXPORT_DRIVER_JOB source type and if this job is
/// not in the _ray_internal_ namespace.
if (!RayConfig::instance().enable_export_api_write()) {
if (!GcsJobManager::IsExportAPIEnabledDriverJob()) {
return;
}
if (job_data.config().ray_namespace().find(kRayInternalNamespacePrefix) == 0) {
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ class GcsJobManager : public rpc::JobInfoHandler {

void WriteDriverJobExportEvent(rpc::JobTableData job_data) const;

/// Whether export events should be written for the EXPORT_DRIVER_JOB source
/// type.
///
/// Forwards the current enable_export_api_write and
/// enable_export_api_write_config settings to IsExportAPIEnabledSourceType.
bool IsExportAPIEnabledDriverJob() const {
  const bool write_all_sources = RayConfig::instance().enable_export_api_write();
  const auto &write_config = RayConfig::instance().enable_export_api_write_config();
  return IsExportAPIEnabledSourceType(
      "EXPORT_DRIVER_JOB", write_all_sources, write_config);
}

/// Record metrics.
/// For job manager, (1) running jobs count gauge and (2) new finished jobs (whether
/// succeed or fail) will be reported periodically.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher,
void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const {
/// Write node_info as an export node event if export events are
/// enabled for the EXPORT_NODE source type.
if (!RayConfig::instance().enable_export_api_write()) {
if (!GcsNodeManager::IsExportAPIEnabledNode()) {
return;
}
std::shared_ptr<rpc::ExportNodeData> export_node_data_ptr =
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ class GcsNodeManager : public rpc::NodeInfoHandler {

void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const;

/// Whether export events should be written for the EXPORT_NODE source type.
///
/// Forwards the current enable_export_api_write and
/// enable_export_api_write_config settings to IsExportAPIEnabledSourceType.
bool IsExportAPIEnabledNode() const {
  const bool write_all_sources = RayConfig::instance().enable_export_api_write();
  const auto &write_config = RayConfig::instance().enable_export_api_write_config();
  return IsExportAPIEnabledSourceType("EXPORT_NODE", write_all_sources, write_config);
}

rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport(
rpc::GcsNodeInfo::GcsNodeState node_state) const {
switch (node_state) {
Expand Down
23 changes: 23 additions & 0 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,27 @@ void RayEventInit(const std::vector<SourceTypeVariant> source_types,
});
}

bool IsExportAPIEnabledSourceType(std::string source_type,
Copy link
Contributor

@alanwguo alanwguo Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this called once per event or once per source?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be once per source now

bool enable_export_api_write_global,
std::string enable_export_api_write_config_str) {
if (enable_export_api_write_global) {
return true;
}
nlohmann::json enable_export_api_write_config = {};
try {
enable_export_api_write_config =
nlohmann::json::parse(enable_export_api_write_config_str);
} catch (const nlohmann::json::parse_error &e) {
RAY_LOG(ERROR) << "Error parsing JSON for RAY_enable_export_api_write_config: "
<< e.what();
return false;
}
for (const auto &element : enable_export_api_write_config) {
if (element == source_type) {
return true;
}
}
return false;
}

} // namespace ray
4 changes: 4 additions & 0 deletions src/ray/util/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ class RayExportEvent {
ExportEventDataPtr event_data_ptr_;
};

/// Decide whether export events should be written for one source type.
///
/// \param source_type Name of the export source type to look up, e.g.
/// "EXPORT_TASK".
/// \param enable_export_api_write_global If true, the function returns true
/// immediately and the config string is not parsed.
/// \param enable_export_api_write_config_str JSON text listing the enabled
/// source type names, e.g. ["EXPORT_TASK", "EXPORT_ACTOR"].
/// \return true if the global flag is set or if an element of the parsed
/// config equals source_type; false otherwise. If the config string is not
/// valid JSON, an error is logged and false is returned.
bool IsExportAPIEnabledSourceType(std::string source_type,
bool enable_export_api_write_global,
std::string enable_export_api_write_config_str);

/// Ray Event initialization.
///
/// This function should be called when the main thread starts.
Expand Down
16 changes: 16 additions & 0 deletions src/ray/util/tests/event_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,22 @@ TEST_F(EventTest, TestExportEvent) {
EXPECT_EQ(raylet_event_as_json["message"].get<std::string>(), "test warning");
}

// Covers IsExportAPIEnabledSourceType for every combination of the global
// flag and a per-source-type JSON config, plus empty and malformed configs.
TEST_F(EventTest, TestIsExportAPIEnabledSourceType) {
  const std::string task_and_actor = R"(["EXPORT_TASK", "EXPORT_ACTOR"])";
  const std::string actor_only = R"(["EXPORT_ACTOR"])";

  // Source type listed in the config: enabled with or without the global flag.
  EXPECT_TRUE(IsExportAPIEnabledSourceType("EXPORT_TASK", false, task_and_actor));
  EXPECT_TRUE(IsExportAPIEnabledSourceType("EXPORT_TASK", true, task_and_actor));

  // Source type missing from the config: only the global flag enables it.
  EXPECT_FALSE(IsExportAPIEnabledSourceType("EXPORT_TASK", false, actor_only));
  EXPECT_TRUE(IsExportAPIEnabledSourceType("EXPORT_TASK", true, actor_only));

  // Empty list: nothing is enabled unless the global flag is set.
  EXPECT_FALSE(IsExportAPIEnabledSourceType("EXPORT_TASK", false, "[]"));

  // Invalid JSON
  EXPECT_FALSE(IsExportAPIEnabledSourceType("EXPORT_TASK", false, "EXPORT_TASK"));
}

TEST_F(EventTest, TestRayCheckAbort) {
auto custom_fields = absl::flat_hash_map<std::string, std::string>();
custom_fields.emplace("node_id", "node 1");
Expand Down
Loading