Skip to content

Commit

Permalink
* Refactoring the code
Browse files Browse the repository at this point in the history
* Modified TCs
  • Loading branch information
asaharn committed Jan 15, 2024
1 parent 8890c24 commit 75b90d9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
private var lastRefresh: Instant = Instant.now(Clock.systemUTC())
private val className = this.getClass.getSimpleName
private val maxCommandsRetryAttempts = 8
private val retryConfig = buildRetryConfig

private def buildRetryConfig = {
val retryException: Predicate[Throwable] = (e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException])
private val retryConfig = buildRetryConfig((e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException]))
private val retryConfigIngestionRefresh = buildRetryConfig(e => e.isInstanceOf[RuntimeException])

private def buildRetryConfig (retryException : Predicate[Throwable]) = {
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
ExtendedKustoClient.BaseIntervalMs, IntervalFunction.DEFAULT_MULTIPLIER,
IntervalFunction.DEFAULT_RANDOMIZATION_FACTOR, ExtendedKustoClient.MaxRetryIntervalMs)
Expand Down Expand Up @@ -76,15 +75,7 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
storageUris(roundRobinIdx)
}
} else {
//Q: Could be merged with the retry config of exportContainer but fails TCs due to retry's exception criteria
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
ExtendedKustoClient.BaseIntervalMs, IntervalFunction.DEFAULT_MULTIPLIER,
IntervalFunction.DEFAULT_RANDOMIZATION_FACTOR, ExtendedKustoClient.MaxRetryIntervalMs)
val retry = Retry.of("refresh ingestion resources", RetryConfig.custom
.maxAttempts(maxCommandsRetryAttempts)
.intervalFunction(sleepConfig)
.retryOnException(e => e.isInstanceOf[RuntimeException]).build)
val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(retry, () => {
val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(Retry.of("refresh ingestion resources", retryConfigIngestionRefresh), () => {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
val SLEEP_TIME_SEC = 10

private def createExtendedKustoMockClient(hasEmptyResults: Boolean = false, mockDmClient: Client,
maybeExceptionThrown: Option[Throwable] = None): ExtendedKustoClient = {
maybeExceptionThrown: Option[Throwable] = None, getRMOccurances : Int = 1): ExtendedKustoClient = {
val mockIngestClient: QueuedIngestClient = mock[QueuedIngestClient]
val mockIngestionResourceManager: IngestionResourceManager = Mockito.mock[IngestionResourceManager](classOf[IngestionResourceManager])

Expand All @@ -38,7 +38,7 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
}
}
// Expecting getResourceManager to be called maxCommandsRetryAttempts i.e. 8 times.
mockIngestClient.getResourceManager _ expects() repeated 8 times() returning mockIngestionResourceManager
mockIngestClient.getResourceManager _ expects() repeated getRMOccurances times() returning mockIngestionResourceManager
// Unfortunately we cannot Mock this class as there is a member variable that is a val and cannot be mocked
new ExtendedKustoClient(new ConnectionStringBuilder("https://somecluster.eastus.kusto.windows.net/"),
new ConnectionStringBuilder("https://ingest-somecluster.eastus.kusto.windows.net"), "somecluster") {
Expand Down Expand Up @@ -92,7 +92,7 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
Thread.sleep((SLEEP_TIME_SEC * 2) * 1000) // Milliseconds

val mockDmFailClient = mock[Client]
val extendedMockClientEmptyFail = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmFailClient)
val extendedMockClientEmptyFail = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmFailClient, getRMOccurances = 8)
val emptyStorageContainerProvider = new ContainerProvider(extendedMockClientEmptyFail, clusterAlias, command, CACHE_EXPIRY_SEC)
val caught =
intercept[RuntimeException] { // Result type: Assertion
Expand All @@ -112,7 +112,7 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {
/*
Invoke and test
*/
val extendedMockClient = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmClient)
val extendedMockClient = createExtendedKustoMockClient(hasEmptyResults = true, mockDmClient = mockDmClient, getRMOccurances = 8)
/*
Invoke and test. In this case the call succeeds but returns no storage. This will hit the empty storage block
*/
Expand Down

0 comments on commit 75b90d9

Please sign in to comment.