Skip to content

Commit

Permalink
[Improve] YarnClusterDescriptorWrapper minor improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Aug 29, 2024
1 parent 33577e0 commit 9058c9e
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId

import java.util.Collections
import java.util.concurrent.Callable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.streampark.flink.util.FlinkUtils

import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.configuration.{Configuration, DeploymentOptions}
import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.YarnDeploymentTarget
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
import org.apache.hadoop.fs.{Path => HadoopPath}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.ClusterSpecification
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus}
import org.apache.hadoop.yarn.util.ConverterUtils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ package org.apache.streampark.flink.core
import org.apache.flink.yarn.YarnClusterDescriptor

class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {}
extends YarnClusterDescriptorTrait(yarnClusterDescriptor)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.streampark.flink.core

import collection.JavaConversions._
import com.google.common.collect.Lists
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.hadoop.fs.Path

Expand All @@ -28,8 +27,9 @@ import java.util
class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {

override def addShipFiles(input: util.List[File]) = {
val f = input.map(c => new Path(c.toURI))
yarnClusterDescriptor.addShipFiles(Lists.newArrayList(f: _*))
override def addShipFiles(input: util.List[File]): Unit = {
val list = new util.ArrayList[Path]()
input.foreach(x => list.add(new Path(x.toURI)))
yarnClusterDescriptor.addShipFiles(list)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.streampark.flink.core

import collection.JavaConversions._
import com.google.common.collect.Lists
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.hadoop.fs.Path

Expand All @@ -28,8 +27,9 @@ import java.util
class YarnClusterDescriptorWrapper(yarnClusterDescriptor: YarnClusterDescriptor)
extends YarnClusterDescriptorTrait(yarnClusterDescriptor) {

override def addShipFiles(input: util.List[File]) = {
val f = input.map(c => new Path(c.toURI))
yarnClusterDescriptor.addShipFiles(Lists.newArrayList(f: _*))
override def addShipFiles(input: util.List[File]): Unit = {
val list = new util.ArrayList[Path]()
input.foreach(x => list.add(new Path(x.toURI)))
yarnClusterDescriptor.addShipFiles(list)
}
}

0 comments on commit 9058c9e

Please sign in to comment.