[SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in infinite reattaching requests #49370

Closed
wants to merge 15 commits into master from changgyoopark-db:SPARK-50735

Conversation

@changgyoopark-db (Contributor) commented Jan 6, 2025

What changes were proposed in this pull request?

The Spark Connect reattach request handler checks whether the associated ExecuteThreadRunner has completed, and returns an error if the runner failed to record an outcome.

Why are the changes needed?

ExecuteResponseObserver.{onError, onComplete} are fallible but are not retried; this can leave the ExecuteThreadRunner completed without ever having responded to the client, so the client keeps retrying by reattaching to the execution.

To be specific, if an ExecuteThreadRunner fails to record the completion of execution or an error on the observer and then simply disappears, the client will endlessly reattach, hoping that "someone" will eventually record "some data"; but since the ExecuteThreadRunner is gone, this is effectively a deadlock.
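
For illustration, a minimal, self-contained sketch of this failure mode, with simplified, hypothetical names rather than the actual Spark Connect classes:

trait ResponseObserverSketch {
  def onComplete(): Unit          // fallible, not retried
  def onError(t: Throwable): Unit // fallible, not retried
}

class RunnerSketch(observer: ResponseObserverSketch) {
  def run(): Unit = {
    try {
      // ... execute the plan and stream responses to the observer ...
      observer.onComplete() // may throw (e.g., OutOfMemoryError) and leave the stream open
    } catch {
      case t: Throwable =>
        observer.onError(t) // also fallible; if this throws, the runner just exits
    }
    // If neither call succeeded, no outcome is ever recorded on the observer,
    // and every subsequent ReattachExecute waits for data that will never arrive.
  }
}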

The fix is that when the client reattaches, the handler checks the status of the ExecuteThreadRunner and, if it finds that the execution cannot make any progress, returns an error.
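
A hedged sketch of that handler-side check; only isOrphan() mirrors the helper quoted later in this review thread, while the surrounding handler wiring is hypothetical:

object ReattachHandlerSketch {
  trait ExecuteHolderLike {
    // True iff the execution thread completed without recording a final response.
    def isOrphan(): Boolean
  }

  def handleReattach(holder: ExecuteHolderLike): Unit = {
    if (holder.isOrphan()) {
      // No thread is left to terminate the stream, so reattaching can never make
      // progress: fail fast instead of letting the client loop forever.
      throw new IllegalStateException(
        "Operation was orphaned: execution finished without a final response.")
    }
    // ... otherwise attach to the existing response stream as usual ...
  }
}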

Does this PR introduce any user-facing change?

No.

How was this patch tested?

testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite

Was this patch authored or co-authored using generative AI tooling?

No.

@changgyoopark-db force-pushed the SPARK-50735 branch 4 times, most recently from e495228 to d3391c7 on January 6, 2025 13:49
@changgyoopark-db (Contributor, Author) commented:

Hey, @juliuszsompolski , I hope you are doing well. Can you please review this change?
-> Short description: if ExecuteThreadRunner fails to record the completion/error on the observer (e.g., due to OOM), the client permanently tries to reattach to the execution.
-> The fix is to let the stream sender send an error if ExecuteThreadRunner is gone without having recorded anything.
-> This does not cover streaming queries (if there's any problem).

@juliuszsompolski (Contributor) left a comment:

Hi @changgyoopark-db
I think I need some more description to understand this change.

@changgyoopark-db marked this pull request as draft January 13, 2025 15:07
@changgyoopark-db marked this pull request as ready for review January 13, 2025 17:07
Comment on lines 133 to 139
def isOrphan(): Boolean = {
  // Check runner.completed() before the others: the acquire memory fence in that
  // method ensures that the current thread reads the last known state of
  // responseObserver correctly.
  runner.completed() &&
    !runner.shouldDelegateCompleteResponse(request) &&
    !responseObserver.completed()
}
@juliuszsompolski (Contributor) commented Jan 13, 2025:

I need to look at it fresh tomorrow because with the new version of the code I am again confused :-).
This basically checks if the ExecuteThreadRunner exited without sending onCompleted / onError.
onCompleted is sent at the end of executeInternal.
execute is wrapped in various try-catches.

How about checking there that executeInternal exited without closing the stream, and closing the stream with an onError from there? Then the RPC handler side should get this error via the usual route.

@changgyoopark-db (Contributor, Author) commented:

While self-reviewing the code, I was also confused (and I found a small logical hole involving an interrupt) :-( I'll resume working on it tomorrow. Also, closing the stream with an onError when reattaching seems like a good idea.

@changgyoopark-db (Contributor, Author) commented:

Just calling onError doesn't work nicely because the client will retry if a custom RetryPolicy is specified; it's translated into an UNKNOWN gRPC error.
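
For context, a hypothetical client-side retry predicate (not the actual Spark Connect client code) showing why an UNKNOWN status keeps the retry loop alive:

import io.grpc.{Status, StatusRuntimeException}

// Hedged sketch: a custom RetryPolicy that treats UNKNOWN as retryable. An
// exception surfaced through a bare onError arrives at the client as a
// StatusRuntimeException with code UNKNOWN, so such a policy keeps retrying.
object RetryPolicySketch {
  def shouldRetry(t: Throwable): Boolean = t match {
    case e: StatusRuntimeException =>
      e.getStatus.getCode match {
        case Status.Code.UNAVAILABLE | Status.Code.UNKNOWN => true
        case _ => false
      }
    case _ => false
  }
}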

@juliuszsompolski (Contributor) commented:

If you stick another ErrorUtils.handleError with a new IllegalStateException into this finally of execute, invoked when the stream is not closed (and not delegated) at that stage, wouldn't it get turned into an error that shouldn't be swallowed by the retry policy?

    } finally {
      // Make sure to transition to completed in order to prevent the thread from being interrupted
      // afterwards.
      var currentState = state.getAcquire()
      while (currentState == ThreadState.started ||
        currentState == ThreadState.startedInterrupted) {
        val interrupted = currentState == ThreadState.startedInterrupted
        val prevState = state.compareAndExchangeRelease(currentState, ThreadState.completed)
        if (prevState == currentState) {
          if (interrupted) {
            try {
              ErrorUtils.handleError(
                "execute",
                executeHolder.responseObserver,
                executeHolder.sessionHolder.userId,
                executeHolder.sessionHolder.sessionId,
                Some(executeHolder.eventsManager),
                true)(new SparkSQLException("OPERATION_CANCELED", Map.empty))
            } finally {
              executeHolder.cleanup()
            }
          }
          return
        }
        currentState = prevState
      }
    }
  }
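
A hedged sketch of that suggestion as a fragment continuing the snippet above; the accessors on executeHolder are assumptions for illustration, not actual methods in this PR:

// Hypothetical addition to the finally block above: if the runner is exiting
// without having closed or delegated the stream, report an explicit error so
// the client stops reattaching. Accessor names are assumed.
if (!executeHolder.responseObserver.completed() &&
    !executeHolder.shouldDelegateCompleteResponse()) {
  ErrorUtils.handleError(
    "execute",
    executeHolder.responseObserver,
    executeHolder.sessionHolder.userId,
    executeHolder.sessionHolder.sessionId,
    Some(executeHolder.eventsManager),
    true)(new IllegalStateException(
    "Execution completed without closing the response stream"))
}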

@changgyoopark-db (Contributor, Author) commented:

Putting something there (in the finally block of ExecuteThreadRunner) is not 100% safe, since creating an exception is itself a heap allocation and can therefore trigger an OOM error. That's why I would like to stick to handling the situation in the reattach handler.

  • OOM there (ExecuteThreadRunner) is unrecoverable.
  • Reattach->check will eventually succeed if the JVM manages to spare some memory.

@changgyoopark-db marked this pull request as draft January 14, 2025 08:35
@changgyoopark-db marked this pull request as ready for review January 14, 2025 09:10
@changgyoopark-db (Contributor, Author) commented:

@HyukjinKwon Hi, can you please merge this PR? Thanks!

@HyukjinKwon (Member) commented Jan 20, 2025:

Merged to master and branch-4.0.

HyukjinKwon pushed a commit that referenced this pull request Jan 21, 2025
[SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in infinite reattaching requests

Closes #49370 from changgyoopark-db/SPARK-50735.

Lead-authored-by: changgyoopark-db <[email protected]>
Co-authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 07aa4ff)
Signed-off-by: Hyukjin Kwon <[email protected]>
@changgyoopark-db deleted the SPARK-50735 branch January 21, 2025 06:17