diff --git a/connector/scalastyle_config.xml b/connector/scalastyle_config.xml
index d81b1596..c1d0c77d 100644
--- a/connector/scalastyle_config.xml
+++ b/connector/scalastyle_config.xml
@@ -86,7 +86,7 @@
-
+
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala
index c38a6b4b..1dc2b4e0 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala
@@ -103,6 +103,7 @@ object FinalizeHelper {
tmpTableName,
allowMerge = false,
allowRebuild = false),
+ "alterMergePolicyCommand",
crp)
// Drop dedup tags
if (writeOptions.ensureNoDupBlobs) {
@@ -112,6 +113,7 @@ object FinalizeHelper {
crp,
writeOptions.timeout,
s"drops extents from temp table '$tmpTableName' ",
+ "extentsDrop",
writeOptions.requestId)
}
client.moveExtents(
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
index 4ee34119..2eb00adc 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
@@ -101,6 +101,7 @@ object KustoWriter {
.executeEngine(
tableCoordinates.database,
generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get),
+ "schemaShow",
crp)
.getPrimaryResults
@@ -131,6 +132,7 @@ object KustoWriter {
.executeEngine(
tableCoordinates.database,
generateTableGetSchemaAsRowsCommand(writeOptions.userTempTableName.get),
+ "schemaShow",
crp)
.getPrimaryResults
.count() <= 0 ||
@@ -449,7 +451,7 @@ object KustoWriter {
streamingClient: ManagedStreamingIngestClient,
inputStreamLastIdx: Int): Unit = {
KDSU.retryApplyFunction(
- () => {
+ i => {
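+ // i is the 1-based retry attempt; it is echoed in the ingestion-status log below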
val inputStream = new ByteArrayInputStream(bytes, 0, inputStreamLastIdx)
// The SDK will compress the stream by default.
val streamSourceInfo = new StreamSourceInfo(inputStream)
@@ -463,7 +465,8 @@ object KustoWriter {
s"details: ${ingestionStatus.details}, " +
s"activityId: ${ingestionStatus.activityId}, " +
s"errorCode: ${ingestionStatus.errorCode}, " +
- s"errorCodeString: ${ingestionStatus.errorCodeString}" +
+ s"errorCodeString: ${ingestionStatus.errorCodeString}," +
+ s"retry: $i" +
"}")
})
case Failure(e: Throwable) =>
@@ -583,7 +586,7 @@ object KustoWriter {
}
// write the data here
val partitionsResult = KDSU.retryApplyFunction(
- () => {
+ i => {
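+ // i is the 1-based retry attempt; reported if queueing the blob for ingestion fails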
Try(
ingestClient.ingestFromBlob(
new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()),
@@ -599,7 +602,7 @@ object KustoWriter {
KDSU.reportExceptionAndThrow(
className,
e,
- "Queueing blob for ingestion in partition " +
+ s"Queueing blob for ingestion, retry number '$i', in partition " +
s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
val blobUrlWithSas =
s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}"
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala
index c16376fe..d4169306 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala
@@ -84,6 +84,7 @@ private[kusto] object KustoReader {
.executeEngine(
request.kustoCoordinates.database,
filteredQuery,
+ "executeQuery",
request.clientRequestProperties.orNull)
.getPrimaryResults
@@ -329,6 +330,7 @@ private[kusto] class KustoReader(client: ExtendedKustoClient) {
.executeEngine(
request.kustoCoordinates.database,
exportCommand,
+ "exportPartitionToBlob",
request.clientRequestProperties.orNull)
.getPrimaryResults
KDSU.verifyAsyncCommandCompletion(
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
index 6d0ddd79..2400558a 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala
@@ -83,7 +83,7 @@ class ContainerProvider(
private def refresh(exportContainer: Boolean = false): ContainerAndSas = {
if (exportContainer) {
- Try(client.executeDM(command, None, Some(retryConfigExportContainers))) match {
+ Try(client.executeDM(command, None, "refreshContainers", Some(retryConfigExportContainers))) match {
case Success(res) =>
val storage = res.getPrimaryResults.getData.asScala.map(row => {
val parts = row.get(0).toString.split('?')
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala
index 23828636..fba92345 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala
@@ -9,30 +9,14 @@ import com.microsoft.azure.kusto.data._
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase
import com.microsoft.azure.kusto.ingest.resources.ResourceWithSas
-import com.microsoft.azure.kusto.ingest.{
- IngestClientFactory,
- ManagedStreamingIngestClient,
- QueuedIngestClient,
- StreamingIngestClient
-}
+import com.microsoft.azure.kusto.ingest.{IngestClientFactory, ManagedStreamingIngestClient, QueuedIngestClient, StreamingIngestClient}
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasink.KustoWriter.DelayPeriodBetweenCalls
-import com.microsoft.kusto.spark.datasink.{
- SinkTableCreationMode,
- SparkIngestionProperties,
- WriteMode,
- WriteOptions
-}
-import com.microsoft.kusto.spark.datasource.{
- TransientStorageCredentials,
- TransientStorageParameters
-}
+import com.microsoft.kusto.spark.datasink.{SinkTableCreationMode, SparkIngestionProperties, WriteMode, WriteOptions}
+import com.microsoft.kusto.spark.datasource.{TransientStorageCredentials, TransientStorageParameters}
import com.microsoft.kusto.spark.exceptions.{FailedOperationException, RetriesExhaustedException}
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
-import com.microsoft.kusto.spark.utils.KustoConstants.{
- MaxCommandsRetryAttempts,
- MaxSleepOnMoveExtentsMillis
-}
+import com.microsoft.kusto.spark.utils.KustoConstants.{MaxCommandsRetryAttempts, MaxSleepOnMoveExtentsMillis}
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils.extractSchemaFromResultTable
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import io.github.resilience4j.core.IntervalFunction
@@ -44,7 +28,7 @@ import org.apache.log4j.Level
import org.apache.spark.sql.types.StructType
import java.time.Instant
-import java.util.StringJoiner
+import java.util.{StringJoiner, UUID}
import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration
@@ -96,10 +80,10 @@ class ExtendedKustoClient(
tableSchemaBuilder.add(s"['${field.name}']:$fieldType")
}
tmpTableSchema = tableSchemaBuilder.toString
- executeEngine(database, generateTableCreateCommand(table, tmpTableSchema), crp)
+ executeEngine(database, generateTableCreateCommand(table, tmpTableSchema), "tableCreate", crp)
if (writeOptions.writeMode == WriteMode.KustoStreaming) {
- executeEngine(database, generateTableAlterStreamIngestionCommand(table), crp)
- executeEngine(database, generateClearStreamingIngestionCacheCommand(table), crp)
+ executeEngine(database, generateTableAlterStreamIngestionCommand(table), "enableStreamingPolicy", crp)
+ executeEngine(database, generateClearStreamingIngestionCacheCommand(table), "clearStreamingPolicyCache", crp)
}
}
} else {
@@ -116,10 +100,11 @@ class ExtendedKustoClient(
if (writeOptions.writeMode == WriteMode.Transactional) {
// Create a temporary table with the kusto or dataframe parsed schema with retention and delete set to after the
// write operation times out. Engine recommended keeping the retention although we use auto delete.
- executeEngine(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), crp)
+ executeEngine(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), "tableCreate", crp)
val targetTableBatchingPolicyRes = executeEngine(
database,
generateTableShowIngestionBatchingPolicyCommand(table),
+ "showBatchingPolicy",
crp).getPrimaryResults
targetTableBatchingPolicyRes.next()
val targetTableBatchingPolicy =
@@ -130,8 +115,9 @@ class ExtendedKustoClient(
generateTableAlterIngestionBatchingPolicyCommand(
tmpTableName,
targetTableBatchingPolicy),
+ "alterBatchingPolicy",
crp)
- executeDM(generateRefreshBatchingPolicyCommand(database, tmpTableName), Some(crp))
+ executeDM(generateRefreshBatchingPolicyCommand(database, tmpTableName), Some(crp), "refreshBatchingPolicy")
}
if (configureRetentionPolicy) {
executeEngine(
@@ -143,9 +129,10 @@ class ExtendedKustoClient(
durationFormat,
true),
recoverable = false),
+ "alterRetentionPolicy",
crp)
val instant = Instant.now.plusSeconds(writeOptions.autoCleanupTime.toSeconds)
- executeEngine(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), crp)
+ executeEngine(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), "alterAutoDelete", crp)
}
KDSU.logInfo(
myName,
@@ -154,15 +141,50 @@ class ExtendedKustoClient(
}
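+ // activityName is woven into the client request id of every attempt so
+ // that individual retries can be correlated on the service side.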
def executeDM(
- command: String,
- maybeCrp: Option[ClientRequestProperties],
- retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
+ command: String,
+ maybeCrp: Option[ClientRequestProperties],
+ activityName: String,
+ retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
KDSU.retryApplyFunction(
- () => dmClient.execute(ExtendedKustoClient.DefaultDb, command, maybeCrp.orNull),
+ i => {
+ dmClient.execute(ExtendedKustoClient.DefaultDb, command, newIncrementedCrp(maybeCrp, activityName, i))
+ },
retryConfig.getOrElse(this.retryConfig),
"Execute DM command with retries")
}
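+
+ // Executes a command against the engine endpoint with retries; each
+ // attempt gets a fresh client request id derived from the caller's crp,
+ // the activity name and the attempt number (see newIncrementedCrp below).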
+ def executeEngine(
+ database: String,
+ command: String,
+ activityName: String,
+ crp: ClientRequestProperties,
+ retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
+ KDSU.retryApplyFunction(
+ i => {
+ engineClient.execute(database, command, newIncrementedCrp(Some(crp), activityName, i))
+ },
+ retryConfig.getOrElse(this.retryConfig),
+ "Execute engine command with retries")
+ }
+
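+ // Builds a ClientRequestProperties whose request id chains the caller's
+ // existing id (when present) with the activity name, a fresh UUID and the
+ // 1-based retry iteration: "<parentId>;<activityName>;<uuid>;<iteration>".
+ // Note that only the request id is carried over; other properties set on
+ // the original crp are not copied.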
+ private def newIncrementedCrp(
+ maybeCrp: Option[ClientRequestProperties],
+ activityName: String,
+ iteration: Int): ClientRequestProperties = {
+ val prefix = maybeCrp
+ .map(_.getClientRequestId)
+ .filter(StringUtils.isNotBlank(_))
+ .map(currentId => s"$currentId;")
+
+ val id = s"${prefix.getOrElse("")}$activityName;${UUID.randomUUID()};$iteration"
+ val clientRequestProperties = new ClientRequestProperties
+ clientRequestProperties.setClientRequestId(id)
+ clientRequestProperties
+ }
+
def getTempBlobForIngestion: ContainerAndSas = {
ingestContainersContainerProvider.getContainer
}
@@ -198,12 +220,12 @@ class ExtendedKustoClient(
writeOptions: WriteOptions,
sinkStartTime: Instant): Unit = {
val extentsCountQuery =
- executeEngine(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
+ executeEngine(database, generateExtentsCountCommand(tmpTableName), "countExtents", crp).getPrimaryResults
extentsCountQuery.next()
val extentsCount = extentsCountQuery.getInt(0)
if (extentsCount > writeOptions.minimalExtentsCountForSplitMerge) {
val nodeCountQuery =
- executeEngine(database, generateNodesCountCommand(), crp).getPrimaryResults
+ executeEngine(database, generateNodesCountCommand(), "nodesCount", crp).getPrimaryResults
nodeCountQuery.next()
val nodeCount = nodeCountQuery.getInt(0)
moveExtentsWithRetries(
@@ -260,6 +282,7 @@ class ExtendedKustoClient(
timeRange,
if (batchSize.isEmpty) None else Some(curBatchSize),
useMaterializedViewFlag),
+ "extentsMove",
crp).getPrimaryResults
val operationResult = KDSU.verifyAsyncCommandCompletion(
engineClient,
@@ -275,6 +298,7 @@ class ExtendedKustoClient(
executeEngine(
database,
generateShowOperationDetails(operationResult.get.getString(0)),
+ "operationsDetailsShow",
crp).getPrimaryResults)
if (res.get.count() == 0) {
failed = handleNoResults(totalAmount, extentsProcessed, database, tmpTableName, crp)
@@ -381,7 +405,7 @@ class ExtendedKustoClient(
s"result'${totalAmount - extentsProcessed}' Please open issue if you see this trace. At: https://github" +
".com/Azure/azure-kusto-spark/issues")
val extentsLeftRes =
- executeEngine(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
+ executeEngine(database, generateExtentsCountCommand(tmpTableName), "extentsCount", crp).getPrimaryResults
extentsLeftRes.next()
extentsLeftRes.getInt(0) != 0
@@ -427,13 +451,14 @@ class ExtendedKustoClient(
executeEngine(
database,
generateIsTableMaterializedViewSourceCommand(targetTable),
+ "isTableMV",
crp).getPrimaryResults
isDestinationTableMaterializedViewSourceResult.next()
val isDestinationTableMaterializedViewSource: Boolean =
isDestinationTableMaterializedViewSourceResult.getLong(0) > 0
if (isDestinationTableMaterializedViewSource) {
val res =
- executeEngine(database, generateIsTableEngineV3(targetTable), crp).getPrimaryResults
+ executeEngine(database, generateIsTableEngineV3(targetTable), "isTableV3", crp).getPrimaryResults
res.next()
res.getBoolean(0)
} else {
@@ -472,34 +497,36 @@ class ExtendedKustoClient(
executeEngine(
database,
CslCommandsGenerator.generateFetchTableIngestByTagsCommand(table),
+ "fetchTableExtentsTags",
crp).getPrimaryResults
}
def retryAsyncOp(
- database: String,
- cmd: String,
- crp: ClientRequestProperties,
- timeout: FiniteDuration,
- cmdName: String,
- requestId: String): Option[KustoResultSetTable] = {
+ database: String,
+ cmd: String,
+ crp: ClientRequestProperties,
+ timeout: FiniteDuration,
+ cmdToTrace: String,
+ cmdName: String,
+ requestId: String): Option[KustoResultSetTable] = {
KDSU.retryApplyFunction(
- () => {
- val operation = executeEngine(database, cmd, crp).getPrimaryResults
+ i => {
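+ // newIncrementedCrp tags this outer async-op retry with cmdName;
+ // executeEngine then appends its own cmdToTrace tag on each inner attempt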
+ val operation = executeEngine(database, cmd, cmdToTrace, newIncrementedCrp(Some(crp), cmdName, i)).getPrimaryResults
KDSU.verifyAsyncCommandCompletion(
engineClient,
database,
operation,
samplePeriod = KustoConstants.DefaultPeriodicSamplePeriod,
timeout,
- cmdName,
+ cmdToTrace,
myName,
requestId)
},
retryConfigAsyncOp,
- cmdName)
+ cmdToTrace)
}
- private def buildRetryConfig = {
+ def buildRetryConfig: RetryConfig = {
val sleepConfig = IntervalFunction.ofExponentialRandomBackoff(
ExtendedKustoClient.BaseIntervalMs,
IntervalFunction.DEFAULT_MULTIPLIER,
@@ -534,7 +561,7 @@ class ExtendedKustoClient(
crp: ClientRequestProperties): Unit = {
try {
- executeEngine(database, generateTableDropCommand(tmpTableName), crp)
+ executeEngine(database, generateTableDropCommand(tmpTableName), "tableDrop", crp)
KDSU.logInfo(myName, s"Temporary table '$tmpTableName' deleted successfully")
} catch {
case exception: Exception =>
@@ -547,18 +574,6 @@ class ExtendedKustoClient(
}
}
- def executeEngine(
- database: String,
- command: String,
- crp: ClientRequestProperties,
- retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
- // TODO - CID should reflect retries
- KDSU.retryApplyFunction(
- () => engineClient.execute(database, command, crp),
- retryConfig.getOrElse(this.retryConfig),
- "Execute engine command with retries")
- }
-
private[kusto] def setMappingOnStagingTableIfNeeded(
stagingTableSparkIngestionProperties: SparkIngestionProperties,
database: String,
@@ -573,7 +588,7 @@ class ExtendedKustoClient(
val mappingKind = mapping.getIngestionMappingKind.toString
val cmd = generateShowTableMappingsCommand(originalTable, mappingKind)
val mappings =
- executeEngine(stagingTableIngestionProperties.getDatabaseName, cmd, crp).getPrimaryResults
+ executeEngine(stagingTableIngestionProperties.getDatabaseName, cmd, "tableMappingsShow", crp).getPrimaryResults
var found = false
while (mappings.next && !found) {
@@ -584,7 +599,7 @@ class ExtendedKustoClient(
mappingKind,
mappingReferenceName,
policyJson)
- executeEngine(stagingTableIngestionProperties.getDatabaseName, cmd, crp)
+ executeEngine(stagingTableIngestionProperties.getDatabaseName, cmd, "tableMappingCreate", crp)
found = true
}
}
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
index 90219313..3edb75b5 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
@@ -221,7 +221,7 @@ object KustoDataSourceUtils {
clientRequestProperties: Option[ClientRequestProperties]): KustoSchema = {
KustoResponseDeserializer(
client
- .executeEngine(database, query, clientRequestProperties.orNull)
+ .executeEngine(database, query, "schemaGet", clientRequestProperties.orNull)
.getPrimaryResults).getSchema
}
@@ -540,10 +540,14 @@ object KustoDataSourceUtils {
}
}
- def retryApplyFunction[T](func: () => T, retryConfig: RetryConfig, retryName: String): T = {
+ def retryApplyFunction[T](func: Int => T, retryConfig: RetryConfig, retryName: String): T = {
val retry = Retry.of(retryName, retryConfig)
val f: CheckedFunction0[T] = new CheckedFunction0[T]() {
- override def apply(): T = func()
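+ // attempt is 1-based: func receives 1 on the first call, 2 on the first retry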
+ var attempt = 0
+ override def apply(): T = {
+ attempt += 1
+ func(attempt)
+ }
}
retry.executeCheckedSupplier(f)