Skip to content

Commit

Permalink
[Improve] flink job-status bug fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Dec 7, 2024
1 parent aaef7b4 commit 1634ee5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
import java.io.File

import scala.collection.JavaConversions._
import scala.util.{Success, Try}
import scala.util.{Failure, Success, Try}

object KubernetesDeploymentHelper extends Logger {

Expand Down Expand Up @@ -91,12 +91,14 @@ object KubernetesDeploymentHelper extends Logger {
deleteConfigMap(nameSpace, deploymentName)
}

def checkConnection(): Boolean = {
def checkConnection(e: Throwable => Unit): Boolean = {
Try(new DefaultKubernetesClient) match {
case Success(client) =>
client.close()
true
case _ => false
case Failure(exception) =>
e(exception)
false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.streampark.flink.kubernetes.watcher

import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
Expand Down Expand Up @@ -168,7 +167,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
*/
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
touchSessionAllJob(trackId)
.find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.SILENT)
.find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.LOST)
.map(_._2)
.orElse(inferState(trackId))
}
Expand Down Expand Up @@ -214,12 +213,11 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi

private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit = {
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
if (jobState.diff(latest)) {
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
watchController.trackIds.update(trackId)

eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}

Expand All @@ -244,21 +242,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
lazy val pollEmitTime = System.currentTimeMillis
val preCache = watchController.jobStatuses.get(id)
val state = inferFromPreCache(preCache)
val nonFirstSilent = state == FlinkJobState.SILENT &&

val firstSilent = state == FlinkJobState.SILENT &&
preCache != null &&
preCache.jobState == FlinkJobState.SILENT
val jobState = if (nonFirstSilent) {
preCache.jobState != FlinkJobState.SILENT

val jobState = if (firstSilent) {
JobStatusCV(
jobState = state,
jobState = FlinkJobState.LOST,
jobId = id.jobId,
pollEmitTime = preCache.pollEmitTime,
pollAckTime = preCache.pollAckTime)
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
} else {
JobStatusCV(
jobState = state,
jobState = FlinkJobState.LOST,
jobId = id.jobId,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
pollEmitTime = preCache.pollEmitTime,
pollAckTime = preCache.pollAckTime)
}
Option(jobState)
}
Expand Down Expand Up @@ -315,10 +315,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
logger.info(s"trackId ${trackId.toString} is canceling")
if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
if (deployExists) {
FlinkJobState.CANCELLING
} else {
FlinkJobState.CANCELED
}
case _ =>
val isConnection = KubernetesDeploymentHelper.checkConnection()

val isConnection = KubernetesDeploymentHelper.checkConnection(
e => {
logger.warn("connect Kubernetes client failed. ", e)
})
if (deployExists) {
if (isConnection) {
logger.info("Enter the task starting process.")
Expand All @@ -339,14 +345,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}

val jobStatusCV = JobStatusCV(
jobState = jobState,
jobState = if (jobState == FlinkJobState.SILENT) FlinkJobState.LOST else jobState,
jobId = trackId.jobId,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
pollAckTime = System.currentTimeMillis
)

if (
val flag =
jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT
) {
if (flag) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
} else {
Some(jobStatusCV)
Expand Down

0 comments on commit 1634ee5

Please sign in to comment.