
Retry for ingestion Resources #356

Merged: 22 commits merged into master from feature/retryTempStorage on Jan 25, 2024

Conversation

@asaharn asaharn (Member) commented Jan 9, 2024

Pull Request Description

Added a retry mechanism for the cases where no storage containers are returned for ingestion.

Error trace:

ERROR: Query termination received for [id=96300c8c-6de5-46bc-816e-ce9e78bd2e53, runId=722c6239-67b9-4182-b3b4-3c33c1e37f69], with exception: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/jobs_uc.py", line 388, in adx_batch_write_gtr
    fut.result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/jobs_uc.py", line 430, in write_batch_date_gtr
    self.adx.write(adx_table_name, adx_df, adx_props, write_mode)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/clients_uc.py", line 268, in write
    .save()
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions/captured.py", line 188, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1732.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 251.0 failed 1 times, most recent failure: Lost task 0.0 in stage 251.0 (TID 13845) (10.231.232.29 executor driver): java.lang.RuntimeException: Failed to allocate temporary storage
	at com.microsoft.kusto.spark.utils.ContainerProvider.processContainerResults(ContainerProvider.scala:94)
	at com.microsoft.kusto.spark.utils.ContainerProvider.refresh(ContainerProvider.scala:83)
	at com.microsoft.kusto.spark.utils.ContainerProvider.getContainer(ContainerProvider.scala:47)
	at com.microsoft.kusto.spark.utils.ExtendedKustoClient.getTempBlobForIngestion(ExtendedKustoClient.scala:114)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.createBlobWriter(KustoWriter.scala:249)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRows(KustoWriter.scala:331)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoKusto(KustoWriter.scala:190)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:236)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:171)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:137)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:137)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1086)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1086)
	at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2999)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:897)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:900)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:795)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3588)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3519)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3506)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3506)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1516)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1516)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1516)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3835)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3747)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3735)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1240)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1228)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2942)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2980)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2999)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:3024)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1086)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:448)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1084)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:137)
	at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:49)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:272)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:274)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:498)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:201)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1113)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:151)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:447)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:271)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:245)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:266)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:316)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:312)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:441)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:251)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:372)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:251)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:203)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:200)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:336)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:424)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Failed to allocate temporary storage
	at com.microsoft.kusto.spark.utils.ContainerProvider.processContainerResults(ContainerProvider.scala:94)
	at com.microsoft.kusto.spark.utils.ContainerProvider.refresh(ContainerProvider.scala:83)
	at com.microsoft.kusto.spark.utils.ContainerProvider.getContainer(ContainerProvider.scala:47)
	at com.microsoft.kusto.spark.utils.ExtendedKustoClient.getTempBlobForIngestion(ExtendedKustoClient.scala:114)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.createBlobWriter(KustoWriter.scala:249)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRows(KustoWriter.scala:331)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoKusto(KustoWriter.scala:190)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:236)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:171)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:137)
	at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:137)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1086)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1086)
	at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2999)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:897)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:900)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:795)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
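For context, a minimal sketch of the kind of bounded retry this change adds around temporary-storage (container) acquisition. This is an illustration only: the helper below is hypothetical, and the merged implementation lives in ContainerProvider and goes through the connector's own buildRetryConfig helper, discussed in the review comments further down.

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object IngestionStorageRetry {
  // Asks for ingestion containers until a non-empty list comes back, up to a
  // bounded number of attempts with a doubling back-off between attempts.
  // `fetchContainers` stands in for the connector's call to
  // client.ingestClient.getResourceManager.getShuffledContainers.
  @tailrec
  def withRetry[A](attemptsLeft: Int, delayMs: Long)(fetchContainers: => Seq[A]): Seq[A] =
    Try(fetchContainers) match {
      case Success(containers) if containers.nonEmpty => containers
      case _ if attemptsLeft > 1 =>
        Thread.sleep(delayMs) // back off, then retry on empty results or transient failures
        withRetry(attemptsLeft - 1, delayMs * 2)(fetchContainers)
      case Success(_) =>
        throw new RuntimeException("Failed to allocate temporary storage")
      case Failure(e) => throw e
    }
}

A call such as withRetry(attemptsLeft = 3, delayMs = 1000)(fetchShuffledContainers()) would then replace a single unguarded fetch; the review below also asks for the bare RuntimeException to become a dedicated recoverable exception.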

---

Future Release Comment

[Add description of your change, to include in the next release]
[Delete any or all irrelevant sections, e.g. if your change does not warrant a release comment at all]

Breaking Changes:

  • None

Features:

  • None

Fixes:

  • None


github-actions bot commented Jan 9, 2024

Test Results

0 tests   0 ✅  0s ⏱️
0 suites  0 💤
0 files    0 ❌

Results for commit 674b6a9.

♻️ This comment has been updated with latest results.

@asaharn asaharn marked this pull request as ready for review January 15, 2024 12:24
@asaharn asaharn requested a review from ohadbitt January 15, 2024 12:51
@asaharn asaharn changed the base branch from feature/GitHubCI to master January 16, 2024 07:53
@@ -75,17 +75,20 @@ class ContainerProvider(val client: ExtendedKustoClient, val clusterAlias: Strin
storageUris(roundRobinIdx)
}
} else {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
@ohadbitt ohadbitt (Contributor) commented Jan 16, 2024
Let's not throw RuntimeException; create a new exception, or use something that extends Exception, since this is a recoverable case. Also, to me this looks like a bug in our Java SDK, because it means we returned here without an error and got empty resources.
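Not the class the PR ends up adding, but a minimal sketch of the dedicated, recoverable exception type being requested here; the repo's own NoStorageContainersException (referenced later in the thread) may differ in name, package and constructor.

// Hypothetical package; the real location in the connector may differ.
package com.microsoft.kusto.spark.exceptions

// Recoverable signal: the service answered without error but returned no
// ingestion containers. Extending Exception rather than RuntimeException keeps
// it out of the generic failure bucket and lets a retry predicate target
// exactly this condition.
class NoStorageContainersException(message: String) extends Exception(message)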

@@ -0,0 +1,36 @@
name: Build Java
Contributor:
how is this related to the PR?

Member (author):

With these changes we are moving away from AzCi to GitHub Actions.

Contributor:

Just next time: separate PRs.


val kustoOperationResult = new KustoOperationResult(readTestSource("storage-result-empty.json"), "v1")
val mockDmClient = mock[Client]
// (mockDmClient.execute(_: String, _: String, _: ClientRequestProperties)).expects(*, *, *).
Contributor:
remove?

val retryException: Predicate[Throwable] = (e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException])
private val retryConfig = buildRetryConfig((e: Throwable) =>
Contributor:

Suggested change
private val retryConfig = buildRetryConfig((e: Throwable) =>
private val retryConfigExportContainers = buildRetryConfig((e: Throwable) =>

private val retryConfig = buildRetryConfig((e: Throwable) =>
(e.isInstanceOf[IngestionServiceException] && !e.asInstanceOf[KustoDataExceptionBase].isPermanent) ||
(e.isInstanceOf[DataServiceException] && ExceptionUtils.getRootCause(e).isInstanceOf[HttpHostConnectException]))
private val retryConfigIngestionRefresh = buildRetryConfig(e => e.isInstanceOf[RuntimeException])
Contributor:
After we change it to a different type of exception, change it here as well, and also add all the SDK exceptions here.

Contributor:

Still RuntimeException here; it needs to be only IngestionServiceException and IngestionClientException.

Member (author):

Looks like outdated code; see here:

private val retryConfigIngestionRefresh = buildRetryConfig((e : Throwable) => (e.isInstanceOf[NoStorageContainersException]
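A sketch of a predicate that would cover both the new exception and the Java SDK's ingestion exceptions, as requested above. buildRetryConfig is the connector's own helper and is not reproduced here, and whether the merged code also covers IngestionClientException is not visible in this excerpt, so treat the exact set as an assumption.

import com.microsoft.azure.kusto.ingest.exceptions.{IngestionClientException, IngestionServiceException}
// The package of the connector's NoStorageContainersException is assumed here.
import com.microsoft.kusto.spark.exceptions.NoStorageContainersException

object IngestionRetryPredicates {
  // Retry only on conditions believed to be transient: an empty container list
  // or the SDK's ingestion-level exceptions. Predicate shape only; it could be
  // handed to the connector's buildRetryConfig helper.
  val shouldRetryIngestionRefresh: Throwable => Boolean = {
    case _: NoStorageContainersException => true
    case _: IngestionServiceException    => true
    case _: IngestionClientException     => true
    case _                               => false
  }
}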

@asaharn asaharn requested a review from ohadbitt January 23, 2024 11:31
@ohadbitt ohadbitt (Contributor) left a comment

Need to also bump the SDK version.

@@ -27,7 +28,7 @@ class ContainerProviderTest extends AnyFlatSpec with Matchers with MockFactory {

maybeExceptionThrown match {
case Some(exception) => Mockito.when(mockIngestionResourceManager.getShuffledContainers)
.thenThrow(exception).thenThrow(exception).
.thenThrow(exception, exception, exception, exception, exception, exception, exception, exception). //throws exception 8 times due to retry
Contributor:
Funny syntax :)

Member (author):
Spent a good amount of time trying to simplify this, but no luck 😐
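Not the merged test code, just one hedged alternative to spelling out the exception eight times: a counting Answer that throws for the first N invocations and then returns the stubbed value. failFirst is a hypothetical helper, and the success value handed to it (and the exact return type of getShuffledContainers) are assumptions.

import java.util.concurrent.atomic.AtomicInteger
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

object MockitoRetryStubs {
  // Throws `failure` for the first `times` invocations, then returns `afterwards`.
  def failFirst[T](times: Int, failure: Throwable)(afterwards: => T): Answer[T] =
    new Answer[T] {
      private val calls = new AtomicInteger(0)
      override def answer(invocation: InvocationOnMock): T =
        if (calls.incrementAndGet() <= times) throw failure else afterwards
    }
}

// Hypothetical usage, assuming `containers` is the stubbed success value:
// Mockito.when(mockIngestionResourceManager.getShuffledContainers)
//   .thenAnswer(MockitoRetryStubs.failFirst(8, exception)(containers))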

@asaharn asaharn merged commit a317093 into master Jan 25, 2024
4 of 5 checks passed
asaharn added a commit that referenced this pull request Feb 7, 2024
* Retry for ingestion Resources (#356)
* Remove irrelevant tests
* Fix ignored tests
* Add build file
* Added Retry for refresh ingestion
* Code changes for TCs
* ghCI change
---------

Co-authored-by: Ramachandran A G <[email protected]>

* code change for 2.4

* mockito syntax change for scala 2.11

---------

Co-authored-by: Ramachandran A G <[email protected]>
@ag-ramachandran ag-ramachandran deleted the feature/retryTempStorage branch January 26, 2025 07:17