Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Use boost iostream in pipe logger for cross-platform #50044

Merged
merged 31 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9f7ded8
fix test comment
dentiny Jan 23, 2025
8bdd94a
use boost iostream for cross-platform
dentiny Jan 23, 2025
ad89e4e
remove unnecessary cross-platform
dentiny Jan 24, 2025
dfc4ddc
fix windows build
dentiny Jan 24, 2025
df8c7ce
rename
dentiny Jan 24, 2025
94db87c
another rename
dentiny Jan 24, 2025
4fb9002
doc
dentiny Jan 24, 2025
025bad1
fix naming
dentiny Jan 25, 2025
5d74185
fix data flush
dentiny Jan 25, 2025
e6c6ce1
Merge branch 'master' into hjiang/boost-windows-jan-23
dentiny Jan 25, 2025
48ce959
no need to flush
dentiny Jan 25, 2025
64ff960
disable tsan
dentiny Jan 25, 2025
7f661f4
add check
dentiny Jan 27, 2025
342e401
Add another check
dentiny Jan 27, 2025
860120f
early exit for eof state
dentiny Jan 27, 2025
2847484
remove no tsan
dentiny Jan 27, 2025
f79043b
remove unreachable EOF branch
dentiny Jan 28, 2025
e624686
disable TSAN
dentiny Jan 28, 2025
3a78143
windows test
dentiny Jan 29, 2025
9aa55db
more TSAN comment
dentiny Jan 29, 2025
6468e62
fix windows
dentiny Jan 29, 2025
39b512a
remove
dentiny Jan 29, 2025
2316eab
unset logger to destruct
dentiny Jan 30, 2025
d72927b
Merge branch 'hjiang/boost-windows-jan-23' of github.com:dentiny/ray …
dentiny Jan 30, 2025
7d410fd
todo
dentiny Jan 30, 2025
ccd438f
Merge branch 'master' into hjiang/boost-windows-jan-23
dentiny Jan 31, 2025
d1c3c00
exit hook to cleanup
dentiny Jan 31, 2025
f359d4c
hook
dentiny Jan 31, 2025
39b3498
Merge branch 'hjiang/boost-windows-jan-23' of github.com:dentiny/ray …
dentiny Jan 31, 2025
cb3d01d
cleanup
dentiny Jan 31, 2025
740a886
Merge branch 'master' into hjiang/boost-windows-jan-23
dentiny Feb 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ ray_cc_library(
":stream_redirection_options",
":thread_utils",
":util",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/strings",
],
Expand Down
263 changes: 121 additions & 142 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,119 +33,50 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
std::deque<std::string> content ABSL_GUARDED_BY(mu);
};

// File descriptors which indicates standard stream.
#if defined(__APPLE__) || defined(__linux__)
struct StdStreamFd {
int stdout_fd = STDOUT_FILENO;
int stderr_fd = STDERR_FILENO;
};
#elif defined(_WIN32)
// TODO(hjiang): not used for windows, implement later.
struct StdStreamFd {
int stdout_fd = -1;
int stderr_fd = -1;
// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy
// constructible.
struct StreamSink {
dentiny marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stdout_sink;
dentiny marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stderr_sink;
dentiny marked this conversation as resolved.
Show resolved Hide resolved
};
#endif

// Read bytes from handle into [data], return number of bytes read.
// If read fails, throw exception.
#if defined(__APPLE__) || defined(__linux__)
size_t Read(int read_fd, char *data, size_t len) {
// TODO(hjiang): Notice frequent read could cause performance issue.
ssize_t bytes_read = read(read_fd, data, len);
// TODO(hjiang): Add macros which checks for syscalls.
RAY_CHECK(bytes_read != -1) << "Fails to read from pipe because " << strerror(errno);
return bytes_read;
}
#endif

template <typename ReadFunc, typename WriteFunc, typename FlushFunc>
void StartStreamDump(ReadFunc read_func,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> close_read_handle,
std::function<void()> on_close_completion) {
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> on_close_completion) {
auto stream_dumper = std::make_shared<StreamDumper>();

// Create two threads, so there's no IO operation within critical section thus no
// blocking on write.
std::thread([read_func = std::move(read_func),
close_read_handle = std::move(close_read_handle),
std::thread([pipe_instream = std::move(pipe_instream),
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

const size_t buf_size = GetPipeLogReadSizeOrDefault();
// TODO(hjiang): Should resize without initialization.
std::string content(buf_size, '\0');
// Logging are written in lines, `last_line` records part of the strings left in
// last `read` syscall.
std::string last_line;

while (true) {
size_t bytes_read = read_func(content.data(), content.length());

// Bytes read of size 0 indicates write-side of pipe has been closed.
if (bytes_read == 0) {
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
if (!last_line.empty()) {
stream_dumper->content.emplace_back(std::move(last_line));
}
}

// Place IO operation out of critical section.
close_read_handle();

return;
std::string newline;
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
}

std::string_view cur_content{content.data(), bytes_read};
std::vector<std::string_view> newlines = absl::StrSplit(cur_content, '\n');

for (size_t idx = 0; idx < newlines.size() - 1; ++idx) {
std::string cur_new_line = std::move(last_line);
cur_new_line += newlines[idx];
last_line.clear();

// Backfill newliner for current segment.
cur_new_line += '\n';
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_new_line));
}
}

// Special handle the last segment we've read.
//
// Nothing to do if we've read a complete newline.
if (content.back() == '\n') {
continue;
}
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
}

// Otherwise record the newline so we could reuse in the next read iteration.
last_line += newlines.back();
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
}).detach();

Expand Down Expand Up @@ -218,22 +149,18 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
int fd = open(file_path.data(), O_WRONLY | O_CREAT, 0644);
RAY_CHECK_NE(fd, -1) << "Fails to open file " << file_path << " with failure reason "
<< strerror(errno);

auto flush_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
};
auto close_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
RAY_CHECK_EQ(close(fd), 0) << "Fails to close redirection file because "
<< strerror(errno);
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
auto flush_fn = [ostream]() { ostream->flush(); };
dentiny marked this conversation as resolved.
Show resolved Hide resolved
auto close_fn = [ostream]() {
ostream->flush();
ostream->close();
};

return RedirectionFileHandle{fd, std::move(flush_fn), std::move(close_fn)};
return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
dentiny marked this conversation as resolved.
Show resolved Hide resolved
}
#elif defined(_WIN32)
#include <windows.h>
Expand Down Expand Up @@ -266,7 +193,6 @@ RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {

} // namespace

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// Case-1: only redirection, but not rotation and tee involved.
Expand All @@ -284,30 +210,93 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// Invoked after flush and close finished.
auto on_close_completion = [promise = promise]() { promise->set_value(); };

StdStreamFd std_stream_fd{};
StreamSink std_stream_fd{};
dentiny marked this conversation as resolved.
Show resolved Hide resolved

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
std_stream_fd.stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(std_stream_fd.stdout_fd, -1)
<< "Fails to duplicate stdout: " << strerror(errno);
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_fd.stdout_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_fd.stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(std_stream_fd.stderr_fd, -1)
<< "Fails to duplicate stderr: " << strerror(errno);
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_fd.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

// TODO(hjiang): Use `boost::iostreams` to represent pipe write fd, which supports
// cross-platform and line-wise read.
int pipefd[2] = {0};
// TODO(hjiang): We shoud have our own syscall macro.
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_fd = pipefd[0];
int write_fd = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (tream_redirect_opt.tee_to_stdout) {
int duped_stderr_fd = -1;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stderr_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_fd, std::ios_base::out};
std_stream_fd.stdout = std::make_shared<
dentiny marked this conversation as resolved.
Show resolved Hide resolved
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (tream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = -1;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_fd, std::ios_base::out};
std_stream_fd.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

HANDLE read_pipe = nullptr;
HANDLE write_pipe = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_pipe, &write_pipe, &sa, 0)) << "Fails to create pipe";
boost::iostreams::file_descriptor_source pipe_read_source{read_pipe, std::ios_base::in};
boost::iostreams::file_descriptor_sink pipe_write_sink{write_pipe, std::ios_base::out};

auto read_func = [read_fd](char *data, size_t len) { return Read(read_fd, data, len); };
auto close_read_handle = [read_fd]() { RAY_CHECK_EQ(close(read_fd), 0); };
auto close_fn = [write_fd, promise]() {
RAY_CHECK_EQ(close(write_fd), 0);
#endif

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
std::move(pipe_read_source));
auto pipe_ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(pipe_write_sink));

auto close_fn = [pipe_ostream, promise]() mutable {
pipe_ostream->flush();
pipe_ostream->close();
// Block until destruction finishes.
promise->get_future().get();
};
Expand All @@ -320,12 +309,10 @@ RedirectionFileHandle CreateRedirectionFileHandle(
stream_redirect_opt = stream_redirect_opt,
std_stream_fd = std_stream_fd](std::string content) {
if (stream_redirect_opt.tee_to_stdout) {
RAY_CHECK_EQ(write(std_stream_fd.stdout_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_fd.stdout_sink->write(content.data(), content.length());
}
if (stream_redirect_opt.tee_to_stderr) {
RAY_CHECK_EQ(write(std_stream_fd.stderr_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_fd.stderr_sink->write(content.data(), content.length());
}
if (logger != nullptr) {
// spdlog adds newliner for every content, no need to maintan the application-passed
Expand All @@ -343,30 +330,22 @@ RedirectionFileHandle CreateRedirectionFileHandle(
logger->flush();
}
if (stream_redirect_opt.tee_to_stdout) {
fsync(std_stream_fd.stdout_fd);
std_stream_fd.stdout_sink->flush();
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_fd.stderr_sink->flush();
}
// No need to sync for stderr since it's unbuffered.
};

StartStreamDump(std::move(read_func),
StartStreamDump(std::move(pipe_instream),
std::move(write_fn),
flush_fn,
std::move(close_read_handle),
std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_fd, std::move(flush_fn), std::move(close_fn)};
write_fd, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};

return redirection_file_handle;
}

#elif defined(_WIN32)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// TODO(hjiang): For windows, we currently doesn't support redirection with rotation and
// tee to stdout/stderr.
return OpenFileForRedirection(stream_redirect_opt.file_path);
}
#endif

} // namespace ray
Loading
Loading