diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index e28922002e..c359ac9a0c 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -29,7 +29,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import java.io.File import scala.collection.JavaConversions._ -import scala.util.{Success, Try} +import scala.util.{Failure, Success, Try} object KubernetesDeploymentHelper extends Logger { @@ -91,12 +91,14 @@ object KubernetesDeploymentHelper extends Logger { deleteConfigMap(nameSpace, deploymentName) } - def checkConnection(): Boolean = { + def checkConnection(e: Throwable => Unit): Boolean = { Try(new DefaultKubernetesClient) match { case Success(client) => client.close() true - case _ => false + case Failure(exception) => + e(exception) + false } } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index aa82962f39..2c1b8cf505 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -17,7 +17,6 @@ package org.apache.streampark.flink.kubernetes.watcher -import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever} import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode} @@ -168,7 +167,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi */ def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = { touchSessionAllJob(trackId) - .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.SILENT) + .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.LOST) .map(_._2) .orElse(inferState(trackId)) } @@ -214,12 +213,11 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit = { val latest: JobStatusCV = watchController.jobStatuses.get(trackId) + // put job status to cache + watchController.jobStatuses.put(trackId, jobState) if (jobState.diff(latest)) { - // put job status to cache - watchController.jobStatuses.put(trackId, jobState) // set jobId to trackIds watchController.trackIds.update(trackId) - eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState)) } @@ -244,21 +242,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi lazy val pollEmitTime = System.currentTimeMillis val preCache = watchController.jobStatuses.get(id) val state = inferFromPreCache(preCache) - val nonFirstSilent = state == FlinkJobState.SILENT && + + val firstSilent = state == FlinkJobState.SILENT && preCache != null && - preCache.jobState == FlinkJobState.SILENT - val jobState = if (nonFirstSilent) { + preCache.jobState != FlinkJobState.SILENT + + val jobState = if (firstSilent) { JobStatusCV( - jobState = state, + jobState = FlinkJobState.LOST, jobId = id.jobId, - pollEmitTime = preCache.pollEmitTime, - pollAckTime = preCache.pollAckTime) + pollEmitTime = pollEmitTime, + pollAckTime = System.currentTimeMillis) } else { JobStatusCV( - jobState = state, + jobState = FlinkJobState.LOST, jobId = id.jobId, - pollEmitTime = pollEmitTime, - pollAckTime = System.currentTimeMillis) + pollEmitTime = preCache.pollEmitTime, + pollAckTime = preCache.pollAckTime) } Option(jobState) } @@ -315,10 +315,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi val jobState = trackId match { case id if watchController.canceling.has(id) => logger.info(s"trackId ${trackId.toString} is canceling") - if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED + if (deployExists) { + FlinkJobState.CANCELLING + } else { + FlinkJobState.CANCELED + } case _ => - val isConnection = KubernetesDeploymentHelper.checkConnection() - + val isConnection = KubernetesDeploymentHelper.checkConnection( + e => { + logger.warn("connect Kubernetes client failed. ", e) + }) if (deployExists) { if (isConnection) { logger.info("Enter the task starting process.") @@ -339,14 +345,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi } val jobStatusCV = JobStatusCV( - jobState = jobState, + jobState = if (jobState == FlinkJobState.SILENT) FlinkJobState.LOST else jobState, jobId = trackId.jobId, pollEmitTime = pollEmitTime, - pollAckTime = System.currentTimeMillis) + pollAckTime = System.currentTimeMillis + ) - if ( + val flag = jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT - ) { + if (flag) { Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime)) } else { Some(jobStatusCV)