From 9f8bfe13f7a00f3eb64d1673e5f63f3fbb6eef96 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 24 Jan 2025 22:07:13 +0800 Subject: [PATCH] [Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement --- .../tool/FlinkSessionClientHelper.scala | 21 ++++++++++------- .../kubernetes/KubernetesRetriever.scala | 21 ++++------------- .../watcher/FlinkCheckpointWatcher.scala | 17 ++++++-------- .../watcher/FlinkJobStatusWatcher.scala | 4 ++-- .../watcher/FlinkMetricsWatcher.scala | 23 ++++++++----------- .../kubernetes/watcher/FlinkWatcher.scala | 10 +++++++- 6 files changed, 46 insertions(+), 50 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala index fcfcd3fc16..1fc86c7890 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala @@ -17,9 +17,6 @@ package org.apache.streampark.flink.client.tool -import org.apache.streampark.common.util.Logger -import org.apache.streampark.flink.kubernetes.KubernetesRetriever - import org.apache.flink.client.deployment.application.ApplicationConfiguration import org.apache.flink.configuration.{Configuration, CoreOptions} import org.apache.flink.runtime.jobgraph.SavepointConfigOptions @@ -27,18 +24,26 @@ import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder import org.apache.hc.client5.http.fluent.Request import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity +import org.apache.hc.core5.util.Timeout +import org.apache.streampark.common.util.Logger import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.Serialization import java.io.File import java.nio.charset.StandardCharsets - +import java.time.Duration import scala.collection.JavaConversions._ import scala.util.{Failure, Success, Try} object FlinkSessionSubmitHelper extends Logger { + // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT} + private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout + + // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT + private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L) + @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -59,8 +64,8 @@ object FlinkSessionSubmitHelper extends Logger { // upload flink-job jar val uploadResult = Request .post(s"$jmRestUrl/jars/upload") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .body( MultipartEntityBuilder .create() @@ -90,8 +95,8 @@ object FlinkSessionSubmitHelper extends Logger { // refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload val resp = Request .post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig)))) .execute .returnContent() diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index da6870f176..8b0cd69080 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -17,12 +17,6 @@ package org.apache.streampark.flink.kubernetes -import org.apache.streampark.common.util.{DateUtils, Logger, Utils} -import org.apache.streampark.common.util.Utils.using -import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode -import org.apache.streampark.flink.kubernetes.ingress.IngressController -import org.apache.streampark.flink.kubernetes.model.ClusterKey - import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException} import org.apache.flink.client.cli.ClientOptions import org.apache.flink.client.deployment.{ClusterClientFactory, DefaultClusterClientServiceLoader} @@ -30,23 +24,18 @@ import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions} import org.apache.flink.kubernetes.KubernetesClusterDescriptor import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -import org.apache.hc.core5.util.Timeout +import org.apache.streampark.common.util.Utils.using +import org.apache.streampark.common.util.{Logger, Utils} +import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode +import org.apache.streampark.flink.kubernetes.ingress.IngressController +import org.apache.streampark.flink.kubernetes.model.ClusterKey import javax.annotation.Nullable - import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} object KubernetesRetriever extends Logger { - // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT} - val FLINK_CLIENT_TIMEOUT_SEC: Timeout = - Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis) - - // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT - val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = - Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue()) - private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]() /** get new KubernetesClient */ diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala index 4abd10df89..bd52794415 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala @@ -17,23 +17,20 @@ package org.apache.streampark.flink.kubernetes.watcher +import org.apache.hc.client5.http.fluent.Request import org.apache.streampark.common.util.Logger -import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig} import org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey, TrackId} - -import org.apache.hc.client5.http.fluent.Request -import org.json4s.{DefaultFormats, JNull} +import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig} import org.json4s.JsonAST.JNothing import org.json4s.jackson.JsonMethods.parse - -import javax.annotation.concurrent.ThreadSafe +import org.json4s.{DefaultFormats, JNull} import java.nio.charset.StandardCharsets import java.util.concurrent.{ScheduledFuture, TimeUnit} - -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} +import javax.annotation.concurrent.ThreadSafe import scala.concurrent.duration.DurationLong +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -119,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def Checkpoint.as( Request .get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .execute .returnContent .asString(StandardCharsets.UTF_8)) match { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 2d574e40d9..05835252fa 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -291,8 +291,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi JobDetails.as( Request .get(s"$restUrl/jobs/overview") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .execute .returnContent() .asString(StandardCharsets.UTF_8) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala index 269ca6bc30..889714a2d1 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala @@ -17,23 +17,20 @@ package org.apache.streampark.flink.kubernetes.watcher +import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions} +import org.apache.hc.client5.http.fluent.Request import org.apache.streampark.common.util.Logger -import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig} import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId} - -import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions} -import org.apache.hc.client5.http.fluent.Request -import org.json4s.{DefaultFormats, JArray} +import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig} import org.json4s.jackson.JsonMethods.parse - -import javax.annotation.concurrent.ThreadSafe +import org.json4s.{DefaultFormats, JArray} import java.nio.charset.StandardCharsets import java.util.concurrent.{ScheduledFuture, TimeUnit} - -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} +import javax.annotation.concurrent.ThreadSafe import scala.concurrent.duration.DurationLong +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -131,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default .as( Request .get(s"$flinkJmRestUrl/overview") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .execute .returnContent .asString(StandardCharsets.UTF_8)) @@ -144,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default .as( Request .get(s"$flinkJmRestUrl/jobmanager/config") - .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC) - .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC) + .connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC) + .responseTimeout(FLINK_CLIENT_TIMEOUT_SEC) .execute .returnContent .asString(StandardCharsets.UTF_8)) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala index fbedc52873..6ecb0bbe1b 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala @@ -17,13 +17,21 @@ package org.apache.streampark.flink.kubernetes.watcher +import org.apache.hc.core5.util.Timeout + +import java.time.Duration import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.atomic.AtomicBoolean - import scala.language.implicitConversions trait FlinkWatcher extends AutoCloseable { + // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT} + lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout + + // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT + lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L) + private[this] val started: AtomicBoolean = new AtomicBoolean(false) private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)