diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala index ca6d50b254..313b06963c 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala @@ -17,8 +17,8 @@ package org.apache.streampark.common.fs -import org.apache.streampark.common.util.Logger -import org.apache.streampark.common.util.Utils.{isAnyBank, notEmpty} +import org.apache.streampark.common.util.{AssertUtils, Logger} +import org.apache.streampark.common.util.Utils.isAnyBank import org.apache.commons.codec.digest.DigestUtils import org.apache.commons.io.{FileUtils, IOUtils} @@ -41,7 +41,7 @@ object LfsOperator extends FsOperator with Logger { } override def delete(path: String): Unit = { - if (notEmpty(path)) { + if (AssertUtils.isNotEmpty(path)) { val file = new File(path) if (file.exists()) { FileUtils.forceDelete(file) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala new file mode 100644 index 0000000000..5d0eb50426 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import javax.annotation.Nullable + +import java.util +import java.util.{Collection => JavaCollection, Map => JavaMap} + +import scala.collection.JavaConversions._ + +/** @since 2.1.6 */ +object AssertUtils { + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalArgumentException + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). The exception will have the given error + * message. + * + * @param condition + * The condition to check + * @param message + * The message for the {@code IllegalArgumentException} that is thrown if the check fails. + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalArgumentException(message) + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalStateException + } + } + + /** + * Checks the given boolean condition, and throws an IllegalStateException if the condition is not + * met (evaluates to {@code false}). The exception will have the given error message. + * + * @param condition + * The condition to check + * @param message + * The message for the IllegalStateException that is thrown if the check fails. + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalStateException(message) + } + } + + // ------------------------------------------------------------------------ + // Null checks + // ------------------------------------------------------------------------ + /** Ensures that the given object reference is not null. Upon violation, a */ + def notNull[T](@Nullable reference: T): T = { + if (reference == null) { + throw new NullPointerException + } + reference + } + + /** + * Ensures that the given object reference is not null. Upon violation, a NullPointerException + * that is thrown if the check fails. + * + * @return + * The object reference itself (generically typed). + * @throws NullPointerException + * Thrown, if the passed reference was null. + */ + def notNull[T](@Nullable reference: T, @Nullable message: String): T = { + if (reference == null) { + throw new NullPointerException(message) + } + reference + } + + def isEmpty(reference: AnyRef): Boolean = !isNotEmpty(reference) + + def isNotEmpty(elem: AnyRef): Boolean = { + elem match { + case null => false + case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty + case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty + case x if x.isInstanceOf[Traversable[_]] => x.asInstanceOf[Traversable[_]].nonEmpty + case x if x.isInstanceOf[Iterable[_]] => x.asInstanceOf[Iterable[_]].nonEmpty + case x if x.isInstanceOf[JavaCollection[_]] => !x.asInstanceOf[JavaCollection[_]].isEmpty + case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, _]].isEmpty + case _ => true + } + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element.
AssertUtils.notEmpty(array, "must be
+   * contain elements");
+ * + * @param reference + * the object to check + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(reference: AnyRef): Unit = { + if (isEmpty(reference)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element.
 AssertUtils.notEmpty(array, "must be
+   * contain elements");
+ * + * @param reference + * the object to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(@Nullable reference: AnyRef, message: String): Unit = { + if (isEmpty(reference)) { + throw new IllegalArgumentException(message) + } + } + + /** + * Assert that an array contains no {@code null} elements.

Note: Does not complain if the array + * is empty!

AssertUtils.noNullElements(array, "The array must contain non-null
+   * elements");
+ * + * @param array + * the array to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array contains a {@code null} element + */ + def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = { + if (array != null) for (element <- array) { + if (element == null) throw new IllegalArgumentException(message) + } + } + + /** + * Assert that a collection contains no {@code null} elements.

Note: Does not complain if the + * collection is empty!

AssertUtils.noNullElements(collection, "Collection must
+   * contain non-null elements");
+ * + * @param collection + * the collection to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the collection contains a {@code null} element + */ + def noNullElements(@Nullable collection: util.Collection[_], message: String): Unit = { + if (collection != null) for (element <- collection) { + if (element == null) { + throw new IllegalArgumentException(message) + } + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String.
AssertUtils.hasLength(name, "Name must not be empty");
+ * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String): Unit = { + if (!getHasLength(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String.
AssertUtils.hasLength(name, "Name must not be empty");
+ * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String, message: String): Unit = { + if (!getHasLength(text)) { + throw new IllegalArgumentException(message) + } + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character.
AssertUtils.hasText(name, "'name' must not be empty");
+ * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String): Unit = { + if (!getHasText(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character.
AssertUtils.hasText(name, "'name' must not be empty");
+ * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String, message: String): Unit = { + if (!getHasText(text)) { + throw new IllegalArgumentException(message) + } + } + + private[this] def getHasLength(@Nullable str: String): Boolean = + str != null && str.nonEmpty + + private[this] def getHasText(@Nullable str: String): Boolean = { + str != null && str.nonEmpty && containsText(str) + } + + private[this] def containsText(str: CharSequence): Boolean = { + val strLen = str.length + for (i <- 0 until strLen) { + if (!Character.isWhitespace(str.charAt(i))) { + return true + } + } + false + } + +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index d3eed9d2f2..3090a72a7d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -92,8 +92,8 @@ object FileUtils { def getPathFromEnv(env: String): String = { val path = System.getenv(env) - require( - Utils.notEmpty(path), + AssertUtils.required( + AssertUtils.isNotEmpty(path), s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env") val file = new File(path) require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is not exist!") diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala index 3ce4fd9a79..1c6daa9832 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala @@ -134,7 +134,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder { private val shadedPackage = "org.apache.streampark.shaded" override def configureByResource(url: URL): Unit = { - Utils.notNull(url, "URL argument cannot be null") + AssertUtils.notNull(url, "URL argument cannot be null") val path = url.getPath if (path.endsWith("xml")) { val configurator = new JoranConfigurator() diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 6adc03cf72..f839fa207f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -280,7 +280,7 @@ object PropertiesUtils extends Logger { val map = mutable.Map[String, String]() val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") simple.split("\\s?-D") match { - case d if Utils.notEmpty(d) => + case d if AssertUtils.isNotEmpty(d) => d.foreach( x => { if (x.nonEmpty) { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index 2015a2c80c..80a19c2af4 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -18,11 +18,11 @@ package org.apache.streampark.common.util import org.apache.commons.lang3.StringUtils -import java.io.{BufferedInputStream, File, FileInputStream, IOException, PrintWriter, StringWriter} +import java.io._ import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort} import java.net.URL import java.time.Duration -import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} +import java.util.{jar, Properties, UUID} import java.util.concurrent.locks.LockSupport import java.util.jar.{JarFile, JarInputStream} @@ -34,43 +34,6 @@ object Utils extends Logger { private[this] lazy val OS = System.getProperty("os.name").toLowerCase - def notNull(obj: Any, message: String): Unit = { - if (obj == null) { - throw new NullPointerException(message) - } - } - - def notNull(obj: Any): Unit = { - notNull(obj, "this argument must not be null") - } - - def notEmpty(elem: Any): Boolean = { - elem match { - case null => false - case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty - case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty - case x if x.isInstanceOf[Traversable[_]] => x.asInstanceOf[Traversable[_]].nonEmpty - case x if x.isInstanceOf[Iterable[_]] => x.asInstanceOf[Iterable[_]].nonEmpty - case x if x.isInstanceOf[JavaCollection[_]] => !x.asInstanceOf[JavaCollection[_]].isEmpty - case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, _]].isEmpty - case _ => true - } - } - - def isEmpty(elem: Any): Boolean = !notEmpty(elem) - - def required(expression: Boolean): Unit = { - if (!expression) { - throw new IllegalArgumentException - } - } - - def required(expression: Boolean, errorMessage: Any): Unit = { - if (!expression) { - throw new IllegalArgumentException(s"requirement failed: ${errorMessage.toString}") - } - } - def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "") @throws[IOException] diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java index 524959322b..47096a0ff2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.base.util; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.cglib.beans.BeanMap; @@ -451,7 +451,7 @@ public static > Map sortMapByValue(Map< } public static T[] arrayRemoveElements(T[] array, T... elem) { - Utils.notNull(array); + AssertUtils.notNull(array); List arrayList = new ArrayList<>(0); Collections.addAll(arrayList, array); if (isEmpty(elem)) { @@ -464,7 +464,7 @@ public static T[] arrayRemoveElements(T[] array, T... elem) { } public static T[] arrayRemoveIndex(T[] array, int... index) { - Utils.notNull(array); + AssertUtils.notNull(array); for (int j : index) { if (j < 0 || j > array.length - 1) { throw new IndexOutOfBoundsException("index error.@" + j); @@ -481,7 +481,7 @@ public static T[] arrayRemoveIndex(T[] array, int... index) { } public static T[] arrayInsertIndex(T[] array, int index, T t) { - Utils.notNull(array); + AssertUtils.notNull(array); List arrayList = new ArrayList(array.length + 1); if (index == 0) { arrayList.add(t); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java index 66cdb0beeb..3818ec45f4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.controller; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.service.ExternalLinkService; @@ -70,7 +70,7 @@ public RestResponse create(@Valid ExternalLink externalLink) { @PostMapping("/update") @RequiresPermissions("externalLink:update") public RestResponse update(@Valid ExternalLink externalLink) { - Utils.notNull(externalLink.getId(), "The link id cannot be null"); + AssertUtils.notNull(externalLink.getId(), "The link id cannot be null"); externalLinkService.update(externalLink); return RestResponse.success(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java index dc0f23c63c..1694d9bdb0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java @@ -20,6 +20,7 @@ import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.WebUtils; @@ -228,7 +229,7 @@ public String getMavenArgs() { try { Process process = Runtime.getRuntime().exec(mvn + " --version"); process.waitFor(); - Utils.required(process.exitValue() == 0); + AssertUtils.required(process.exitValue() == 0); useWrapper = false; } catch (Exception ignored) { log.warn("try using user-installed maven failed, now use maven-wrapper."); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index d6359340b7..aabde27cfe 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -24,8 +24,8 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.MavenConfig; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -104,7 +104,7 @@ private void initInternalConfig(Environment springEnv) { .forEach( key -> { InternalOption config = InternalConfigHolder.getConfig(key); - Utils.notNull(config); + AssertUtils.notNull(config); InternalConfigHolder.set(config, springEnv.getProperty(key, config.classType())); }); @@ -172,7 +172,7 @@ public synchronized void storageInitialize(StorageType storageType) { // 2. upload jar. // 2.1) upload client jar File client = WebUtils.getAppClientDir(); - Utils.required( + AssertUtils.required( client.exists() && client.listFiles().length > 0, client.getAbsolutePath().concat(" is not exists or empty directory ")); @@ -189,7 +189,7 @@ public synchronized void storageInitialize(StorageType storageType) { WebUtils.getAppLibDir() .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern())); - Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); + AssertUtils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); String appShims = workspace.APP_SHIMS(); fsOperator.delete(appShims); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java index b08b0d2cff..acb9b255bb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.core.service.alert.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ThreadUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.AlertException; import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.core.bean.AlertConfigWithParams; @@ -100,7 +100,7 @@ public boolean alert(AlertConfigWithParams params, AlertTemplate alertTemplate) try { Class notifyServiceClass = getAlertServiceImpl(alertType); - Utils.notNull(notifyServiceClass); + AssertUtils.notNull(notifyServiceClass); boolean alertRes = SpringContextUtils.getBean(notifyServiceClass) .doAlert(params, alertTemplate); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 1d4edd7cdb..ad9b350fee 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; @@ -169,7 +170,7 @@ public boolean buildApplication(@Nonnull Application app, ApplicationLog applica FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false); if (app.isFlinkSqlJob()) { FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql; - Utils.notNull(flinkSql); + AssertUtils.notNull(flinkSql); app.setDependency(flinkSql.getDependency()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java index 8f3b8d05bb..a4e1a91b96 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -107,7 +107,7 @@ public synchronized void update(Application application, Boolean latest) { if (application.isFlinkSqlJob()) { // get effect config ApplicationConfig effectiveConfig = getEffective(application.getId()); - if (Utils.isEmpty(application.getConfig())) { + if (AssertUtils.isEmpty(application.getConfig())) { if (effectiveConfig != null) { effectiveService.delete(application.getId(), EffectiveType.CONFIG); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 6ed4df915a..3d68231746 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.HdfsOperator; import org.apache.streampark.common.fs.LfsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; @@ -1510,7 +1511,7 @@ public void starting(Application application) { @Transactional(rollbackFor = {Exception.class}) public void start(Application appParam, boolean auto) throws Exception { final Application application = getById(appParam.getId()); - Utils.notNull(application); + AssertUtils.notNull(application); if (!application.isCanBeStart()) { throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly."); } @@ -1605,7 +1606,7 @@ public void start(Application appParam, boolean auto) throws Exception { } } else if (application.isFlinkSqlJob()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); - Utils.notNull(flinkSql); + AssertUtils.notNull(flinkSql); // 1) dist_userJar String sqlDistJar = serviceHelper.getSqlClientJar(flinkEnv); // 2) appConfig @@ -1632,7 +1633,7 @@ public void start(Application appParam, boolean auto) throws Exception { } AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - Utils.notNull(buildPipeline); + AssertUtils.notNull(buildPipeline); BuildResult buildResult = buildPipeline.getBuildResult(); if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java index b01aed743d..ecfa29131a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.enums.PlaceholderType; @@ -75,7 +75,7 @@ public void delete(Long linkId) { @Override public List render(Long appId) { Application app = applicationService.getById(appId); - Utils.notNull(app, "Application doesn't exist"); + AssertUtils.notNull(app, "Application doesn't exist"); List externalLink = this.list(); if (externalLink != null && !externalLink.isEmpty()) { // Render the placeholder @@ -109,10 +109,10 @@ private boolean check(ExternalLink params) { if (result == null) { return true; } - Utils.required( + AssertUtils.required( !result.getBadgeName().equals(params.getBadgeName()), String.format("The name: %s is already existing.", result.getBadgeName())); - Utils.required( + AssertUtils.required( !result.getLinkUrl().equals(params.getLinkUrl()), String.format("The linkUrl: %s is already existing.", result.getLinkUrl())); return false; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index f03af2ae0d..1633926aae 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; @@ -172,11 +173,11 @@ public void removeApp(Long appId) { @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) public void rollback(Application application) { FlinkSql sql = getCandidate(application.getId(), CandidateType.HISTORY); - Utils.notNull(sql); + AssertUtils.notNull(sql); try { // check and backup current job FlinkSql effectiveSql = getEffective(application.getId(), false); - Utils.notNull(effectiveSql); + AssertUtils.notNull(effectiveSql); // rollback history sql backUpService.rollbackFlinkSql(application, sql); } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index 4101163eb8..ba72d4331f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -20,9 +20,9 @@ import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.ThreadUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; @@ -122,7 +122,7 @@ public RestResponse create(Project project) { @Transactional(rollbackFor = {Exception.class}) public boolean update(Project projectParam) { Project project = getById(projectParam.getId()); - Utils.notNull(project); + AssertUtils.notNull(project); ApiAlertException.throwIfFalse( project.getTeamId().equals(projectParam.getTeamId()), "Team can't be changed, update project failed."); @@ -166,7 +166,7 @@ public boolean update(Project projectParam) { @Transactional(rollbackFor = {Exception.class}) public boolean delete(Long id) { Project project = getById(id); - Utils.notNull(project); + AssertUtils.notNull(project); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Application::getProjectId, id); long count = applicationService.count(queryWrapper); @@ -246,7 +246,7 @@ public void build(Long id) throws Exception { @Override public List modules(Long id) { Project project = getById(id); - Utils.notNull(project); + AssertUtils.notNull(project); BuildState buildState = BuildState.of(project.getBuildState()); if (BuildState.SUCCESSFUL.equals(buildState)) { File appHome = project.getDistHome(); @@ -373,7 +373,7 @@ public List> listConf(Project project) { } List> list = new ArrayList<>(); File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName())); - Utils.notNull(files); + AssertUtils.notNull(files); for (File item : files) { eachFile(item, list, true); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java index 9dae7dc84f..449676fdbc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java @@ -19,6 +19,7 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.common.util.ThreadUtils; @@ -123,8 +124,8 @@ public void expire(Long appId) { private void clearExpire(Savepoint entity) { FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId()); Application application = applicationService.getById(entity.getAppId()); - Utils.notNull(flinkEnv); - Utils.notNull(application); + AssertUtils.notNull(flinkEnv); + AssertUtils.notNull(application); String numRetainedKey = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(); String numRetainedFromDynamicProp = @@ -277,7 +278,7 @@ public String getSavePointPath(Application appParam) throws Exception { // 3.1) At the remote mode, request the flink webui interface to get the savepoint path if (ExecutionMode.isRemoteMode(application.getExecutionMode())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or " @@ -440,7 +441,7 @@ private Map tryGetRestProps(Application application, FlinkCluste Map properties = new HashMap<>(); if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) { - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", @@ -461,7 +462,7 @@ private String getClusterId(Application application, FlinkCluster cluster) { return application.getJobName(); case KUBERNETES_NATIVE_SESSION: case YARN_SESSION: - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The %s clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java index f01b5df583..e9997af41a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.service.SqlCompleteService; import com.google.common.collect.Sets; @@ -185,7 +185,7 @@ public void buildTree(String word, int count, TreeNode buildWay) { nowStep = nowStep.get(nowChar).getNext(); loc += 1; } - Utils.notNull(preNode); + AssertUtils.notNull(preNode); preNode.setStop(); preNode.setCount(count); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java index 22d4e89ad9..d8ea8714e2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -74,8 +74,9 @@ public class YarnQueueServiceImpl extends ServiceImpl page(YarnQueue yarnQueue, RestRequest request) { - Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null."); - Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null."); + AssertUtils.notNull( + yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null."); Page page = MybatisPager.getPage(request); return this.baseMapper.findQueues(page, yarnQueue); } @@ -88,8 +89,8 @@ public IPage page(YarnQueue yarnQueue, RestRequest request) { @Override public ResponseResult checkYarnQueue(YarnQueue yarnQueue) { - Utils.notNull(yarnQueue, "Yarn queue mustn't be empty."); - Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty."); + AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null."); ResponseResult responseResult = new ResponseResult<>(); @@ -211,8 +212,8 @@ public boolean existByTeamIdQueueLabel(Long teamId, String queueLabel) { @VisibleForTesting public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) { - Utils.notNull(yarnQueue, "Yarn queue mustn't be null."); - Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null."); + AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); YarnQueue queueFromDB = getById(yarnQueue.getId()); ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist."); return queueFromDB; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java index 3e95f7e5ed..ae7d195328 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -173,8 +173,10 @@ public void updateMember(Member member) { () -> new ApiAlertException( String.format("The member [id=%s] not found", member.getId()))); - Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); - Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); + AssertUtils.required( + oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); + AssertUtils.required( + oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); Optional.ofNullable(roleService.getById(member.getRoleId())) .orElseThrow( () -> diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java index 584f157a12..69d6c74f38 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -79,7 +79,7 @@ public User findByName(String username) { public IPage page(User user, RestRequest request) { Page page = MybatisPager.getPage(request); IPage resPage = this.baseMapper.findUserDetail(page, user); - Utils.notNull(resPage); + AssertUtils.notNull(resPage); if (resPage.getTotal() == 0) { resPage.setRecords(Collections.emptyList()); } @@ -196,7 +196,7 @@ public List getNoTokenUser() { @Override public void setLastTeam(Long teamId, Long userId) { User user = getById(userId); - Utils.notNull(user); + AssertUtils.notNull(user); user.setLastTeamId(teamId); this.baseMapper.updateById(user); } @@ -204,7 +204,7 @@ public void setLastTeam(Long teamId, Long userId) { @Override public void clearLastTeam(Long userId, Long teamId) { User user = getById(userId); - Utils.notNull(user); + AssertUtils.notNull(user); if (!teamId.equals(user.getLastTeamId())) { return; } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java index 4c6144a368..b146d281b3 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.hbase.source; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction; import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction; @@ -42,8 +42,8 @@ public DataStreamSource getDataStream( HBaseResultFunction resultFunction, RunningFunction runningFunc) { - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); HBaseSourceFunction sourceFunction = new HBaseSourceFunction<>(property, queryFunction, resultFunction, runningFunc, null); return context.getJavaEnv().addSource(sourceFunction); diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java index 8f96349785..636ddf5d31 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java @@ -17,8 +17,8 @@ package org.apache.streampark.flink.connector.jdbc.sink; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ConfigUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.flink.connector.function.TransformFunction; import org.apache.streampark.flink.connector.jdbc.internal.JdbcSinkFunction; import org.apache.streampark.flink.core.scala.StreamingContext; @@ -55,7 +55,7 @@ public JdbcJavaSink sql(TransformFunction func) { } public DataStreamSink sink(DataStream dataStream) { - Utils.notNull(sqlFunc, "transformFunction can not be null"); + AssertUtils.notNull(sqlFunc, "transformFunction can not be null"); if (this.jdbc == null) { this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java index e3f36d34cf..083f6d44d0 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java @@ -17,8 +17,8 @@ package org.apache.streampark.flink.connector.jdbc.source; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ConfigUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.function.SQLQueryFunction; import org.apache.streampark.flink.connector.function.SQLResultFunction; @@ -54,8 +54,8 @@ public DataStreamSource getDataStream( SQLResultFunction resultFunction, RunningFunction runningFunc) { - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); if (this.jdbc == null) { this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala index 0e2cab8826..6186711ae6 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.source -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.ConfigUtils import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction import org.apache.streampark.flink.core.scala.StreamingContext @@ -31,14 +31,15 @@ import scala.collection.Map object JdbcSource { - def apply(@(transient @param) property: Properties = new Properties())(implicit - ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, property) + def apply(alias: String = "", properties: Properties = new Properties())(implicit + ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, alias, properties) {} } class JdbcSource( @(transient @param) val ctx: StreamingContext, - property: Properties = new Properties()) { + alias: String, + property: Properties) { /** * @param sqlFun @@ -50,8 +51,11 @@ class JdbcSource( def getDataStream[R: TypeInformation]( sqlFun: R => String, fun: Iterable[Map[String, _]] => Iterable[R], - running: Unit => Boolean)(implicit jdbc: Properties = new Properties()): DataStream[R] = { - Utils.copyProperties(property, jdbc) + running: Unit => Boolean): DataStream[R] = { + val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias) + if (property != null) { + jdbc.putAll(property) + } val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, fun, running) ctx.addSource(mysqlFun) } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java index dc49f621de..560de9eb0e 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.mongo.source; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction; import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction; @@ -43,9 +43,9 @@ public DataStreamSource getDataStream( MongoResultFunction resultFunction, RunningFunction runningFunc) { - Utils.notNull(collectionName, "collectionName must not be null"); - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(collectionName, "collectionName must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); MongoSourceFunction sourceFunction = new MongoSourceFunction<>( collectionName, property, queryFunction, resultFunction, runningFunc, null); diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala index 795db7d093..61663845ae 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.packer.docker import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder} -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.AssertUtils import com.github.dockerjava.api.DockerClient import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, HackDockerClient} @@ -61,7 +61,7 @@ object DockerRetriever { /** set docker-host for kata */ def setDockerHost(): Unit = { val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST) - if (Utils.notEmpty(dockerhost)) { + if (AssertUtils.isNotEmpty(dockerhost)) { val dockerHostUri: URI = new URI(dockerhost) dockerHttpClientBuilder.dockerHost(dockerHostUri) }