Skip to content

Commit

Permalink
[Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Jan 24, 2025
1 parent 9c80377 commit 9f8bfe1
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,33 @@

package org.apache.streampark.flink.client.tool

import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.KubernetesRetriever

import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration.{Configuration, CoreOptions}
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.hc.core5.util.Timeout
import org.apache.streampark.common.util.Logger
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

import java.io.File
import java.nio.charset.StandardCharsets

import java.time.Duration
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}

object FlinkSessionSubmitHelper extends Logger {

// Response timeout applied to the Flink REST requests issued by this helper: 60 seconds.
// See org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT.
// NOTE(review): the value is hard-coded rather than read from the Flink option, presumably
// to avoid depending on the option's value type in a specific Flink version - confirm.
// Despite the _SEC suffix, this is an httpcore5 Timeout, not a number of seconds.
private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
  Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis)

// Connect timeout applied to the Flink REST requests issued by this helper: 30000 ms.
// See org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT.
private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)

// Implicit json4s formats picked up by the json4s calls in this object
// (e.g. Serialization.write when building the jar-run request body).
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

Expand All @@ -59,8 +64,8 @@ object FlinkSessionSubmitHelper extends Logger {
// upload flink-job jar
val uploadResult = Request
.post(s"$jmRestUrl/jars/upload")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.body(
MultipartEntityBuilder
.create()
Expand Down Expand Up @@ -90,8 +95,8 @@ object FlinkSessionSubmitHelper extends Logger {
// refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
val resp = Request
.post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
.execute
.returnContent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,25 @@

package org.apache.streampark.flink.kubernetes

import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
import org.apache.streampark.common.util.Utils.using
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey

import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
import org.apache.flink.client.cli.ClientOptions
import org.apache.flink.client.deployment.{ClusterClientFactory, DefaultClusterClientServiceLoader}
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.hc.core5.util.Timeout
import org.apache.streampark.common.util.Utils.using
import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey

import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

object KubernetesRetriever extends Logger {

// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)

// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())

private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()

/** get new KubernetesClient */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@

package org.apache.streampark.flink.kubernetes.watcher

import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
import org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey, TrackId}

import org.apache.hc.client5.http.fluent.Request
import org.json4s.{DefaultFormats, JNull}
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig}
import org.json4s.JsonAST.JNothing
import org.json4s.jackson.JsonMethods.parse

import javax.annotation.concurrent.ThreadSafe
import org.json4s.{DefaultFormats, JNull}

import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import javax.annotation.concurrent.ThreadSafe
import scala.concurrent.duration.DurationLong
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -119,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
Checkpoint.as(
Request
.get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent()
.asString(StandardCharsets.UTF_8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@

package org.apache.streampark.flink.kubernetes.watcher

import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions}
import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId}

import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions}
import org.apache.hc.client5.http.fluent.Request
import org.json4s.{DefaultFormats, JArray}
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig}
import org.json4s.jackson.JsonMethods.parse

import javax.annotation.concurrent.ThreadSafe
import org.json4s.{DefaultFormats, JArray}

import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import javax.annotation.concurrent.ThreadSafe
import scala.concurrent.duration.DurationLong
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -131,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/overview")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
Expand All @@ -144,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/jobmanager/config")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.streampark.flink.kubernetes.watcher

import org.apache.hc.core5.util.Timeout

import java.time.Duration
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean

import scala.language.implicitConversions

trait FlinkWatcher extends AutoCloseable {

// Client timeout shared by watcher implementations: 60 seconds.
// See org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT.
// NOTE(review): the value is hard-coded rather than read from the Flink option, presumably
// to avoid depending on the option's value type in a specific Flink version - confirm.
// Despite the _SEC suffix, this is an httpcore5 Timeout, not a number of seconds.
lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
  Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis)

// REST await timeout shared by watcher implementations: 30000 ms.
// See org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT.
lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)

private[this] val started: AtomicBoolean = new AtomicBoolean(false)

private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
Expand Down

0 comments on commit 9f8bfe1

Please sign in to comment.