Skip to content

Commit

Permalink
WIP: Now initialize all the parallel_scan accumulation
Browse files Browse the repository at this point in the history
variables with reduction_identity_sum_or_value_initialize
  • Loading branch information
nliber committed Aug 16, 2024
1 parent 81a9b31 commit 23f9f52
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 40 deletions.
35 changes: 21 additions & 14 deletions core/src/Cuda/Kokkos_Cuda_Task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define KOKKOS_IMPL_CUDA_TASK_HPP

#include <Kokkos_Macros.hpp>
#include <Kokkos_ReductionIdentity.hpp>
#if defined(KOKKOS_ENABLE_TASKDAG)

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1063,12 +1064,14 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
const iType bound = loop_boundaries.end + loop_boundaries.start;
const int lane = threadIdx.y * blockDim.x;

value_type accum = 0;
value_type val, y, local_total;
value_type accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
value_type val;
Impl::reduction_identity_sum_or_value_initialize(val);

for (iType i = loop_boundaries.start; i < bound;
i += loop_boundaries.increment) {
val = 0;
Impl::reduction_identity_sum_or_value_initialize(val);
if (i < loop_boundaries.end) closure(i, val, false);

// intra-blockDim.y exclusive scan on 'val'
Expand All @@ -1077,29 +1080,30 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
// INCLUSIVE scan
for (int offset = blockDim.x; offset < Impl::CudaTraits::WarpSize;
offset <<= 1) {
y = Kokkos::shfl_up(val, offset, Impl::CudaTraits::WarpSize);
value_type y = Kokkos::shfl_up(val, offset, Impl::CudaTraits::WarpSize);
if (lane >= offset) {
val += y;
}
}

// pass accum to all threads
local_total = shfl_warp_broadcast<value_type>(
value_type local_total = shfl_warp_broadcast<value_type>(
val, threadIdx.x + Impl::CudaTraits::WarpSize - blockDim.x,
Impl::CudaTraits::WarpSize);

// make EXCLUSIVE scan by shifting values over one
val = Kokkos::shfl_up(val, blockDim.x, Impl::CudaTraits::WarpSize);
if (threadIdx.y == 0) {
val = 0;
Impl::reduction_identity_sum_or_value_initialize(val);
}

val += accum;
if (i < loop_boundaries.end) closure(i, val, true);
accum += local_total;
}
} else {
value_type accum = 0;
value_type accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
closure(i, accum, true);
Expand Down Expand Up @@ -1128,41 +1132,44 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
// make sure all threads perform all loop iterations
const iType bound = loop_boundaries.end + loop_boundaries.start;

value_type accum = 0;
value_type val, y, local_total;
value_type accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
value_type val;
Impl::reduction_identity_sum_or_value_initialize(val);

for (iType i = loop_boundaries.start; i < bound;
i += loop_boundaries.increment) {
val = 0;
Impl::reduction_identity_sum_or_value_initialize(val);
if (i < loop_boundaries.end) closure(i, val, false);

// intra-blockDim.x exclusive scan on 'val'
// accum = accumulated, sum in total for this iteration

// INCLUSIVE scan
for (int offset = 1; offset < blockDim.x; offset <<= 1) {
y = Kokkos::shfl_up(val, offset, blockDim.x);
value_type y = Kokkos::shfl_up(val, offset, blockDim.x);
if (threadIdx.x >= offset) {
val += y;
}
}

// pass accum to all threads
local_total =
value_type local_total =
shfl_warp_broadcast<value_type>(val, blockDim.x - 1, blockDim.x);

// make EXCLUSIVE scan by shifting values over one
val = Kokkos::shfl_up(val, 1, blockDim.x);
if (threadIdx.x == 0) {
val = 0;
Impl::reduction_identity_sum_or_value_initialize(val);
}

val += accum;
if (i < loop_boundaries.end) closure(i, val, true);
accum += local_total;
}
} else {
value_type accum = 0;
value_type accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
closure(i, accum, true);
Expand Down
9 changes: 6 additions & 3 deletions core/src/Cuda/Kokkos_Cuda_Team.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,18 +707,20 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
const auto team_size = member.team_size();
const auto team_rank = member.team_rank();
const auto nchunk = (end - start + team_size - 1) / team_size;
ValueType accum = 0;
ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
// each team has to process one or more chunks of the prefix scan
for (iType i = 0; i < nchunk; ++i) {
auto ii = start + i * team_size + team_rank;
// local accumulation for this chunk
ValueType local_accum = 0;
ValueType local_accum;
Impl::reduction_identity_sum_or_value_initialize(local_accum);
// user updates value with prefix value
if (ii < loop_bounds.end) lambda(ii, local_accum, false);
// perform team scan
local_accum = member.team_scan(local_accum);
// add this blocks accum to total accumulation
auto val = accum + local_accum;
ValueType val = accum + local_accum;
// user updates their data with total accumulation
if (ii < loop_bounds.end) lambda(ii, val, true);
// the last value needs to be propogated to next chunk
Expand Down Expand Up @@ -748,6 +750,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
void>::value_type;

value_type dummy;
Impl::reduction_identity_sum_or_value_initialize(dummy);
parallel_scan(loop_bounds, lambda, dummy);
}

Expand Down
11 changes: 8 additions & 3 deletions core/src/HIP/Kokkos_HIP_Team.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,18 +555,20 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
const auto team_size = member.team_size();
const auto team_rank = member.team_rank();
const auto nchunk = (end - start + team_size - 1) / team_size;
ValueType accum = {};
ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
// each team has to process one or more chunks of the prefix scan
for (iType i = 0; i < nchunk; ++i) {
auto ii = start + i * team_size + team_rank;
// local accumulation for this chunk
ValueType local_accum = 0;
ValueType local_accum;
Impl::reduction_identity_sum_or_value_initialize(local_accum);
// user updates value with prefix value
if (ii < loop_bounds.end) lambda(ii, local_accum, false);
// perform team scan
local_accum = member.team_scan(local_accum);
// add this blocks accum to total accumulation
auto val = accum + local_accum;
ValueType val = accum + local_accum;
// user updates their data with total accumulation
if (ii < loop_bounds.end) lambda(ii, val, true);
// the last value needs to be propogated to next chunk
Expand Down Expand Up @@ -596,6 +598,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
void>::value_type;

value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);
parallel_scan(loop_bounds, lambda, scan_val);
}

Expand Down Expand Up @@ -851,6 +854,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
Kokkos::Impl::FunctorPatternInterface::SCAN, void, Closure,
void>::value_type;
value_type dummy;
Impl::reduction_identity_sum_or_value_initialize(dummy);
parallel_scan(loop_boundaries, closure, Kokkos::Sum<value_type>(dummy));
}

Expand All @@ -875,6 +879,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
"Non-matching value types of closure and return type");

ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
parallel_scan(loop_boundaries, closure, Kokkos::Sum<ValueType>(accum));

return_val = accum;
Expand Down
7 changes: 5 additions & 2 deletions core/src/HPX/Kokkos_HPX.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1860,7 +1860,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
static_assert(std::is_same_v<functor_value_type, ValueType>,
"Non-matching value types of functor and return type");

ValueType scan_val{};
ValueType scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);

// Intra-member scan
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
Expand Down Expand Up @@ -1890,6 +1891,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
void>::value_type;

value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);
parallel_scan(loop_bounds, lambda, scan_val);
}

Expand All @@ -1914,7 +1916,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
TeamPolicy<Experimental::HPX>, FunctorType,
void>::value_type;

value_type scan_val = value_type();
value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);

#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
#pragma ivdep
Expand Down
19 changes: 13 additions & 6 deletions core/src/OpenMPTarget/Kokkos_OpenMPTarget_ParallelScan_Team.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <sstream>
#include <Kokkos_Parallel.hpp>
#include <OpenMPTarget/Kokkos_OpenMPTarget_Parallel.hpp>
#include <Kokkos_ReductionIdentity.hpp>

// FIXME_OPENMPTARGET - Using this macro to implement a workaround for
// hierarchical scan. It avoids hitting the code path which we wanted to
Expand Down Expand Up @@ -52,7 +53,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
const auto team_rank = member.team_rank();

#if defined(KOKKOS_IMPL_TEAM_SCAN_WORKAROUND)
ValueType scan_val = {};
ValueType scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);

if (team_rank == 0) {
for (iType i = start; i < end; ++i) {
Expand All @@ -66,19 +68,21 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
#else
const auto team_size = member.team_size();
const auto nchunk = (end - start + team_size - 1) / team_size;
ValueType accum = {};
ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
// each team has to process one or
// more chunks of the prefix scan
for (iType i = 0; i < nchunk; ++i) {
auto ii = start + i * team_size + team_rank;
// local accumulation for this chunk
ValueType local_accum = {};
ValueType local_accum;
Impl::reduction_identity_sum_or_value_initialize(local_accum);
// user updates value with prefix value
if (ii < loop_bounds.end) lambda(ii, local_accum, false);
// perform team scan
local_accum = member.team_scan(local_accum);
// add this blocks accum to total accumulation
auto val = accum + local_accum;
ValueType val = accum + local_accum;
// user updates their data with total accumulation
if (ii < loop_bounds.end) lambda(ii, val, true);
// the last value needs to be propogated to next chunk
Expand All @@ -101,6 +105,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
FunctorType, void>;
using value_type = typename Analysis::value_type;
value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);
parallel_scan(loop_bounds, lambda, scan_val);
}
} // namespace Kokkos
Expand Down Expand Up @@ -131,7 +136,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
static_assert(std::is_same_v<analysis_value_type, ValueType>,
"Non-matching value types of functor and return type");

ValueType scan_val = {};
ValueType scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);

#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
#pragma ivdep
Expand All @@ -153,7 +159,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
FunctorType, void>;
using value_type = typename Analysis::value_type;

value_type scan_val = value_type();
value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);
parallel_scan(loop_boundaries, lambda, scan_val);
}

Expand Down
15 changes: 10 additions & 5 deletions core/src/OpenMPTarget/Kokkos_OpenMPTarget_Task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
iType, Impl::TaskExec<Kokkos::Experimental::OpenMPTarget> >&
loop_boundaries,
const Lambda& lambda) {
ValueType accum = 0;
ValueType val, local_total;
ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
ValueType val;
Impl::reduction_identity_sum_or_value_initialize(val);
ValueType* shared = (ValueType*)loop_boundaries.thread.team_shared();
int team_size = loop_boundaries.thread.team_size();
int team_rank =
Expand All @@ -268,7 +270,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
// Intra-member scan
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
local_total = 0;
ValueType local_total;
Impl::reduction_identity_sum_or_value_initialize(local_total);
lambda(i, local_total, false);
val = accum;
lambda(i, val, true);
Expand All @@ -283,7 +286,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
for (iType i = 1; i < team_size; i += 1) {
shared[i] += shared[i - 1];
}
accum = 0; // Member 0 set accum to 0 in preparation for inter-member scan
// Member 0 set accum to sum identity in preparation for inter-member scan
Impl::reduction_identity_sum_or_value_initialize(accum);
}

loop_boundaries.thread.team_barrier();
Expand All @@ -294,7 +298,8 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
}
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
local_total = 0;
ValueType local_total;
Impl::reduction_identity_sum_or_value_initialize(local_total);
lambda(i, local_total, false);
val = accum;
lambda(i, val, true);
Expand Down
9 changes: 6 additions & 3 deletions core/src/SYCL/Kokkos_SYCL_Team.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,18 +600,20 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
const auto team_size = member.team_size();
const auto team_rank = member.team_rank();
const auto nchunk = (end - start + team_size - 1) / team_size;
ValueType accum = 0;
ValueType accum;
Impl::reduction_identity_sum_or_value_initialize(accum);
// each team has to process one or more chunks of the prefix scan
for (iType i = 0; i < nchunk; ++i) {
auto ii = start + i * team_size + team_rank;
// local accumulation for this chunk
ValueType local_accum = 0;
ValueType local_accum;
Impl::reduction_identity_sum_or_value_initialize(local_accum);
// user updates value with prefix value
if (ii < loop_bounds.end) lambda(ii, local_accum, false);
// perform team scan
local_accum = member.team_scan(local_accum);
// add this blocks accum to total accumulation
auto val = accum + local_accum;
ValueType val = accum + local_accum;
// user updates their data with total accumulation
if (ii < loop_bounds.end) lambda(ii, val, true);
// the last value needs to be propogated to next chunk
Expand All @@ -633,6 +635,7 @@ KOKKOS_INLINE_FUNCTION void parallel_scan(
void>::value_type;

value_type scan_val;
Impl::reduction_identity_sum_or_value_initialize(scan_val);
parallel_scan(loop_bounds, lambda, scan_val);
}

Expand Down
Loading

0 comments on commit 23f9f52

Please sign in to comment.