Skip to content

Commit

Permalink
[xla:cpu] Delete unused timeslice parameter from parallel loop runner
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 721953216
  • Loading branch information
ezhulenev authored and Google-ML-Automation committed Feb 1, 2025
1 parent 8e7e9dc commit c1ef7cc
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 48 deletions.
10 changes: 2 additions & 8 deletions xla/backends/cpu/runtime/parallel_loop_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ limitations under the License.
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/base/optimization.h"
#include "absl/log/check.h"
#include "absl/time/time.h"
#include "xla/backends/cpu/runtime/work_queue.h"
#include "xla/tsl/concurrency/async_value_ref.h"
#include "xla/tsl/concurrency/chain.h"
Expand All @@ -54,12 +52,8 @@ static tsl::AsyncValueRef<tsl::Chain> OkDoneEventSingleton() {
return singleton->AsRef();
}

ParallelLoopRunner::ParallelLoopRunner(
const Eigen::ThreadPoolDevice* device,
std::optional<absl::Duration> worker_timeslice)
: done_event_(OkDoneEventSingleton()),
device_(device),
worker_timeslice_(worker_timeslice) {}
// Constructs a runner bound to `device`. `done_event_` starts as the shared
// always-OK singleton event (see OkDoneEventSingleton above), and the device
// pointer is stored for later scheduling; no work is launched here.
ParallelLoopRunner::ParallelLoopRunner(const Eigen::ThreadPoolDevice* device)
    : done_event_(OkDoneEventSingleton()), device_(device) {}

tsl::AsyncValueRef<tsl::Chain> ParallelLoopRunner::ResetDoneEvent() {
auto done_event = std::move(done_event_);
Expand Down
9 changes: 1 addition & 8 deletions xla/backends/cpu/runtime/parallel_loop_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ limitations under the License.
#include <functional>
#include <optional>

#include "absl/time/time.h"
#include "xla/tsl/concurrency/async_value_ref.h"
#include "xla/tsl/concurrency/chain.h"

Expand Down Expand Up @@ -59,9 +58,7 @@ namespace xla::cpu {
// synchronized by the user.
class ParallelLoopRunner {
public:
explicit ParallelLoopRunner(
const Eigen::ThreadPoolDevice* device,
std::optional<absl::Duration> worker_timeslice = std::nullopt);
explicit ParallelLoopRunner(const Eigen::ThreadPoolDevice* device);

// Takes ownership of the runner and returns a done event. After the done
// event is transferred to the caller, it is illegal to schedule more parallel
Expand Down Expand Up @@ -150,10 +147,6 @@ class ParallelLoopRunner {
// pools for different NUMA nodes, and we have to be able to switch between
// them from run to run.
std::atomic<const Eigen::ThreadPoolDevice*> device_;

// The approximate amount of compute (in terms of wall time) that each
// persistent worker should handle.
std::optional<absl::Duration> worker_timeslice_;
};

} // namespace xla::cpu
Expand Down
40 changes: 11 additions & 29 deletions xla/backends/cpu/runtime/parallel_loop_runner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ limitations under the License.

#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>

#include "absl/algorithm/container.h"
#include "absl/cleanup/cleanup.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "xla/tsl/concurrency/async_value_ref.h"
#include "xla/tsl/platform/env.h"
Expand All @@ -36,14 +34,11 @@ limitations under the License.
namespace xla::cpu {
namespace {

class ParallelLoopRunnerTest
: public testing::TestWithParam<std::optional<absl::Duration>> {};

TEST_P(ParallelLoopRunnerTest, Parallelize1D) {
TEST(ParallelLoopRunnerTest, Parallelize1D) {
tsl::thread::ThreadPool threads(tsl::Env::Default(), "test", 8);
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());
ParallelLoopRunner runner(&device, GetParam());
ParallelLoopRunner runner(&device);

constexpr int32_t d0 = 128;

Expand All @@ -63,11 +58,11 @@ TEST_P(ParallelLoopRunnerTest, Parallelize1D) {
[](int32_t value) { return value == 5; }));
}

TEST_P(ParallelLoopRunnerTest, Parallelize1DTile1D) {
TEST(ParallelLoopRunnerTest, Parallelize1DTile1D) {
tsl::thread::ThreadPool threads(tsl::Env::Default(), "test", 8);
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());
ParallelLoopRunner runner(&device, GetParam());
ParallelLoopRunner runner(&device);

constexpr int32_t d0 = 128;

Expand All @@ -91,11 +86,11 @@ TEST_P(ParallelLoopRunnerTest, Parallelize1DTile1D) {
[](int32_t value) { return value == 5; }));
}

TEST_P(ParallelLoopRunnerTest, Parallelize2DTile1D) {
TEST(ParallelLoopRunnerTest, Parallelize2DTile1D) {
tsl::thread::ThreadPool threads(tsl::Env::Default(), "test", 8);
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());
ParallelLoopRunner runner(&device, GetParam());
ParallelLoopRunner runner(&device);

constexpr int32_t d0 = 4;
constexpr int32_t d1 = 39;
Expand All @@ -120,11 +115,11 @@ TEST_P(ParallelLoopRunnerTest, Parallelize2DTile1D) {
[](int32_t value) { return value == 5; }));
}

TEST_P(ParallelLoopRunnerTest, Parallelize3DTile2D) {
TEST(ParallelLoopRunnerTest, Parallelize3DTile2D) {
tsl::thread::ThreadPool threads(tsl::Env::Default(), "test", 8);
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());
ParallelLoopRunner runner(&device, GetParam());
ParallelLoopRunner runner(&device);

constexpr int32_t d0 = 4;
constexpr int32_t d1 = 39;
Expand Down Expand Up @@ -153,13 +148,6 @@ TEST_P(ParallelLoopRunnerTest, Parallelize3DTile2D) {
[](int32_t value) { return value == 5; }));
}

INSTANTIATE_TEST_SUITE_P(ParallelLoopRunner, ParallelLoopRunnerTest,
testing::Values(std::nullopt, absl::Nanoseconds(100),
absl::Nanoseconds(500),
absl::Microseconds(1),
absl::Microseconds(10),
absl::Milliseconds(1)));

//===----------------------------------------------------------------------===//
// Performance benchmarks.
//===----------------------------------------------------------------------===//
Expand All @@ -183,10 +171,7 @@ static void BM_Parallelize2DTile1D(benchmark::State& state) {
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());

size_t timeslice = state.range(0);
ParallelLoopRunner runner(
&device, timeslice ? std::make_optional(absl::Nanoseconds(timeslice))
: std::nullopt);
ParallelLoopRunner runner(&device);

size_t range = 4;
size_t tile = 1;
Expand All @@ -204,10 +189,7 @@ static void BM_Parallelize3DTile2D(benchmark::State& state) {
Eigen::ThreadPoolDevice device(threads.AsEigenThreadPool(),
threads.NumThreads());

size_t timeslice = state.range(0);
ParallelLoopRunner runner(
&device, timeslice ? std::make_optional(absl::Nanoseconds(timeslice))
: std::nullopt);
ParallelLoopRunner runner(&device);

size_t range = 4;
size_t tile = 1;
Expand All @@ -219,7 +201,7 @@ static void BM_Parallelize3DTile2D(benchmark::State& state) {
}
}

BENCHMARK(BM_Parallelize3DTile2D)->Arg(0)->Arg(100)->Arg(10000);
BENCHMARK(BM_Parallelize3DTile2D);

} // namespace
} // namespace xla::cpu
4 changes: 1 addition & 3 deletions xla/backends/cpu/runtime/xnnpack/xnn_fusion_thunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ limitations under the License.
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "pthreadpool.h"
#include "xla/backends/cpu/runtime/parallel_loop_runner.h"
Expand Down Expand Up @@ -184,8 +183,7 @@ absl::StatusOr<XnnFusionThunk::XnnRuntime> XnnFusionThunk::CreateXnnRuntime(

// Configure XNNPACK runtime thread pool if parallelization is enabled.
if (parallelization_mode == ParallelizationMode::kParallelLoopRunner) {
runtime.runner = std::make_unique<ParallelLoopRunner>(
device, /*worker_timeslice=*/absl::Microseconds(100));
runtime.runner = std::make_unique<ParallelLoopRunner>(device);
runtime.threadpool = CreateCustomPthreadpool(runtime.runner.get());
} else if (parallelization_mode == ParallelizationMode::kPThreadPool) {
runtime.threadpool = DefaultPthreadpool();
Expand Down

0 comments on commit c1ef7cc

Please sign in to comment.