From 3881c9a4b2a14952bf3f8fd4da1123348dc4442b Mon Sep 17 00:00:00 2001
From: Darcy <331046161@qq.com>
Date: Sat, 11 Jan 2025 21:57:41 +0800
Subject: [PATCH] Fix the false Failed status reported when stopping a job (#4158)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

(cherry picked from commit c25f4dd3fded21365f4ef44f0d044931e5f59d80)
---
 .../watcher/FlinkJobStatusWatcher.scala       | 81 ++++++++++---------
 1 file changed, 41 insertions(+), 40 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index aa82962f39..2d574e40d9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
-import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
@@ -101,43 +100,43 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     )
 
     // 1) k8s application mode
-    val appFuture: Set[Future[Option[JobStatusCV]]] =
-      trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
-        id =>
-          val future = Future(touchApplicationJob(id))
-          future.onComplete(_.getOrElse(None) match {
-            case Some(jobState) =>
+    val appFuture = trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
+      id =>
+        Future {
+          Try(touchApplicationJob(id)) match {
+            case Success(Some(jobState)) =>
               updateState(id.copy(jobId = jobState.jobId), jobState)
             case _ =>
-          })
-          future
-      }
+          }
+        }
+    }
 
     // 2) k8s session mode
     val sessionIds = trackIds.filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
     val sessionCluster = sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
     val sessionFuture = sessionCluster.map {
      trackId =>
-        val future = Future(touchSessionAllJob(trackId))
-        future.onComplete(_.toOption match {
-          case Some(map) =>
-            map.find(_._1.jobId == trackId.jobId) match {
-              case Some(job) =>
-                updateState(job._1.copy(appId = trackId.appId), job._2)
-              case _ =>
-                touchSessionJob(trackId) match {
-                  case Some(state) =>
-                    if (FlinkJobState.isEndState(state.jobState)) {
-                      // can't find that job in the k8s cluster.
-                      watchController.unWatching(trackId)
-                    }
-                    eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
-                  case _ =>
-                }
-            }
-          case _ =>
-        })
-        future
+        Future {
+          val result = Try(touchSessionAllJob(trackId))
+          result match {
+            case Success(map) =>
+              map.find(_._1.jobId == trackId.jobId) match {
+                case Some(job) =>
+                  updateState(job._1.copy(appId = trackId.appId), job._2)
+                case _ =>
+                  touchSessionJob(trackId) match {
+                    case Some(state) =>
+                      if (FlinkJobState.isEndState(state.jobState)) {
+                        // can't find that job in the k8s cluster.
+                        watchController.unWatching(trackId)
+                      }
+                      eventBus.postAsync(FlinkJobStatusChangeEvent(trackId, state))
+                    case _ =>
+                  }
+              }
+            case _ =>
+          }
+        }
     }
 
     // blocking until all future are completed or timeout is reached
@@ -220,7 +219,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
 
       // set jobId to trackIds
       watchController.trackIds.update(trackId)
-      eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
+      eventBus.postAsync(FlinkJobStatusChangeEvent(trackId, jobState))
     }
 
     if (FlinkJobState.isEndState(jobState.jobState)) {
@@ -273,14 +272,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
       callJobsOverviewsApi(clusterRestUrl)
     }.getOrElse {
       logger.warn(
-        "Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
+        s"Failed to visit ${clusterKey.clusterId} remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
       val clusterRestUrl = watchController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
       Try(callJobsOverviewsApi(clusterRestUrl)) match {
         case Success(s) =>
-          logger.info("The retry is successful.")
+          logger.info(s"The retry ${clusterKey.clusterId} is successful.")
           s
         case Failure(e) =>
-          logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
+          logger.warn(
+            s"The retry fetch ${clusterKey.clusterId} failed, final status failed, errorStack=${e.getMessage}.")
           None
       }
     }
@@ -313,7 +313,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
       trackId.clusterId
     )
     val jobState = trackId match {
-      case id if watchController.canceling.has(id) =>
+      case id
+          if watchController.canceling.has(id) || latest.jobState.equals(
+            FlinkJobState.CANCELLING) =>
        logger.info(s"trackId ${trackId.toString} is canceling")
        if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
      case _ =>
@@ -321,7 +323,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
 
        if (deployExists) {
          if (isConnection) {
-            logger.info("Enter the task starting process.")
+            logger.info(s"${trackId.clusterId} Enter the task starting process.")
            KubernetesDeploymentHelper.watchDeploymentLog(
              trackId.namespace,
              trackId.clusterId,
@@ -331,7 +333,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
            inferFromPreCache(latest)
          }
        } else if (isConnection) {
-          logger.info("The deployment is deleted and enters the task failure process.")
+          logger.info(
+            s"${trackId.clusterId} deployment is deleted and enters the task failure process.")
          FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId))
        } else {
          inferFromPreCache(latest)
@@ -381,9 +384,7 @@ object FlinkJobStatusWatcher {
     current match {
       case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
        previous match {
-          case FlinkJobState.CANCELLING => {
-            FlinkJobState.CANCELED
-          }
+          case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
          case FlinkJobState.FAILING => FlinkJobState.FAILED
          case _ =>
            current match {
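
Note on the core fix: the application-mode state inference now treats a job as canceling not only when it sits in the controller's canceling cache, but also when its last cached state is CANCELLING, so a deployment that disappears while a stop is in flight resolves to CANCELED instead of dropping into the failure branch. The sketch below illustrates that decision logic under simplified assumptions; JobState, resolveState, and the Failed fallback are illustrative stand-ins, not the real StreamPark types.

// Minimal, self-contained Scala sketch of the broadened "canceling" check.
// All names here are simplified placeholders for illustration only.
object CancelStateSketch {

  sealed trait JobState
  case object Cancelling extends JobState
  case object Canceled extends JobState
  case object Failed extends JobState
  case object Running extends JobState

  // Before the patch only cancelingCacheHit was consulted; if that cache entry had
  // already expired, a stopped job whose deployment was gone fell through to the
  // failure branch. Also checking the latest cached state keeps the
  // CANCELLING -> CANCELED transition intact.
  def resolveState(
      cancelingCacheHit: Boolean,
      latestCachedState: JobState,
      deploymentExists: Boolean): JobState = {
    if (cancelingCacheHit || latestCachedState == Cancelling) {
      if (deploymentExists) Cancelling else Canceled
    } else if (!deploymentExists) {
      Failed // simplified: the real watcher consults the history archive here
    } else {
      Running
    }
  }

  def main(args: Array[String]): Unit = {
    // A job being stopped whose deployment is already gone is now CANCELED, not FAILED,
    // even if the canceling cache no longer remembers it.
    assert(
      resolveState(
        cancelingCacheHit = false,
        latestCachedState = Cancelling,
        deploymentExists = false) == Canceled)
    assert(
      resolveState(
        cancelingCacheHit = true,
        latestCachedState = Running,
        deploymentExists = true) == Cancelling)
    println("state inference sketch ok")
  }
}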
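
The polling side of the change replaces the side-effecting onComplete callbacks with a Try evaluated inside the Future body, so a probe that throws is handled in place and no longer surfaces as a spurious Failed status; status-change events also move from postSync to postAsync. Below is a minimal sketch of that pattern, assuming placeholder probe/updateState functions rather than the real touchApplicationJob/updateState API.

// Sketch of the Future + Try polling pattern, with placeholder functions.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

object PollingSketch {

  // Placeholder for a status probe that may throw (e.g. the REST call fails).
  def probe(id: String): Option[String] =
    if (id == "boom") throw new RuntimeException("probe failed") else Some(s"RUNNING-$id")

  // Placeholder for the watcher's state update.
  def updateState(id: String, state: String): Unit = println(s"$id -> $state")

  def main(args: Array[String]): Unit = {
    val trackIds = Set("job-1", "boom", "job-2")

    // Each probe runs inside the Future body and its outcome is handled right there via
    // Try, instead of through an onComplete callback. A failed probe is simply skipped
    // and the previous status is kept.
    val futures = trackIds.map { id =>
      Future {
        Try(probe(id)) match {
          case Success(Some(state)) => updateState(id, state)
          case _ => // probe failed or returned nothing: keep the previous status
        }
      }
    }

    // Block until all probes finish, mirroring the watcher's "wait for all futures" step.
    Await.ready(Future.sequence(futures), 10.seconds)
  }
}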