Skip to content

Commit

Permalink
修复停止作业产生的误报Failed状态 (#4158)
Browse files Browse the repository at this point in the history
(cherry picked from commit c25f4dd3fded21365f4ef44f0d044931e5f59d80)
  • Loading branch information
lintingbin authored Jan 11, 2025
1 parent b5ecec0 commit 3881c9a
Showing 1 changed file with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.streampark.flink.kubernetes.watcher

import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
Expand Down Expand Up @@ -101,43 +100,43 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
)

// 1) k8s application mode
val appFuture: Set[Future[Option[JobStatusCV]]] =
trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
id =>
val future = Future(touchApplicationJob(id))
future.onComplete(_.getOrElse(None) match {
case Some(jobState) =>
val appFuture = trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
id =>
Future {
Try(touchApplicationJob(id)) match {
case Success(Some(jobState)) =>
updateState(id.copy(jobId = jobState.jobId), jobState)
case _ =>
})
future
}
}
}
}

// 2) k8s session mode
val sessionIds = trackIds.filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
val sessionCluster = sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
val sessionFuture = sessionCluster.map {
trackId =>
val future = Future(touchSessionAllJob(trackId))
future.onComplete(_.toOption match {
case Some(map) =>
map.find(_._1.jobId == trackId.jobId) match {
case Some(job) =>
updateState(job._1.copy(appId = trackId.appId), job._2)
case _ =>
touchSessionJob(trackId) match {
case Some(state) =>
if (FlinkJobState.isEndState(state.jobState)) {
// can't find that job in the k8s cluster.
watchController.unWatching(trackId)
}
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
case _ =>
}
}
case _ =>
})
future
Future {
val result = Try(touchSessionAllJob(trackId))
result match {
case Success(map) =>
map.find(_._1.jobId == trackId.jobId) match {
case Some(job) =>
updateState(job._1.copy(appId = trackId.appId), job._2)
case _ =>
touchSessionJob(trackId) match {
case Some(state) =>
if (FlinkJobState.isEndState(state.jobState)) {
// can't find that job in the k8s cluster.
watchController.unWatching(trackId)
}
eventBus.postAsync(FlinkJobStatusChangeEvent(trackId, state))
case _ =>
}
}
case _ =>
}
}
}

// blocking until all future are completed or timeout is reached
Expand Down Expand Up @@ -220,7 +219,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
// set jobId to trackIds
watchController.trackIds.update(trackId)

eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
eventBus.postAsync(FlinkJobStatusChangeEvent(trackId, jobState))
}

if (FlinkJobState.isEndState(jobState.jobState)) {
Expand Down Expand Up @@ -273,14 +272,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
callJobsOverviewsApi(clusterRestUrl)
}.getOrElse {
logger.warn(
"Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
s"Failed to visit ${clusterKey.clusterId} remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
val clusterRestUrl = watchController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
Try(callJobsOverviewsApi(clusterRestUrl)) match {
case Success(s) =>
logger.info("The retry is successful.")
logger.info(s"The retry ${clusterKey.clusterId} is successful.")
s
case Failure(e) =>
logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
logger.warn(
s"The retry fetch ${clusterKey.clusterId} failed, final status failed, errorStack=${e.getMessage}.")
None
}
}
Expand Down Expand Up @@ -313,15 +313,17 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
trackId.clusterId
)
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
case id
if watchController.canceling.has(id) || latest.jobState.equals(
FlinkJobState.CANCELLING) =>
logger.info(s"trackId ${trackId.toString} is canceling")
if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
case _ =>
val isConnection = KubernetesDeploymentHelper.checkConnection()

if (deployExists) {
if (isConnection) {
logger.info("Enter the task starting process.")
logger.info(s"${trackId.clusterId} Enter the task starting process.")
KubernetesDeploymentHelper.watchDeploymentLog(
trackId.namespace,
trackId.clusterId,
Expand All @@ -331,7 +333,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
inferFromPreCache(latest)
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure process.")
logger.info(
s"${trackId.clusterId} deployment is deleted and enters the task failure process.")
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId))
} else {
inferFromPreCache(latest)
Expand Down Expand Up @@ -381,9 +384,7 @@ object FlinkJobStatusWatcher {
current match {
case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
previous match {
case FlinkJobState.CANCELLING => {
FlinkJobState.CANCELED
}
case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
case FlinkJobState.FAILING => FlinkJobState.FAILED
case _ =>
current match {
Expand Down

0 comments on commit 3881c9a

Please sign in to comment.