Skip to content

Commit

Permalink
Retry for ingestion Resources (#356)
Browse files Browse the repository at this point in the history
* Remove irrelevant tests
* Fix ignored tests
* Add build file
* Added Retry for refresh ingestion
* Code changes for TCs
* ghCI change
---------

Co-authored-by: Ramachandran A G <[email protected]>
  • Loading branch information
asaharn and ag-ramachandran authored Jan 25, 2024
1 parent f327408 commit a317093
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 106 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# CI workflow: builds the project with Java 8 and runs the Maven `verify` phase.
name: Build Java

# Triggered by every push and every pull request, on any branch.
on:
  push:
    branches: [ '**' ]
  pull_request:
    branches: [ '**' ]

jobs:
  build:
    runs-on: ubuntu-latest
    name: Setup Java 8, build and run tests
    steps:
      - uses: actions/checkout@v2

      - name: Setup java
        uses: actions/setup-java@v3
        with:
          distribution: 'adopt'
          java-version: 8
          cache: 'maven'

      # The connection settings come from repository secrets. They are exposed
      # both as environment variables and as -D system properties on the Maven
      # command line.
      - name: Run the Maven verify phase
        env:
          kustoAadAuthorityID: ${{ secrets.TENANT_ID }}
          kustoAadAppSecret: ${{ secrets.APP_SECRET }}
          kustoDatabase: ${{ secrets.DATABASE }}
          kustoCluster: ${{ secrets.CLUSTER }}
          SecretPath: ${{ secrets.SECRET_PATH }}
          kustoAadAppId: ${{secrets.APP_ID}}
        run: mvn clean verify -DkustoAadAppId=${{ secrets.APP_ID }} -DkustoAadAuthorityID=${{ secrets.TENANT_ID }} -DkustoAadAppSecret=${{ secrets.APP_SECRET }} -DkustoDatabase=${{ secrets.DATABASE }} -DkustoCluster=${{ secrets.CLUSTER }} -DSecretPath=${{ secrets.SECRET_PATH }}

      # `if: always()` keeps this step running even when an earlier step failed,
      # so test reports are still published for a failing build.
      # NOTE(review): the globs below point at `data/` and `ingest/` module
      # directories - confirm they match this repository's module layout.
      - name: Publish Unit Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          files: |
            data/target/surefire-reports/*.xml
            ingest/target/surefire-reports/*.xml

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v2
2 changes: 1 addition & 1 deletion connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.microsoft.kusto.spark.exceptions

/**
 * Signals that a request for temporary storage returned no storage containers.
 *
 * @param msg human-readable description of the failure; it is also passed to the
 *            underlying exception as its message
 */
case class NoStorageContainersException(msg: String)
  extends scala.Exception(msg)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.microsoft.kusto.spark.utils

import com.microsoft.azure.kusto.data.exceptions.{DataServiceException, KustoDataExceptionBase}
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException
import com.microsoft.azure.kusto.ingest.exceptions.{IngestionClientException, IngestionServiceException}
import com.microsoft.kusto.spark.exceptions.NoStorageContainersException
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import io.github.resilience4j.core.IntervalFunction
import io.github.resilience4j.retry.RetryConfig
import io.github.resilience4j.retry.{Retry, RetryConfig}
import io.vavr.CheckedFunction0
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.http.conn.HttpHostConnectException

Expand All @@ -21,13 +23,13 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
private var lastRefresh: Instant = Instant.now(Clock.systemUTC())
private val className = this.getClass.getSimpleName
private val maxCommandsRetryAttempts = 8
private val retryConfig = buildRetryConfig

private def buildRetryConfig = {
val retryException: Predicate[Throwable] = (e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException])
private val retryConfigExportContainers = buildRetryConfig((e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException]))
private val retryConfigIngestionRefresh = buildRetryConfig((e : Throwable) => (e.isInstanceOf[NoStorageContainersException]
|| e.isInstanceOf[IngestionClientException] || e.isInstanceOf[IngestionServiceException]))

private def buildRetryConfig (retryException : Predicate[Throwable]) = {
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
ExtendedKustoClient.BaseIntervalMs, IntervalFunction.DEFAULT_MULTIPLIER,
IntervalFunction.DEFAULT_RANDOMIZATION_FACTOR, ExtendedKustoClient.MaxRetryIntervalMs)
Expand Down Expand Up @@ -62,7 +64,7 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin

private def refresh(exportContainer:Boolean=false):ContainerAndSas = {
if(exportContainer) {
Try(client.executeDM(command, None, Some(retryConfig))) match {
Try(client.executeDM(command, None, Some(retryConfigExportContainers))) match {
case Success(res) =>
val storage = res.getPrimaryResults.getData.asScala.map(row => {
val parts = row.get(0).toString.split('?')
Expand All @@ -75,23 +77,28 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
storageUris(roundRobinIdx)
}
} else {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(className, exception,
"Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
}
val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(Retry.of("refresh ingestion resources", retryConfigIngestionRefresh), () => {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(className, exception,
"Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
}
})
retryExecute.apply()
}
}

private def processContainerResults(storage: mutable.Buffer[ContainerAndSas]): ContainerAndSas = {
if (storage.isEmpty) {
KDSU.reportExceptionAndThrow(className, new RuntimeException("Failed to allocate temporary storage"), "writing to Kusto", clusterAlias)
KDSU.reportExceptionAndThrow(className,
NoStorageContainersException("No storage containers received. Failed to allocate temporary storage"),
"writing to Kusto", clusterAlias)
}
KDSU.logInfo(className, s"Got ${storage.length} storage SAS with command :'$command'. from service 'ingest-$clusterAlias'")
lastRefresh = Instant.now(Clock.systemUTC())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasink.{SparkIngestionProperties, WriteOptions}
import com.microsoft.kusto.spark.utils.ExtendedKustoClient
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.mockito.Matchers.any
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, times, verify}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KustoSinkStreamingE2E extends AnyFlatSpec with BeforeAndAfterAll {

sc.stop()
}
private val kustoConnectionOptions: KustoConnectionOptions = KustoTestUtils.getSystemTestOptions
private lazy val kustoConnectionOptions: KustoConnectionOptions = KustoTestUtils.getSystemTestOptions

val csvPath: String = System.getProperty("path", "connector/src/test/resources/TestData/csv")
val customSchema: StructType = new StructType().add("colA", StringType, nullable = true).add("colB", IntegerType, nullable = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class KustoSourceE2E extends AnyFlatSpec with BeforeAndAfterAll {
// Create a new table.
KDSU.logInfo("e2e","running KustoConnector")
val crp = new ClientRequestProperties
crp.setTimeoutInMilliSec(2000)
crp.setTimeoutInMilliSec(60000)
val ingestByTags = new java.util.ArrayList[String]
val tag = "dammyTag"
ingestByTags.add(tag)
Expand Down
Loading

0 comments on commit a317093

Please sign in to comment.