Skip to content

Commit

Permalink
code change for 2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
asaharn committed Jan 25, 2024
1 parent da2b994 commit 83dbb2c
Showing 1 changed file with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
private var lastRefresh: Instant = Instant.now(Clock.systemUTC())
private val className = this.getClass.getSimpleName
private val maxCommandsRetryAttempts = 8
private val retryConfigExportContainers = buildRetryConfig((e: Throwable) =>
private val retryConfigExportContainers = buildRetryConfig(JavaConverter.asJavaPredicate((e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException]))
private val retryConfigIngestionRefresh = buildRetryConfig((e : Throwable) => (e.isInstanceOf[NoStorageContainersException]
|| e.isInstanceOf[IngestionClientException] || e.isInstanceOf[IngestionServiceException]))
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException])))
private val retryConfigIngestionRefresh = buildRetryConfig(JavaConverter.asJavaPredicate((e : Throwable) => (e.isInstanceOf[NoStorageContainersException]
|| e.isInstanceOf[IngestionClientException] || e.isInstanceOf[IngestionServiceException])))

private def buildRetryConfig (retryException : Predicate[Throwable]) = {
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
Expand Down Expand Up @@ -77,19 +77,23 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
storageUris(roundRobinIdx)
}
} else {
val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(Retry.of("refresh ingestion resources", retryConfigIngestionRefresh), () => {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(className, exception,
"Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
val checkRetryFunc : CheckedFunction0[ContainerAndSas] = new CheckedFunction0[ContainerAndSas] {
override def apply(): ContainerAndSas = {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(className, exception,
"Error querying for create tempstorage", clusterAlias, shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
}
}
})
}

val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(Retry.of("refresh ingestion resources", retryConfigIngestionRefresh), checkRetryFunc )
retryExecute.apply()
}
}
Expand Down

0 comments on commit 83dbb2c

Please sign in to comment.