Skip to content

Commit

Permalink
use boost iostream for cross-platform
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Jan 23, 2025
1 parent 9f7ded8 commit e72abed
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 243 deletions.
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ ray_cc_library(
":stream_redirection_options",
":thread_utils",
":util",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/strings",
],
Expand Down
259 changes: 119 additions & 140 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,119 +33,50 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
std::deque<std::string> content ABSL_GUARDED_BY(mu);
};

// File descriptors which indicates standard stream.
#if defined(__APPLE__) || defined(__linux__)
struct StdStreamFd {
int stdout_fd = STDOUT_FILENO;
int stderr_fd = STDERR_FILENO;
};
#elif defined(_WIN32)
// TODO(hjiang): not used for windows, implement later.
// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy
// constructible.
struct StdStreamFd {
int stdout_fd = -1;
int stderr_fd = -1;
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stdout_sink;
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stderr_sink;
};
#endif

// Read bytes from handle into [data], return number of bytes read.
// If read fails, throw exception.
#if defined(__APPLE__) || defined(__linux__)
size_t Read(int read_fd, char *data, size_t len) {
// TODO(hjiang): Notice frequent read could cause performance issue.
ssize_t bytes_read = read(read_fd, data, len);
// TODO(hjiang): Add macros which checks for syscalls.
RAY_CHECK(bytes_read != -1) << "Fails to read from pipe because " << strerror(errno);
return bytes_read;
}
#endif

template <typename ReadFunc, typename WriteFunc, typename FlushFunc>
void StartStreamDump(ReadFunc read_func,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> close_read_handle,
std::function<void()> on_close_completion) {
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> on_close_completion) {
auto stream_dumper = std::make_shared<StreamDumper>();

// Create two threads, so there's no IO operation within critical section thus no
// blocking on write.
std::thread([read_func = std::move(read_func),
close_read_handle = std::move(close_read_handle),
std::thread([pipe_instream = std::move(pipe_instream),
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

const size_t buf_size = GetPipeLogReadSizeOrDefault();
// TODO(hjiang): Should resize without initialization.
std::string content(buf_size, '\0');
// Logging are written in lines, `last_line` records part of the strings left in
// last `read` syscall.
std::string last_line;

while (true) {
size_t bytes_read = read_func(content.data(), content.length());

// Bytes read of size 0 indicates write-side of pipe has been closed.
if (bytes_read == 0) {
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
if (!last_line.empty()) {
stream_dumper->content.emplace_back(std::move(last_line));
}
}

// Place IO operation out of critical section.
close_read_handle();

return;
std::string newline;
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
}

std::string_view cur_content{content.data(), bytes_read};
std::vector<std::string_view> newlines = absl::StrSplit(cur_content, '\n');

for (size_t idx = 0; idx < newlines.size() - 1; ++idx) {
std::string cur_new_line = std::move(last_line);
cur_new_line += newlines[idx];
last_line.clear();

// Backfill newliner for current segment.
cur_new_line += '\n';
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_new_line));
}
}

// Special handle the last segment we've read.
//
// Nothing to do if we've read a complete newline.
if (content.back() == '\n') {
continue;
}
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
}

// Otherwise record the newline so we could reuse in the next read iteration.
last_line += newlines.back();
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
}).detach();

Expand Down Expand Up @@ -218,22 +149,18 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
int fd = open(file_path.data(), O_WRONLY | O_CREAT, 0644);
RAY_CHECK_NE(fd, -1) << "Fails to open file " << file_path << " with failure reason "
<< strerror(errno);

auto flush_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
};
auto close_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
RAY_CHECK_EQ(close(fd), 0) << "Fails to close redirection file because "
<< strerror(errno);
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
auto flush_fn = [ostream]() { ostream->flush(); };
auto close_fn = [ostream]() {
ostream->flush();
ostream->close();
};

return RedirectionFileHandle{fd, std::move(flush_fn), std::move(close_fn)};
return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
}
#elif defined(_WIN32)
#include <windows.h>
Expand Down Expand Up @@ -266,7 +193,6 @@ RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {

} // namespace

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// Case-1: only redirection, but not rotation and tee involved.
Expand All @@ -285,29 +211,92 @@ RedirectionFileHandle CreateRedirectionFileHandle(
auto on_close_completion = [promise = promise]() { promise->set_value(); };

StdStreamFd std_stream_fd{};

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
std_stream_fd.stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(std_stream_fd.stdout_fd, -1)
<< "Fails to duplicate stdout: " << strerror(errno);
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_fd.stdout_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_fd.stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(std_stream_fd.stderr_fd, -1)
<< "Fails to duplicate stderr: " << strerror(errno);
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_fd.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

// TODO(hjiang): Use `boost::iostreams` to represent pipe write fd, which supports
// cross-platform and line-wise read.
int pipefd[2] = {0};
// TODO(hjiang): We shoud have our own syscall macro.
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_fd = pipefd[0];
int write_fd = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (tream_redirect_opt.tee_to_stdout) {
int duped_stderr_fd = -1;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stderr_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_fd, std::ios_base::out};
std_stream_fd.stdout = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (tream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = -1;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_fd, std::ios_base::out};
std_stream_fd.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

HANDLE read_pipe = nullptr;
HANDLE write_pipe = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_pipe, &write_pipe, &sa, 0)) << "Fails to create pipe";
boost::iostreams::file_descriptor_source pipe_read_source{read_pipe, std::ios_base::in};
boost::iostreams::file_descriptor_sink pipe_write_sink{write_pipe, std::ios_base::out};

auto read_func = [read_fd](char *data, size_t len) { return Read(read_fd, data, len); };
auto close_read_handle = [read_fd]() { RAY_CHECK_EQ(close(read_fd), 0); };
auto close_fn = [write_fd, promise]() {
RAY_CHECK_EQ(close(write_fd), 0);
#endif

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
std::move(pipe_read_source));
auto pipe_ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(pipe_write_sink));

auto close_fn = [pipe_ostream, promise]() mutable {
pipe_ostream->flush();
pipe_ostream->close();
// Block until destruction finishes.
promise->get_future().get();
};
Expand All @@ -320,12 +309,10 @@ RedirectionFileHandle CreateRedirectionFileHandle(
stream_redirect_opt = stream_redirect_opt,
std_stream_fd = std_stream_fd](std::string content) {
if (stream_redirect_opt.tee_to_stdout) {
RAY_CHECK_EQ(write(std_stream_fd.stdout_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_fd.stdout_sink->write(content.data(), content.length());
}
if (stream_redirect_opt.tee_to_stderr) {
RAY_CHECK_EQ(write(std_stream_fd.stderr_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_fd.stderr_sink->write(content.data(), content.length());
}
if (logger != nullptr) {
// spdlog adds newliner for every content, no need to maintan the application-passed
Expand All @@ -343,30 +330,22 @@ RedirectionFileHandle CreateRedirectionFileHandle(
logger->flush();
}
if (stream_redirect_opt.tee_to_stdout) {
fsync(std_stream_fd.stdout_fd);
std_stream_fd.stdout_sink->flush();
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_fd.stderr_sink->flush();
}
// No need to sync for stderr since it's unbuffered.
};

StartStreamDump(std::move(read_func),
StartStreamDump(std::move(pipe_instream),
std::move(write_fn),
flush_fn,
std::move(close_read_handle),
std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_fd, std::move(flush_fn), std::move(close_fn)};
write_fd, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};

return redirection_file_handle;
}

#elif defined(_WIN32)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// TODO(hjiang): For windows, we currently doesn't support redirection with rotation and
// tee to stdout/stderr.
return OpenFileForRedirection(stream_redirect_opt.file_path);
}
#endif

} // namespace ray
Loading

0 comments on commit e72abed

Please sign in to comment.