Add reference implementation for parallel_phase feature #1570

Open

wants to merge 35 commits into base: master

35 commits
5e5778e
Add initial impl of Parallel Block
pavelkumbrasev Nov 13, 2024
55fd835
Add test for workers_leave
pavelkumbrasev Nov 18, 2024
637a74d
Fix thread_leave_manager
pavelkumbrasev Nov 20, 2024
eb172a7
Add parallel_block API, improve tests
isaevil Nov 20, 2024
7f08af0
Improve tests for parallel_block
isaevil Nov 25, 2024
f6a139a
Add workers_leave::automatic, let hybric systems take advantage of pa…
isaevil Nov 26, 2024
0a9faed
Fix Windows compilation
isaevil Nov 26, 2024
c1802a1
Add win32 exports
isaevil Nov 27, 2024
ecd5a9e
Add mac64 symbols
isaevil Nov 27, 2024
0d32f80
Correct mask, add a clarifying comment to it
isaevil Nov 28, 2024
f61fc1d
Apply suggested typo fix
isaevil Nov 28, 2024
17f1dac
Rename block to phase
isaevil Nov 29, 2024
aa7cc2a
Align with RFC, utilize my_version_and_traits, improve readability
isaevil Dec 2, 2024
104606c
Fix non-preview compilation
isaevil Dec 3, 2024
597136f
Change scoped_parallel_phase default parameter, fix entry points for Win
isaevil Dec 4, 2024
1bc6e3a
Apply suggestions from code review
isaevil Dec 9, 2024
73742f3
Apply comments, change tests
isaevil Dec 6, 2024
ba5b922
Correct mac64 exports
isaevil Dec 10, 2024
7311cec
Improve test reporting, move thread demand into internals
isaevil Dec 13, 2024
1f22bc6
Extend testing suite with scoped phases and test with this_task_arena
isaevil Dec 13, 2024
fd4a24f
Apply suggestions from code review
isaevil Dec 17, 2024
77985c0
Simplify state transition logic
isaevil Dec 17, 2024
1a6b73c
Align leave_policy variables names across code
isaevil Dec 17, 2024
398d16b
Move parallel phase tests into a separate file
isaevil Dec 17, 2024
4dfaf5e
Address comments
isaevil Dec 17, 2024
6a951bd
Decrease the size of dummy work, don't execute tests for workerless a…
isaevil Dec 20, 2024
9d507ca
Apply suggestions from code review
isaevil Jan 14, 2025
515fd33
Improve test stability
isaevil Jan 15, 2025
0f233bb
Use uintptr_t
isaevil Jan 15, 2025
15c3ea5
Update copyright years
isaevil Jan 15, 2025
f041f3a
Reduce testing time
isaevil Jan 15, 2025
fdae151
Move RFC to the experimental stage
isaevil Jan 15, 2025
a178e85
Fix one time fast leave
isaevil Jan 20, 2025
0f1b1a5
Fix sources format
isaevil Jan 20, 2025
c38eb2f
Rename entry points, simplify state machine interaction
isaevil Jan 22, 2025
6 changes: 5 additions & 1 deletion include/oneapi/tbb/detail/_config.h
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -534,4 +534,8 @@
#define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
#endif

#if TBB_PREVIEW_PARALLEL_PHASE || __TBB_BUILD
#define __TBB_PREVIEW_PARALLEL_PHASE 1
#endif

#endif // __TBB_detail__config_H
173 changes: 152 additions & 21 deletions include/oneapi/tbb/task_arena.h
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);

#if __TBB_PREVIEW_PARALLEL_PHASE
TBB_EXPORT void __TBB_EXPORTED_FUNC enter_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC exit_parallel_phase(d1::task_arena_base*, std::uintptr_t);
#endif
} // namespace r1

namespace d2 {
@@ -122,6 +127,14 @@ class task_arena_base {
normal = 2 * priority_stride,
high = 3 * priority_stride
};

#if __TBB_PREVIEW_PARALLEL_PHASE
enum class leave_policy : int {
automatic = 0,
fast = 1
};
#endif

#if __TBB_ARENA_BINDING
using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
@@ -162,13 +175,36 @@ class task_arena_base {
return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
}

#if __TBB_PREVIEW_PARALLEL_PHASE
leave_policy get_leave_policy() const {
return (my_version_and_traits & fast_leave_policy_flag) ? leave_policy::fast : leave_policy::automatic;
}

int leave_policy_trait(leave_policy lp) const {
return lp == leave_policy::fast ? fast_leave_policy_flag : 0;
}

void set_leave_policy(leave_policy lp) {
my_version_and_traits |= leave_policy_trait(lp);
}
#endif

enum {
default_flags = 0
, core_type_support_flag = 1
default_flags = 0,
core_type_support_flag = 1,
fast_leave_policy_flag = 1 << 1
};

task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_trait(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(max_concurrency)
@@ -180,8 +216,16 @@
{}

#if __TBB_ARENA_BINDING
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_trait(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(constraints_.max_concurrency)
@@ -259,31 +303,58 @@ class task_arena : public task_arena_base {
* Value of 1 is default and reflects behavior of implicit arenas.
**/
task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

#if __TBB_ARENA_BINDING
//! Creates task arena pinned to certain NUMA node
task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(constraints_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
: task_arena_base(constraints_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

//! Copies settings from another task_arena
task_arena(const task_arena &s) // copy settings but not the reference or instance
task_arena(const task_arena &a) // copy settings but not the reference or instance
: task_arena_base(
constraints{}
.set_numa_id(s.my_numa_id)
.set_max_concurrency(s.my_max_concurrency)
.set_core_type(s.my_core_type)
.set_max_threads_per_core(s.my_max_threads_per_core)
, s.my_num_reserved_slots, s.my_priority)
.set_numa_id(a.my_numa_id)
.set_max_concurrency(a.my_max_concurrency)
.set_core_type(a.my_core_type)
.set_max_threads_per_core(a.my_max_threads_per_core)
, a.my_num_reserved_slots, a.my_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, a.get_leave_policy()
#endif
)

{}
#else
//! Copies settings from another task_arena
task_arena(const task_arena& a) // copy settings but not the reference or instance
: task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
: task_arena_base(a.my_max_concurrency,
a.my_num_reserved_slots,
a.my_priority,
#if __TBB_PREVIEW_PARALLEL_PHASE
a.get_leave_policy()
#endif
)
{}
#endif /*__TBB_ARENA_BINDING*/

@@ -292,7 +363,11 @@ class task_arena : public task_arena_base {

//! Creates an instance of task_arena attached to the current arena of the thread
explicit task_arena( attach )
: task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
: task_arena_base(automatic, 1, priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy::automatic
#endif
) // use default settings if attach fails
{
if (r1::attach(*this)) {
mark_initialized();
@@ -311,21 +386,32 @@

//! Overrides concurrency level and forces initialization of internal representation
void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
my_max_concurrency = max_concurrency_;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
}

#if __TBB_ARENA_BINDING
void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
@@ -335,6 +421,9 @@ class task_arena : public task_arena_base {
my_max_threads_per_core = constraints_.max_threads_per_core;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
@@ -404,6 +493,32 @@ class task_arena : public task_arena_base {
return execute_impl<decltype(f())>(f);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
void start_parallel_phase() {
initialize();
r1::enter_parallel_phase(this, /*reserved*/0);
}
void end_parallel_phase(bool with_fast_leave = false) {
__TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::exit_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
}

class scoped_parallel_phase {
task_arena& arena;
bool one_time_fast_leave;
public:
scoped_parallel_phase(task_arena& ta, bool with_fast_leave = false)
: arena(ta), one_time_fast_leave(with_fast_leave)
{
arena.start_parallel_phase();
}
~scoped_parallel_phase() {
arena.end_parallel_phase(one_time_fast_leave);
}
};
#endif

#if __TBB_EXTRA_DEBUG
//! Returns my_num_reserved_slots
int debug_reserved_slots() const {
@@ -472,6 +587,17 @@ inline void enqueue(F&& f) {
enqueue_impl(std::forward<F>(f), nullptr);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
inline void start_parallel_phase() {
r1::enter_parallel_phase(nullptr, /*reserved*/0);
}

inline void end_parallel_phase(bool with_fast_leave) {
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::exit_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
}
#endif

using r1::submit;

} // namespace d1
@@ -491,6 +617,11 @@ using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;

#if __TBB_PREVIEW_PARALLEL_PHASE
using detail::d1::start_parallel_phase;
using detail::d1::end_parallel_phase;
#endif
} // namespace this_task_arena

} // inline namespace v1
@@ -235,7 +235,6 @@ void scoped_parallel_phase_example() {
// Computation
}
}

```

## Considerations
@@ -256,6 +255,32 @@ it might introduce performance problems if:
Heavier involvement of less performant core types might result in artificial work
imbalance in the arena.

## Technical Details

To implement the proposed feature, the following changes were made:
* Added a new entity, `thread_leave_manager`, to `r1::arena`, which is responsible for
  managing the state of workers' arena-leaving behavior.
* Introduced two new entry points to the library (a usage sketch follows this list):
  * `r1::enter_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - communicates
    the start of a parallel phase to the library.
  * `r1::exit_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - communicates
    the end of a parallel phase to the library.
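
For illustration, the sketch below shows how this preview API might be used from application code. It assumes `TBB_PREVIEW_PARALLEL_PHASE` is defined before including the headers (per the `_config.h` change above); the arena parameters, loop bounds, and function names are placeholders rather than part of this PR.

```cpp
// Hypothetical usage sketch of the preview API; not part of this PR's sources.
#define TBB_PREVIEW_PARALLEL_PHASE 1
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/parallel_for.h>

void run_phases() {
    // Workers of this arena follow the automatic leave policy between phases.
    tbb::task_arena arena(tbb::task_arena::automatic, /*reserved_for_masters=*/1,
                          tbb::task_arena::priority::normal,
                          tbb::task_arena::leave_policy::automatic);

    for (int i = 0; i < 10; ++i) {
        // RAII helper: calls r1::enter_parallel_phase on construction and
        // r1::exit_parallel_phase on destruction; request a one-time fast
        // leave after the last phase.
        tbb::task_arena::scoped_parallel_phase phase(arena, /*with_fast_leave=*/i == 9);
        arena.execute([] {
            tbb::parallel_for(0, 100000, [](int) { /* computation */ });
        });
    }
}
```

The free functions `this_task_arena::start_parallel_phase()` and `this_task_arena::end_parallel_phase()` added in this PR provide the same control for the calling thread's current arena.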

### Thread Leave Manager

The `thread_leave_manager` class implements the state machine described in the proposal.
Specifically, it controls when worker threads are allowed to be retained in the arena.
`thread_leave_manager` is initialized with a state that determines the default
behavior for workers leaving the arena.

To support the `start/end_parallel_phase` API, it provides functionality to override the default
state with a "Parallel Phase" state, and it keeps track of the number of active parallel phases.
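
The snippet below is only an illustrative sketch of that behavior; the class layout, member names, and memory orderings are assumptions and do not reproduce the actual `r1::thread_leave_manager` in this PR.

```cpp
#include <atomic>
#include <cstdint>

// Illustrative sketch only: names and exact transitions are assumptions,
// not oneTBB's actual r1::thread_leave_manager.
class thread_leave_manager_sketch {
    enum state : std::uintptr_t { delayed_leave, fast_leave, parallel_phase };
    std::atomic<state> my_default_state;          // policy fixed at arena construction
    std::atomic<state> my_current_state;          // what idle workers consult
    std::atomic<std::uint32_t> my_phase_count{0}; // number of active parallel phases

public:
    explicit thread_leave_manager_sketch(bool fast_by_default)
        : my_default_state(fast_by_default ? fast_leave : delayed_leave)
        , my_current_state(fast_by_default ? fast_leave : delayed_leave) {}

    void register_parallel_phase() {
        // The first phase overrides the default state; nested or concurrent
        // phases only increment the counter.
        if (my_phase_count.fetch_add(1, std::memory_order_relaxed) == 0)
            my_current_state.store(parallel_phase, std::memory_order_release);
    }

    void unregister_parallel_phase(bool one_time_fast_leave) {
        // The last exit restores the default policy, or applies a one-time
        // fast leave if the caller requested it.
        if (my_phase_count.fetch_sub(1, std::memory_order_relaxed) == 1) {
            state s = one_time_fast_leave ? fast_leave
                                          : my_default_state.load(std::memory_order_relaxed);
            my_current_state.store(s, std::memory_order_release);
        }
    }

    bool workers_should_leave_fast() const {
        return my_current_state.load(std::memory_order_acquire) == fast_leave;
    }
};
```

In particular, the interaction of a one-time fast leave with subsequent phases (see the "Fix one time fast leave" commit) is simplified here.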

The following sequence diagram illustrates the interaction between the user and
the `thread_leave_manager` during the execution of parallel phases. It shows how the
`thread_leave_manager` manages the state transitions when using `start/end_parallel_phase`.

<img src="parallel_phase_sequence_diagram.png" width=1000>

## Open Questions in Design

@@ -272,3 +297,10 @@ Some open questions that remain:
* Do we see any value in letting an arena transition from one state to another?
* What if different types of workloads are mixed in one application?
* What if there are concurrent calls to this API?

## Conditions to become fully supported

The following conditions need to be met for the feature to move from experimental to fully supported:
* The open questions regarding the API should be resolved.
* The feature should demonstrate performance improvements in the scenarios mentioned above.
* The oneTBB specification needs to be updated to reflect the new feature.