
Thread-safe, Channel-like, iteration of chunks #49

Closed
fredrikekre opened this issue Sep 22, 2024 · 7 comments

@fredrikekre
Contributor

I find the following pattern to be useful:

using ChunkSplitters

ntasks = Threads.nthreads()

# Job producer
chunks = ChunkSplitters.chunks(itr; kwargs...)
chunk_channel = Channel{eltype(chunks)}(2 * ntasks; spawn = true) do ch
    for chunk in chunks
        put!(ch, chunk)
    end
end

# Job consumers
@sync for _ in 1:ntasks
    Threads.@spawn begin
        for chunk in chunk_channel
            for idx in chunk
                # Do work
            end
        end
    end
end

However, the job-producer task, and the task switching between producer and
consumers, are unnecessary: the jobs are already "produced", and the Channel is
only needed here as a way to iterate through the chunks in a thread-safe manner.

To get around this, something like AtomicChunk (name suggestion?) can be
used (inspired by @KnutAM's similar construct in https://github.com/KnutAM/FerriteAssembly.jl/blob/18fb1a541af37d6ea095cb111e39c3ac1fea8929/src/Multithreading/TaskChunks.jl#L1-L37):

mutable struct AtomicChunk{C <: Chunk}
    const chunk::C
    @atomic idx::Int
    function AtomicChunk(chunk::C) where {C <: Chunk}
        new{C}(chunk, 0)
    end
end

Base.length(a::AtomicChunk) = length(a.chunk)
Base.eltype(a::AtomicChunk) = eltype(a.chunk)

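# Each call to iterate atomically claims the next chunk index, so multiple
# tasks can iterate the same AtomicChunk concurrently and every chunk is
# handed out to exactly one task.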
function Base.iterate(c::AtomicChunk, unused = nothing)
    this = @atomic c.idx += 1
    if this <= length(c.chunk)
        return (getchunk(c.chunk, this), nothing)
    else
        return nothing
    end
end

With this we can write the first code block as just a loop over the chunks:

using ChunkSplitters

ntasks = Threads.nthreads()

chunks = AtomicChunk(ChunkSplitters.chunks(itr; kwargs...))

# Job consumers
@sync for _ in 1:ntasks
    Threads.@spawn begin
        for chunk in chunks
            for idx in chunk
                # Do work
            end
        end
    end
end

Does such an iterator wrapper exist somewhere? Would something like this be useful to add to this package?

@fredrikekre
Contributor Author

Some timings:

using ChunkSplitters, BenchmarkTools

function mc_parallel_producer_consumers(N; ntasks = Threads.nthreads(), chunk_size)
    chunks = ChunkSplitters.chunks(1:N; size = chunk_size)
    channel = Channel{eltype(chunks)}(2 * ntasks; spawn = true) do ch
        for chunk in chunks
            put!(ch, chunk)
        end
    end
    tasks = map(1:ntasks) do _
        Threads.@spawn begin
            local s = 0.0
            for chunk in channel
                for _ in chunk
                    s += rand()^2 + rand()^2 < 1.0
                end
            end
            s
        end
    end
    return sum(fetch, tasks)
end

function mc_parallel_consumers(N; ntasks = Threads.nthreads(), chunk_size)
    chunks = ChunkSplitters.AtomicChunk(ChunkSplitters.chunks(1:N; size = chunk_size))
    tasks = map(1:ntasks) do _
        Threads.@spawn begin
            local s = 0.0
            for chunk in chunks
                for _ in chunk
                    s += rand()^2 + rand()^2 < 1.0
                end
            end
            s
        end
    end
    return sum(fetch, tasks)
end

const N = 10_000_000

@btime mc_parallel_producer_consumers(N; chunk_size = 1)
@btime mc_parallel_consumers(N; chunk_size = 1)

@btime mc_parallel_producer_consumers(N; chunk_size = 256)
@btime mc_parallel_consumers(N; chunk_size = 256)

@btime mc_parallel_producer_consumers(N; chunk_size = 10_000)
@btime mc_parallel_consumers(N; chunk_size = 10_000)

which gives (in the order of the calls above, i.e. chunk_size = 1, 256, 10_000,
with the producer/consumer version first and the consumer-only version second
in each pair):

  11.340 s (52 allocations: 4.73 KiB)
  392.088 ms (29 allocations: 2.17 KiB)
  27.058 ms (48 allocations: 4.30 KiB)
  9.128 ms (29 allocations: 2.17 KiB)
  8.719 ms (49 allocations: 4.41 KiB)
  8.615 ms (29 allocations: 2.17 KiB)

@lmiq
Collaborator

lmiq commented Sep 22, 2024

I wonder if it makes sense to make access to chunks and chunk indices always thread-safe?

I guess that wouldn't affect the current applications.

Do you have any thoughts on this @carstenbauer ?

@fredrikekre
Contributor Author

fredrikekre commented Sep 22, 2024

I think the iterator would need to be stateful then (like my code above and like Channel).
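For illustration, a rough sketch of the difference (just a demo, assuming the n keyword of chunks): with the current stateless iterator, every task that iterates the same chunks object walks through all of the chunks, while a stateful wrapper like AtomicChunk above hands each chunk to exactly one task:

using ChunkSplitters

c = ChunkSplitters.chunks(1:100; n = 4)

# Stateless iteration: each task keeps its own iteration state, so both tasks
# see all 4 chunks.
counts = zeros(Int, 2)
@sync for t in 1:2
    Threads.@spawn for _ in c
        counts[t] += 1
    end
end
counts # == [4, 4]

# Stateful iteration with the AtomicChunk from above: the shared atomic counter
# distributes the 4 chunks over the tasks, so the counts sum to 4.
ac = AtomicChunk(c)
counts = zeros(Int, 2)
@sync for t in 1:2
    Threads.@spawn for _ in ac
        counts[t] += 1
    end
end
sum(counts) # == 4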

@lmiq
Collaborator

lmiq commented Sep 25, 2024

Do you want to propose a PR before we release 3.0? It is probably a good time to introduce something new in the API.

@carstenbauer
Member

I think we should add this as an option (not the default). Since this is a new feature, and thus non-breaking, it doesn't have to go into 3.0 but can be added in any minor version.

The remaining question here is how we want to expose this feature. Should users wrap their chunks in a type, or should this be a keyword option for chunks?

@fredrikekre
Contributor Author

fredrikekre commented Sep 25, 2024

It can easily be generalized to any indexable thing (see the sketch further below), so perhaps it shouldn't live in this package at all. The main issue with exposing this by default is that you cannot iterate over it twice:

X = AtomicChunk(ChunkSplitters.chunks(...))

for x in X
    # do something
end

for x in X
    # this will never be reached
end

so I think you really want to wrap the thing in this wrapper in a more local fashion, where you know you are about to iterate over it twice, just like in #121.
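For concreteness, here is a rough sketch of such a generalized wrapper (the name AtomicIterable is only a placeholder, and nothing about it is specific to ChunkSplitters):

mutable struct AtomicIterable{C}
    const collection::C
    @atomic idx::Int
    AtomicIterable(collection::C) where {C} = new{C}(collection, 0)
end

Base.length(a::AtomicIterable) = length(a.collection)
Base.eltype(::Type{AtomicIterable{C}}) where {C} = eltype(C)

function Base.iterate(a::AtomicIterable, unused = nothing)
    # Atomically claim the next index; works for any collection that supports
    # length and 1-based getindex.
    i = @atomic a.idx += 1
    return i <= length(a.collection) ? (a.collection[i], nothing) : nothing
end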

Perhaps this can just be an internal thing in OhMyThreads for now?

@carstenbauer
Member

Perhaps this can just be an internal thing in OhMyThreads for now?

Yeah, let's add this to OhMyThreads for now. (JuliaFolds2/OhMyThreads.jl#121)

@carstenbauer carstenbauer reopened this Sep 27, 2024
@carstenbauer carstenbauer closed this as not planned Sep 27, 2024