Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-8105 Fix concurrent flush attempts corrupting segment pool #4632

Merged
merged 1 commit into from
Jan 30, 2025

Conversation

bjhham
Copy link
Contributor

@bjhham bjhham commented Jan 28, 2025

Subsystem
Client, CIO

Motivation
Should solve all of our concurrency woes:

  • KTOR-8105 Race condition when writing to a buffer leads to NPE inside CIOReaderKt.readFrom
  • KTOR-8086 NPE in readBuffer
  • KTOR-8096 ArrayIndexOutOfBounds kotlinx-io

Solution
I found that the NIOSocketImpl is flushing the same channel as the TLS encoder coroutine, which can potentially lead to corruption due to the encoder writing the "close" message, then performing its own flushAndClose.

The stack traces in question:

[1318671573] write @ call-context#1025
  io.ktor.utils.io.ByteChannel.flush(ByteChannel.kt:91)
  io.ktor.utils.io.ByteChannel.flushAndClose(ByteChannel.kt:127)
  io.ktor.utils.io.CloseHookByteWriteChannel.flushAndClose(CloseHookByteWriteChannel.kt:21)
  io.ktor.utils.io.ByteWriteChannelKt$close$1.invoke(ByteWriteChannel.kt:39)
  io.ktor.utils.io.ByteWriteChannelKt$close$1.invoke(ByteWriteChannel.kt:39)
  kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$1.invokeSuspend(IntrinsicsJvm.kt:223)
  kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:279)
  kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:16)
  io.ktor.utils.io.ByteWriteChannelOperationsKt.fireAndForget(ByteWriteChannelOperations.kt:215)
  io.ktor.utils.io.ByteWriteChannelKt.close(ByteWriteChannel.kt:39)
  io.ktor.network.sockets.NIOSocketImpl.close(NIOSocketImpl.kt:65)
  io.ktor.network.tls.TLSCommonKt.tls(TLSCommon.kt:43)
  io.ktor.network.tls.TLSCommonKt$tls$3.invokeSuspend(TLSCommon.kt)
  kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:98)
  kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
  kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
  kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
[1318671573] write @ cio-to-nio-writer#2264
  io.ktor.utils.io.ByteChannel.flush(ByteChannel.kt:91)
  io.ktor.utils.io.ByteChannel.flushAndClose(ByteChannel.kt:127)
  io.ktor.utils.io.ByteReadChannelOperationsKt$reader$job$1.invokeSuspend(ByteReadChannelOperations.kt:319)
  kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:98)
  kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
  kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
  kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
[1318671573] write @ cio-tls-encoder#2326
  io.ktor.utils.io.ByteChannel.getWriteBuffer(ByteChannel.kt:49)
  io.ktor.utils.io.ByteWriteChannelOperationsKt.writeByte(ByteWriteChannelOperations.kt:17)
  io.ktor.network.tls.RenderKt.writeRecord(Render.kt:18)
  io.ktor.network.tls.TLSClientHandshake$output$1.invokeSuspend(TLSClientHandshake.kt:120)
  kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
  kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
  kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
  kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
  kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)

@bjhham bjhham force-pushed the bjhham/buffer-corruption-fix branch 2 times, most recently from 49cb390 to 7c88567 Compare January 28, 2025 18:58
@bjhham bjhham marked this pull request as ready for review January 29, 2025 07:34
@bjhham
Copy link
Contributor Author

bjhham commented Jan 29, 2025

FYI I am going to run my jury-rigged instrumentation to detect any other places where we might have two coroutines (writing/flusing) or reading on the same bytechannel instance.

@bjhham bjhham force-pushed the bjhham/buffer-corruption-fix branch from 7c88567 to d13913b Compare January 29, 2025 15:06
@bjhham bjhham requested review from osipxd and e5l January 29, 2025 15:10
@bjhham bjhham force-pushed the bjhham/buffer-corruption-fix branch from d13913b to 43fa9f3 Compare January 30, 2025 09:15
@bjhham bjhham merged commit 1c4f5b9 into main Jan 30, 2025
15 of 16 checks passed
@bjhham bjhham deleted the bjhham/buffer-corruption-fix branch January 30, 2025 10:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants