[SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in infinite reattaching requests #49370
Conversation
Force-pushed from e495228 to d3391c7.
Hey @juliuszsompolski, I hope you are doing well. Can you please review this change?
Force-pushed from 039cb9d to 8e8773c.
Hi @changgyoopark-db
I think I need some more description to understand this change.
Review thread on ...server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala (outdated, resolved).
Review thread on ...nnect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala (outdated, resolved).
Force-pushed from 8e8773c to 8a15185.
def isOrphan(): Boolean = {
  // Check runner.completed() before the others: the acquire memory fence in that method
  // ensures that the current thread reads the last known state of responseObserver correctly.
  runner.completed() &&
  !runner.shouldDelegateCompleteResponse(request) &&
  !responseObserver.completed()
}
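For context, a minimal, self-contained sketch of the acquire/release pairing that the comment above relies on; RunnerSketch and its fields are invented for illustration and are not the actual Spark Connect classes:

import java.util.concurrent.atomic.AtomicInteger

class RunnerSketch {
  private val state = new AtomicInteger(0) // 0 = running, 1 = completed
  private var observerClosed = false       // stands in for responseObserver's completion flag

  def finish(closedStream: Boolean): Unit = {
    observerClosed = closedStream // plain write, ordered before the release store below
    state.setRelease(1)           // release store: publishes observerClosed
  }

  // Acquire load: pairs with setRelease, so a caller that observes completed() == true
  // also observes the preceding write to observerClosed.
  def completed(): Boolean = state.getAcquire() == 1

  // Safe only because completed() is evaluated first, as in the snippet above.
  def isOrphan(): Boolean = completed() && !observerClosed
}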
I need to look at it fresh tomorrow because with the new version of the code I am again confused :-).
This basically checks whether the ExecuteThreadRunner exited without sending onCompleted / onError.
onCompleted is sent at the end of executeInternal.
execute is wrapped in various try-catches.
How about checking there that executeInternal exited without closing the stream, and closing the stream with an onError from there? Then the RPC handler side should get this error via the usual route.
While self-reviewing the code, I was also confused (and I found a small logical hole around interrupts) :-( I'll resume working on it tomorrow. Also, closing the stream with an onError when reattaching seems to be a good idea.
Just calling onError doesn't work nicely, because the client will retry if a custom RetryPolicy is specified; the exception is translated into an UNKNOWN gRPC error.
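For reference, a minimal sketch (plain grpc-java API, not Spark code) of why a bare exception surfaces as UNKNOWN on the wire, which a permissive client retry policy may then treat as retryable:

import io.grpc.Status

object UnknownStatusSketch {
  def main(args: Array[String]): Unit = {
    // grpc-java maps a throwable that carries no explicit Status to Status.UNKNOWN.
    val raw = new IllegalStateException("stream closed without a result")
    val status = Status.fromThrowable(raw)
    assert(status.getCode == Status.Code.UNKNOWN)
  }
}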
If you stick another ErrorUtils.handleError with a new IllegalStateException in this finally of execute, then if at this stage the stream is not closed (and not delegated), it would get turned into an error that shouldn't be swallowed by the retry policy?
} finally {
  // Make sure to transition to completed in order to prevent the thread from being interrupted
  // afterwards.
  var currentState = state.getAcquire()
  while (currentState == ThreadState.started ||
      currentState == ThreadState.startedInterrupted) {
    val interrupted = currentState == ThreadState.startedInterrupted
    val prevState = state.compareAndExchangeRelease(currentState, ThreadState.completed)
    if (prevState == currentState) {
      if (interrupted) {
        try {
          ErrorUtils.handleError(
            "execute",
            executeHolder.responseObserver,
            executeHolder.sessionHolder.userId,
            executeHolder.sessionHolder.sessionId,
            Some(executeHolder.eventsManager),
            true)(new SparkSQLException("OPERATION_CANCELED", Map.empty))
        } finally {
          executeHolder.cleanup()
        }
      }
      return
    }
    currentState = prevState
  }
}
}
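A rough sketch of that suggestion (not the code that was eventually merged; the names and the handleError signature are copied from the snippets in this thread and may not match the real classes exactly). It would slot into the non-interrupted branch of the finally block above:

// Inside the `if (prevState == currentState)` branch above, when `interrupted` is false:
if (!executeHolder.responseObserver.completed() &&
    !shouldDelegateCompleteResponse(executeHolder.request)) {
  // The stream was neither completed nor delegated, so close it with an explicit error
  // instead of leaving the client to reattach forever.
  ErrorUtils.handleError(
    "execute",
    executeHolder.responseObserver,
    executeHolder.sessionHolder.userId,
    executeHolder.sessionHolder.sessionId,
    Some(executeHolder.eventsManager),
    false)(
    new IllegalStateException("Execution completed without closing the response stream"))
}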
Putting something there (in the finally block of ExecuteThreadRunner) is not 100% safe, since creating an exception can itself cause OOM errors; it is also an extra heap allocation. That's why I would like to stick to handling the situation in the reattach handler (sketched below):
- An OOM there (in ExecuteThreadRunner) is unrecoverable.
- The reattach-then-check path will eventually succeed if the JVM manages to spare some memory.
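A minimal sketch of that reattach-handler-side check (hypothetical shape; the merged code in SparkConnectReattachExecuteHandler may differ, and isOrphan() is assumed to be the helper shown earlier in this thread, exposed on the execute holder):

// In the reattach handler, before (re)attaching a response sender to the execution:
if (executeHolder.isOrphan()) {
  // The runner is gone and never closed the stream: fail the reattach instead of
  // letting the client wait for responses that will never arrive.
  throw new IllegalStateException(
    s"Operation ${executeHolder.operationId} completed without recording an outcome")
}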
Review thread on ...src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala (outdated, resolved).
…/service/SparkConnectReattachExecuteHandler.scala Co-authored-by: Juliusz Sompolski <[email protected]>
Hi @HyukjinKwon, can you please merge this PR? Thanks!
Merged to master and branch-4.0.
Closes #49370 from changgyoopark-db/SPARK-50735.

Lead-authored-by: changgyoopark-db <[email protected]>
Co-authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 07aa4ff)
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
The Spark Connect reattach request handler now checks whether the associated ExecuteThreadRunner has completed, and returns an error if the runner finished without recording the outcome.
Why are the changes needed?
ExecuteResponseObserver.{onError, onCompleted} can fail, and those calls are not retried; this leads to a situation where the ExecuteThreadRunner completes without ever responding to the client, so the client keeps retrying by reattaching to the execution.
To be specific, if an ExecuteThreadRunner fails to record the completion of the execution (or an error) on the observer and then simply disappears, the client will reattach endlessly, hoping that "someone" will eventually record "some data"; but since the ExecuteThreadRunner is gone, this amounts to a deadlock.
The fix: when the client reattaches, the handler checks the status of the ExecuteThreadRunner, and if it finds that the execution cannot make any progress, it returns an error.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite
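For reference, one way to run that suite locally is build/sbt "connect/testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite", assuming the Connect server module's sbt project is named connect; adjust the project name to the build if it differs.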
Was this patch authored or co-authored using generative AI tooling?
No.