diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 00000000..0f858fa3
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,36 @@
+name: Build Java
+on:
+ push:
+ branches: [ '**' ]
+ pull_request:
+ branches: [ '**' ]
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ name: Setup Java 8, build and run tests
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup java
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'adopt'
+ java-version: 8
+ cache: 'maven'
+ - name: Run the Maven verify phase
+ env:
+ kustoAadAuthorityID: ${{ secrets.TENANT_ID }}
+ kustoAadAppSecret: ${{ secrets.APP_SECRET }}
+ kustoDatabase: ${{ secrets.DATABASE }}
+ kustoCluster: ${{ secrets.CLUSTER }}
+ SecretPath: ${{ secrets.SECRET_PATH }}
+ kustoAadAppId: ${{secrets.APP_ID}}
+ run: mvn clean verify -DkustoAadAppId=${{ secrets.APP_ID }} -DkustoAadAuthorityID=${{ secrets.TENANT_ID }} -DkustoAadAppSecret=${{ secrets.APP_SECRET }} -DkustoDatabase=${{ secrets.DATABASE }} -DkustoCluster=${{ secrets.CLUSTER }} -DSecretPath=${{ secrets.SECRET_PATH }}
+ - name: Publish Unit Test Results
+ uses: EnricoMi/publish-unit-test-result-action@v2
+ if: always()
+ with:
+ files: |
+ data/target/surefire-reports/*.xml
+ ingest/target/surefire-reports/*.xml
+ - name: Upload coverage to Codecov
+ uses: codecov/codecov-action@v2
\ No newline at end of file
diff --git a/connector/pom.xml b/connector/pom.xml
index b5938701..e3bc91ae 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -309,7 +309,7 @@
org.mockito
- mockito-all
+ mockito-inline
${mockito.version}
test
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/exceptions/NoStorageContainersException.scala b/connector/src/main/scala/com/microsoft/kusto/spark/exceptions/NoStorageContainersException.scala
new file mode 100644
index 00000000..a68918ce
--- /dev/null
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/exceptions/NoStorageContainersException.scala
@@ -0,0 +1,5 @@
+package com.microsoft.kusto.spark.exceptions
+
+case class NoStorageContainersException(msg: String) extends scala.Exception(msg) {
+
+}
\ No newline at end of file
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
index 4fc49315..6d02f883 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
@@ -1,10 +1,12 @@
package com.microsoft.kusto.spark.utils
import com.microsoft.azure.kusto.data.exceptions.{DataServiceException, KustoDataExceptionBase}
-import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException
+import com.microsoft.azure.kusto.ingest.exceptions.{IngestionClientException, IngestionServiceException}
+import com.microsoft.kusto.spark.exceptions.NoStorageContainersException
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import io.github.resilience4j.core.IntervalFunction
-import io.github.resilience4j.retry.RetryConfig
+import io.github.resilience4j.retry.{Retry, RetryConfig}
+import io.vavr.CheckedFunction0
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.http.conn.HttpHostConnectException
@@ -21,13 +23,13 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
private var lastRefresh: Instant = Instant.now(Clock.systemUTC())
private val className = this.getClass.getSimpleName
private val maxCommandsRetryAttempts = 8
- private val retryConfig = buildRetryConfig
-
- private def buildRetryConfig = {
- val retryException: Predicate[Throwable] = (e: Throwable) =>
- (e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
- (e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException])
+ private val retryConfigExportContainers = buildRetryConfig((e: Throwable) =>
+ (e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
+ (e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException]))
+ private val retryConfigIngestionRefresh = buildRetryConfig((e : Throwable) => (e.isInstanceOf[NoStorageContainersException]
+ || e.isInstanceOf[IngestionClientException] || e.isInstanceOf[IngestionServiceException]))
+ private def buildRetryConfig (retryException : Predicate[Throwable]) = {
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
ExtendedKustoClient.BaseIntervalMs, IntervalFunction.DEFAULT_MULTIPLIER,
IntervalFunction.DEFAULT_RANDOMIZATION_FACTOR, ExtendedKustoClient.MaxRetryIntervalMs)
@@ -62,7 +64,7 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
private def refresh(exportContainer:Boolean=false):ContainerAndSas = {
if(exportContainer) {
- Try(client.executeDM(command, None, Some(retryConfig))) match {
+ Try(client.executeDM(command, None, Some(retryConfigExportContainers))) match {
case Success(res) =>
val storage = res.getPrimaryResults.getData.asScala.map(row => {
val parts = row.get(0).toString.split('?')
@@ -75,23 +77,28 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
storageUris(roundRobinIdx)
}
} else {
- Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
- case Success(res) =>
- val storage = res.asScala.map(row => {
- ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
- })
- processContainerResults(storage)
- case Failure(exception) =>
- KDSU.reportExceptionAndThrow(className, exception,
- "Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
- storageUris(roundRobinIdx)
- }
+ val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(Retry.of("refresh ingestion resources", retryConfigIngestionRefresh), () => {
+ Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
+ case Success(res) =>
+ val storage = res.asScala.map(row => {
+ ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
+ })
+ processContainerResults(storage)
+ case Failure(exception) =>
+ KDSU.reportExceptionAndThrow(className, exception,
+ "Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
+ storageUris(roundRobinIdx)
+ }
+ })
+ retryExecute.apply()
}
}
private def processContainerResults(storage: mutable.Buffer[ContainerAndSas]): ContainerAndSas = {
if (storage.isEmpty) {
- KDSU.reportExceptionAndThrow(className, new RuntimeException("Failed to allocate temporary storage"), "writing to Kusto", clusterAlias)
+ KDSU.reportExceptionAndThrow(className,
+ NoStorageContainersException("No storage containers received. Failed to allocate temporary storage"),
+ "writing to Kusto", clusterAlias)
}
KDSU.logInfo(className, s"Got ${storage.length} storage SAS with command :'$command'. from service 'ingest-$clusterAlias'")
lastRefresh = Instant.now(Clock.systemUTC())
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/ExtendedKustoClientTests.scala b/connector/src/test/scala/com/microsoft/kusto/spark/ExtendedKustoClientTests.scala
index 5ddd774d..04660f1f 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/ExtendedKustoClientTests.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/ExtendedKustoClientTests.scala
@@ -7,7 +7,7 @@ import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasink.{SparkIngestionProperties, WriteOptions}
import com.microsoft.kusto.spark.utils.ExtendedKustoClient
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.mockito.Matchers.any
+import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, times, verify}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/KustoSinkStreamingE2E.scala b/connector/src/test/scala/com/microsoft/kusto/spark/KustoSinkStreamingE2E.scala
index 3c10592b..302611f9 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/KustoSinkStreamingE2E.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/KustoSinkStreamingE2E.scala
@@ -40,7 +40,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {
sc.stop()
}
- private val kustoConnectionOptions: KustoConnectionOptions = KustoTestUtils.getSystemTestOptions
+ private lazy val kustoConnectionOptions: KustoConnectionOptions = KustoTestUtils.getSystemTestOptions
val csvPath: String = System.getProperty("path", "connector/src/test/resources/TestData/csv")
val customSchema: StructType = new StructType().add("colA", StringType, nullable = true).add("colB", IntegerType, nullable = true)
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/KustoSourceE2E.scala b/connector/src/test/scala/com/microsoft/kusto/spark/KustoSourceE2E.scala
index 22bfd28e..da84e8d0 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/KustoSourceE2E.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/KustoSourceE2E.scala
@@ -107,7 +107,7 @@ class KustoSourceE2E extends AnyFlatSpec with BeforeAndAfterAll {
// Create a new table.
KDSU.logInfo("e2e","running KustoConnector")
val crp = new ClientRequestProperties
- crp.setTimeoutInMilliSec(2000)
+ crp.setTimeoutInMilliSec(60000)
val ingestByTags = new java.util.ArrayList[String]
val tag = "dammyTag"
ingestByTags.add(tag)
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/utils/ContainerProviderTest.scala b/connector/src/test/scala/com/microsoft/kusto/spark/utils/ContainerProviderTest.scala
index 980d9769..c77260ac 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/utils/ContainerProviderTest.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/utils/ContainerProviderTest.scala
@@ -1,41 +1,79 @@
package com.microsoft.kusto.spark.utils
-import com.microsoft.azure.kusto.data.KustoOperationResult
-import com.microsoft.azure.kusto.data.exceptions.DataServiceException
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.http.HttpHost
-import org.apache.http.conn.HttpHostConnectException
+import com.azure.storage.blob.BlobContainerClient
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
+import com.microsoft.azure.kusto.data.{Client, KustoOperationResult}
+import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException
+import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas
+import com.microsoft.azure.kusto.ingest.{IngestionResourceManager, QueuedIngestClient}
+import com.microsoft.kusto.spark.exceptions.NoStorageContainersException
+import org.mockito.Mockito
import org.scalamock.scalatest.MockFactory
-import org.scalatest.Ignore
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
-import java.net.{ConnectException, InetAddress}
+import java.util.Collections
+import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.io.Source
-import java.io.IOException
-@Ignore
+
class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
- val CACHE_EXPIRY_SEC = 2
+ val CACHE_EXPIRY_SEC = 30
val SLEEP_TIME_SEC = 10
+
+ private def createExtendedKustoMockClient(hasEmptyResults: Boolean = false, mockDmClient: Client,
+ maybeExceptionThrown: Option[Throwable] = None, getRMOccurances : Int = 1): ExtendedKustoClient = {
+ val mockIngestClient: QueuedIngestClient = mock[QueuedIngestClient]
+ val mockIngestionResourceManager: IngestionResourceManager = Mockito.mock[IngestionResourceManager](classOf[IngestionResourceManager])
+
+ maybeExceptionThrown match {
+ case Some(exception) => Mockito.when(mockIngestionResourceManager.getShuffledContainers)
+ .thenThrow(exception, exception, exception, exception, exception, exception, exception, exception). //throws exception 8 times due to retry
+ thenAnswer(_ => List(getMockContainerWithSas(1), getMockContainerWithSas(2)).asJava)
+ case None => if (hasEmptyResults) {
+ Mockito.when(mockIngestionResourceManager.getShuffledContainers).
+ thenAnswer(_ => Collections.EMPTY_LIST)
+ } else {
+ Mockito.when(mockIngestionResourceManager.getShuffledContainers).
+ thenAnswer(_ => List(getMockContainerWithSas(1), getMockContainerWithSas(2)).asJava)
+ }
+ }
+ // Expecting getResourceManager to be called maxCommandsRetryAttempts i.e. 8 times.
+ mockIngestClient.getResourceManager _ expects() repeated getRMOccurances times() returning mockIngestionResourceManager
+ // Unfortunately we cannot Mock this class as there is a member variable that is a val and cannot be mocked
+ new ExtendedKustoClient(new ConnectionStringBuilder("https://somecluster.eastus.kusto.windows.net/"),
+ new ConnectionStringBuilder("https://ingest-somecluster.eastus.kusto.windows.net"), "somecluster") {
+ override lazy val ingestClient: QueuedIngestClient = mockIngestClient
+ override lazy val dmClient: Client = mockDmClient
+ }
+ }
+
+ private def getMockContainerWithSas(index: Int): ContainerWithSas = {
+ val mockResultsOne: ContainerWithSas = Mockito.mock[ContainerWithSas](classOf[ContainerWithSas])
+ val blobResultsOne: BlobContainerClient = Mockito.mock[BlobContainerClient](classOf[BlobContainerClient])
+ Mockito.when(blobResultsOne.getBlobContainerUrl).thenAnswer(_ => s"https://sacc$index.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0")
+ Mockito.when(mockResultsOne.getSas).thenAnswer(_ => "?sv=2018-03-28&sr=c&sp=rw")
+ Mockito.when(mockResultsOne.getContainer).thenAnswer(_ => blobResultsOne)
+ mockResultsOne
+ }
// happy path
- "ContainerProvider" should "return a container" in {
- val extendedMockClient = mock[ExtendedKustoClient]
- val extendedMockClientEmptyFail = mock[ExtendedKustoClient]
+ "ContainerProvider returns a container" should "from RM" in {
val kustoOperationResult = new KustoOperationResult(readTestSource("storage-result.json"), "v1")
+ val mockDmClient = mock[Client]
+// (mockDmClient.execute(_: String, _: String, _: ClientRequestProperties)).expects(*, *, *) returning(kustoOperationResult)
+
val clusterAlias = "ingest-cluster"
val command = ".create tempstorage"
- val ingestProviderEntryCreator = (c: ContainerAndSas) => c
/*
Invoke and test
*/
- val containerProvider = new ContainerProvider(extendedMockClient, clusterAlias, command,CACHE_EXPIRY_SEC)
- extendedMockClient.executeDM _ expects(command, None, *) noMoreThanOnce() returning kustoOperationResult
- containerProvider.getContainer.containerUrl should(not be "")
+ val extendedMockClient = createExtendedKustoMockClient(mockDmClient = mockDmClient)
+ val containerProvider = new ContainerProvider(extendedMockClient, clusterAlias, command, CACHE_EXPIRY_SEC)
+ containerProvider.getContainer.containerUrl should (not be "")
Some(containerProvider.getContainer.containerUrl) should contain oneOf
("https://sacc1.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0",
"https://sacc2.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0")
- containerProvider.getContainer.sas should(not be "")
+ containerProvider.getContainer.sas should (not be "")
/* Second test that returns from cache. The test will fail if the client is invoked again as expectation is to call once */
containerProvider.getContainer.containerUrl should (not be "")
@@ -43,90 +81,55 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
("https://sacc1.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0",
"https://sacc2.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0")
containerProvider.getContainer.sas should (not be "")
-
-
/* Third test where the cache expires and the invocation throws an exception */
Thread.sleep(SLEEP_TIME_SEC * 1000) // Milliseconds
- extendedMockClient.executeDM _ expects(command, None, *) throws new DataServiceException(clusterAlias,"Cannot create temp storage",false)
containerProvider.getContainer.containerUrl should (not be "")
Some(containerProvider.getContainer.containerUrl) should contain oneOf
("https://sacc1.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0",
"https://sacc2.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0")
containerProvider.getContainer.sas should (not be "")
+
// The case where storageUris.nonEmpty is false. This will throw the exception as there is nothing to give from the cache
- Thread.sleep(SLEEP_TIME_SEC * 1000) // Milliseconds
+ Thread.sleep((SLEEP_TIME_SEC * 2) * 1000) // Milliseconds
- extendedMockClientEmptyFail.executeDM _ expects(command, None, *) throws new DataServiceException(clusterAlias, "Cannot create temp storage", false)
- val emptyStorageContainerProvider = new ContainerProvider(extendedMockClientEmptyFail, clusterAlias, command,CACHE_EXPIRY_SEC)
+ val mockDmFailClient = mock[Client]
+ val extendedMockClientEmptyFail = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmFailClient, getRMOccurances = 8)
+ val emptyStorageContainerProvider = new ContainerProvider(extendedMockClientEmptyFail, clusterAlias, command, CACHE_EXPIRY_SEC)
val caught =
- intercept[DataServiceException] { // Result type: Assertion
+ intercept[NoStorageContainersException] { // Result type: Assertion
emptyStorageContainerProvider.getContainer
- }
- assert(caught.getMessage.indexOf("Cannot create temp storage") != -1)
+ }
+ assert(caught.getMessage.indexOf("No storage containers received. Failed to allocate temporary storage") != -1)
}
"ContainerProvider" should "fail in the case when call succeeds but returns no storage" in {
- val extendedMockClient = mock[ExtendedKustoClient]
- val kustoOperationResult = new KustoOperationResult(readTestSource("storage-result-empty.json"), "v1")
val clusterAlias = "ingest-cluster"
val command = ".create tempstorage"
- val ingestProviderEntryCreator = (c: ContainerAndSas) => c
+
+ val kustoOperationResult = new KustoOperationResult(readTestSource("storage-result-empty.json"), "v1")
+ val mockDmClient = mock[Client]
+ /*
+ Invoke and test
+ */
+ val extendedMockClient = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmClient, getRMOccurances = 8)
/*
Invoke and test. In this case the call succeeds but returns no storage. This will hit the empty storage block
*/
val containerProvider = new ContainerProvider(extendedMockClient, clusterAlias, command, CACHE_EXPIRY_SEC)
- extendedMockClient.executeDM _ expects(command, None, *) noMoreThanOnce() returning kustoOperationResult
- the[RuntimeException] thrownBy containerProvider.getContainer should have message "Failed to allocate temporary storage"
+ the[NoStorageContainersException] thrownBy containerProvider.getContainer should have message "No storage containers received. Failed to allocate temporary storage"
}
"ContainerProvider" should "retry and return a container in case of a temporary HTTPException" in {
- val extendedMockClient = mock[ExtendedKustoClient]
- val extendedMockNoRootExceptionFail = mock[ExtendedKustoClient]
- val extendedMockClientIOExceptionFail = mock[ExtendedKustoClient]
- val kustoOperationResult = new KustoOperationResult(readTestSource("storage-result.json"), "v1")
val clusterAlias = "ingest-cluster"
val command = ".get ingestion resources"
/*
Invoke and test
*/
+ val mockDmClient = mock[Client]
+ val extendedMockClient = createExtendedKustoMockClient(mockDmClient = mockDmClient,
+ maybeExceptionThrown = Some(new IngestionServiceException("IOError when trying to retrieve CloudInfo")), getRMOccurances = 8)
val containerProvider = new ContainerProvider(extendedMockClient, clusterAlias, command, CACHE_EXPIRY_SEC)
-
- // HttpHostConnectException -> IOException -> ConnectException This is the only hierarchy available
- extendedMockClient.executeDM _ expects(command, None, *) throws new DataServiceException(clusterAlias,
- "IOError when trying to retrieve CloudInfo",new HttpHostConnectException(
- new IOException(new ConnectException("Connection timed out")),
- HttpHost.create("kustocluster.centralus.kusto.windows.net"),
- InetAddress.getLoopbackAddress),true) once() returning kustoOperationResult
- // The first call will fail with a HttpHostConnectException. The second call will succeed
- containerProvider.getContainer.containerUrl should (not be "")
- Some(containerProvider.getContainer.containerUrl) should contain oneOf
- ("https://sacc1.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0",
- "https://sacc2.blob.core.windows.net/20230430-ingestdata-e5c334ee145d4b4-0")
- containerProvider.getContainer.sas should (not be "")
-
- // Test where the root exception below it is empty, It will fail
- extendedMockNoRootExceptionFail.executeDM _ expects(command, None, *) throws new DataServiceException(clusterAlias,
- "No root exception", false)
- val noRootExceptionContainerProvider = new ContainerProvider(extendedMockNoRootExceptionFail, clusterAlias, command,
- CACHE_EXPIRY_SEC)
- val caught =
- intercept[DataServiceException] { // Result type: Assertion
- noRootExceptionContainerProvider.getContainer
- }
- assert(caught.getMessage.indexOf("No root exception") != -1)
-
-
- // Test where the root exception below it is not a HttpHostConnectException. It will fail
- extendedMockClientIOExceptionFail.executeDM _ expects(command, None, *) throws new DataServiceException(clusterAlias,
- "No root exception", new IOException(new ConnectException("IOError when trying to retrieve CloudInfo")), true)
- val ioExceptionContainerProvider = new ContainerProvider(extendedMockClientIOExceptionFail, clusterAlias, command,
- CACHE_EXPIRY_SEC)
- val ioErrorCaught =
- intercept[DataServiceException] { // Result type: Assertion
- ioExceptionContainerProvider.getContainer
- }
- assert(ioErrorCaught.getMessage.indexOf("No root exception") != -1)
- assert(ExceptionUtils.getRootCause(ioErrorCaught).getMessage.indexOf("IOError when trying to retrieve CloudInfo") != -1)
+ the[IngestionServiceException] thrownBy containerProvider.getContainer should have message "IOError when trying to retrieve CloudInfo"
}
private def readTestSource(fileName: String): String = {
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/utils/KustoAzureFsSetupCacheTest.scala b/connector/src/test/scala/com/microsoft/kusto/spark/utils/KustoAzureFsSetupCacheTest.scala
index dfe128e3..dc8c2d8d 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/utils/KustoAzureFsSetupCacheTest.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/utils/KustoAzureFsSetupCacheTest.scala
@@ -29,19 +29,23 @@ class KustoAzureFsSetupCacheTest extends AnyFunSuite {
}
test("testUpdateAndGetPrevNativeAzureFs") {
+ val now = Instant.now(Clock.systemUTC())
val dataToTest = Table(
- ("now", "expectedResult"),
+ ("now", "checkIfRefreshNeeded", "Scenario"),
// Initial set is false for the flag, but refresh
- (Instant.now(Clock.systemUTC()), true),
+ (now, true, "Initial set is false, refresh is needed"),
// The cache is expired, so it will be re-set.The checkIfRefreshNeeded will return false, but the state is already true.
- (Instant.now(Clock.systemUTC()).minus(3 * KustoConstants.SparkSettingsRefreshMinutes, ChronoUnit.MINUTES), true),
+ (now.minus(3 * KustoConstants.SparkSettingsRefreshMinutes, ChronoUnit.MINUTES), true,
+ "The cache is expired, so it will be re-set.The checkIfRefreshNeeded will return false, but the state is already true."),
// This will be within the cache interval and also the flag is set to true
- (Instant.now(Clock.systemUTC()).minus(KustoConstants.SparkSettingsRefreshMinutes / 2, ChronoUnit.MINUTES) , true),
+ (now.minus(KustoConstants.SparkSettingsRefreshMinutes / 2, ChronoUnit.MINUTES) , true,
+ "This will be within the cache interval and also the flag is set to true"),
)
- forAll(dataToTest) { (now: Instant, expectedResult: Boolean) =>
+ forAll(dataToTest) { (now: Instant, checkIfRefreshNeeded: Boolean, scenario:String) =>
val actualResult = KustoAzureFsSetupCache.updateAndGetPrevNativeAzureFs(now)
- actualResult shouldEqual expectedResult
+ assert(actualResult == checkIfRefreshNeeded, scenario)
+ actualResult shouldEqual checkIfRefreshNeeded
}
}
diff --git a/pom.xml b/pom.xml
index cb79f655..c03ef0a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
4.13.2
3.12.0
3.3.6
- 5.0.3
+ 5.0.4
1.8
1.8
@@ -35,7 +35,7 @@
3.5.1
3.0.1
3.0.0
- 1.10.19
+ 4.11.0
4.1.94.Final
UTF-8
1.0.39