Skip to content

Commit

Permalink
Fix buffer_drain_concurrency not doing anything.
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Nov 18, 2023
1 parent 81777e5 commit f020258
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions go/vt/vtgate/buffer/shard_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,32 @@ func (sb *shardBuffer) drain(q []*entry, err error) {
sb.timeoutThread.stop()

start := sb.timeNow()
// TODO(mberlin): Parallelize the drain by pumping the data through a channel.

// Parallelize the drain by pumping the data through a channel.
entryChan := make(chan *entry, len(q))

parallelism := min(bufferDrainConcurrency, len(q))

var wg sync.WaitGroup
wg.Add(parallelism)

for i := 0; i < parallelism; i++ {
go func() {
for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
}

wg.Done()
}()
}

for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
entryChan <- e
}

close(entryChan)
wg.Wait()

d := sb.timeNow().Sub(start)
log.Infof("Draining finished for shard: %s Took: %v for: %d requests.", topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d, len(q))
requestsDrained.Add(sb.statsKey, int64(len(q)))
Expand Down

0 comments on commit f020258

Please sign in to comment.