Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Azure/azure-kusto-spark i…
Browse files Browse the repository at this point in the history
…nto 2.4
  • Loading branch information
asaharn committed Dec 2, 2022
2 parents f7512b6 + 4ab9276 commit a132cd1
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,28 +307,24 @@ object KustoWriter {
if (!props.getFlushImmediately && flushImmediately) {
props.setFlushImmediately(true)
}

// write the data here
val partitionsResult = KDSU.retryApplyFunction(() => {
Try(
ingestClient.ingestFromBlob(new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()), props)
) match {
case Success(x) => x
case Failure(e: Throwable) =>
KDSU.reportExceptionAndThrow(myName, e, "Queueing blob for ingestion in partition " +
s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
null
}
}, this.retryConfig, "Ingest into Kusto")
if (parameters.writeOptions.isTransactionalMode) {
val blobUuid = UUID.randomUUID()
val blobPath = blobUri + sas
val blobSourceInfo = new BlobSourceInfo(blobPath, size, blobUuid)
partitionsResults.add(
PartitionResult(KDSU.retryFunction(() => {
Try(
ingestClient.ingestFromBlob(blobSourceInfo, props)
) match {
case Success(x) => x
case Failure(e: Throwable) =>
KDSU.reportExceptionAndThrow(myName, e, "Queueing blob for ingestion in partition " +
s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
null
}
}, this.retryConfig, "Ingest into Kusto"),
partitionId))
KDSU.logInfo(myName, s"Queued blob for ingestion in partition $partitionIdString " +
s"for requestId: '${parameters.writeOptions.requestId}, blobUuid: $blobUuid")
PartitionResult(partitionsResult, partitionId))
}
KDSU.logInfo(myName, s"Queued blob for ingestion in partition $partitionIdString " +
s"for requestId: '${parameters.writeOptions.requestId}")
}
val kustoClient = KustoClientCache.getClient(parameters.coordinates.clusterUrl, parameters.authentication,
parameters.coordinates.ingestionUrl, parameters.coordinates.clusterAlias)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ private[kusto] object CslCommandsGenerator {
| distinct ExtentId"""
}

def generateTableMoveExtentsAsyncCommand(sourceTableName: String, destinationTableName: String, batchSize: Int,
def generateTableMoveExtentsAsyncCommand(sourceTableName: String, destinationTableName: String, batchSize: Option[Int],
isDestinationTableMaterializedViewSource: Boolean = false): String
= {
val withClause = if (batchSize.isDefined) s"""with(extentsShowFilteringRuntimePolicy='{"MaximumResultsCount":${batchSize.get}}')""" else ""
val setNewIngestionTime: String = if (isDestinationTableMaterializedViewSource) "with(SetNewIngestionTime=true)" else ""
s""".move async extents to table $destinationTableName $setNewIngestionTime <|
.show table $sourceTableName extents with(extentsShowFilteringRuntimePolicy='{"MaximumResultsCount":$batchSize}');
.show table $sourceTableName extents $withClause;
"""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.log4j.Level
import org.apache.spark.sql.types.StructType
import org.json.{JSONArray, JSONObject}

import java.net.SocketTimeoutException
import java.time.Instant
import java.util.StringJoiner
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -197,27 +196,30 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
}
}

def moveExtentsWithRetries(batchSize: Int, totalAmount: Int, database: String, tmpTableName: String, targetTable: String,
def moveExtentsWithRetries(batchSize: Option[Int], totalAmount: Int, database: String, tmpTableName: String, targetTable: String,
crp: ClientRequestProperties, writeOptions: WriteOptions): Unit = {
var extentsProcessed = 0
var retry = 0
var curBatchSize = batchSize
var curBatchSize = batchSize.getOrElse(0)
var delayPeriodBetweenCalls = DelayPeriodBetweenCalls
var consecutiveSuccesses = 0
val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
val firstMoveRetries = writeOptions.maxRetriesOnMoveExtents
val secondMovesRetries = Math.max(10, writeOptions.maxRetriesOnMoveExtents)
while (extentsProcessed < totalAmount) {
var error: Object = null
var res: Option[KustoResultSetTable] = None
var failed = false
// Execute move batch and keep any transient error for handling
try {
val operation = executeEngine(database, generateTableMoveExtentsAsyncCommand(tmpTableName,
targetTable, curBatchSize, useMaterializedViewFlag), crp).getPrimaryResults
targetTable, if (batchSize.isEmpty) None else Some(curBatchSize), useMaterializedViewFlag), crp).getPrimaryResults
val operationResult = KDSU.verifyAsyncCommandCompletion(engineClient, database, operation, samplePeriod =
KustoConstants
.DefaultPeriodicSamplePeriod, writeOptions.timeout, s"move extents to destination table '$targetTable' ",
myName,
writeOptions.requestId)
// TODO: use count over the show operations
res = Some(executeEngine(database, generateShowOperationDetails(operationResult.get.getString(0)), crp)
.getPrimaryResults)
if (res.get.count() == 0) {
Expand All @@ -228,25 +230,17 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
}
}
} catch {
// We don't check for the shouldRetry or permanent errors because we know
// The issue is not with syntax or non-existing tables, it can only be transient
// issues that might be solved in retries even if engine reports them as permanent
case ex: FailedOperationException =>
if (ex.getResult.isDefined) {
val failedResult: KustoResultSetTable = ex.getResult.get
if (!failedResult.getBoolean("ShouldRetry")) {
throw ex
}

error = failedResult.getString("Status")
failed = true
} else {
throw ex
error = ex.getResult.get.getString("Status")
}
failed = true
case ex: KustoDataExceptionBase =>
if (ex.getCause.isInstanceOf[SocketTimeoutException] || !ex.isPermanent) {
error = ExceptionUtils.getStackTrace(ex)
failed = true
} else {
throw ex
}
}

// When some node fails the move - it will put "failed" as the target extent id
Expand All @@ -259,9 +253,13 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
if (failed) {
consecutiveSuccesses = 0
retry += 1
if (retry > writeOptions.maxRetriesOnMoveExtents) {
throw RetriesExhaustedException(s"Failed to move extents after $retry tries")
}
val extentsProcessedErrorString = if (extentsProcessed > 0) s"and ${extentsProcessed} were moved" else ""
if (extentsProcessed > 0) {
// This is not the first move command
if (retry > secondMovesRetries)
throw RetriesExhaustedException(s"Failed to move extents after $retry tries$extentsProcessedErrorString.")
} else if (retry > firstMoveRetries)
throw RetriesExhaustedException(s"Failed to move extents after $retry tries$extentsProcessedErrorString.")

// Lower batch size, increase delay
val params = handleRetryFail(curBatchSize, retry, delayPeriodBetweenCalls, targetTable, error)
Expand All @@ -271,12 +269,14 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
} else {
consecutiveSuccesses += 1
if (consecutiveSuccesses > 2) {
curBatchSize = Math.min(curBatchSize * 2, batchSize)
// After curBatchSize size has decreased - we can lower it again according to original batch size
curBatchSize = Math.min(curBatchSize * 2, batchSize.getOrElse(curBatchSize * 2))
}

extentsProcessed += res.get.count()
val batchSizeString = if (batchSize.isDefined) s"maxBatch: $curBatchSize," else ""
KDSU.logDebug(myName, s"Moving extents batch succeeded at retry: $retry," +
s" maxBatch: $curBatchSize, consecutive successfull batches: $consecutiveSuccesses, successes this " +
s" $batchSizeString consecutive successfull batches: $consecutiveSuccesses, successes this " +
s"batch: ${res.get.count()}," +
s" extentsProcessed: $extentsProcessed, backoff: $delayPeriodBetweenCalls, total:$totalAmount")

Expand All @@ -295,10 +295,10 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
val nodeCountQuery = executeEngine(database, generateNodesCountCommand(), crp).getPrimaryResults
nodeCountQuery.next()
val nodeCount = nodeCountQuery.getInt(0)
moveExtentsWithRetries(nodeCount * writeOptions.minimalExtentsCountForSplitMerge, extentsCount, database,
moveExtentsWithRetries(Some(nodeCount * writeOptions.minimalExtentsCountForSplitMerge), extentsCount, database,
tmpTableName, targetTable, crp, writeOptions)
} else {
moveExtentsWithRetries(extentsCount, extentsCount, database,
moveExtentsWithRetries(None, extentsCount, database,
tmpTableName, targetTable, crp, writeOptions)
}
}
Expand Down Expand Up @@ -372,12 +372,12 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
}

def executeEngine(database: String, command: String, crp: ClientRequestProperties, retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
KDSU.retryFunction(() => engineClient.execute(database, command, crp), retryConfig.getOrElse(this.retryConfig),
KDSU.retryApplyFunction(() => engineClient.execute(database, command, crp), retryConfig.getOrElse(this.retryConfig),
"Execute engine command with retries")
}

def executeDM(command: String, crp: ClientRequestProperties, retryConfig: Option[RetryConfig] = None): KustoOperationResult = {
KDSU.retryFunction(() => dmClient.execute(ExtendedKustoClient.DefaultDb, command, crp), retryConfig.getOrElse(this.retryConfig),
KDSU.retryApplyFunction(() => dmClient.execute(ExtendedKustoClient.DefaultDb, command, crp), retryConfig.getOrElse(this.retryConfig),
"Execute DM command with retries")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ object KustoConstants {
val DefaultBatchingLimit: Int = 300
val DefaultExtentsCountForSplitMergePerNode: Int = 400
val DefaultMaxRetriesOnMoveExtents: Int = 10
val DefaultExecutionQueueing: Int = TimeUnit.SECONDS.toMillis(15).toInt
val DefaultTimeoutQueueing: Int = TimeUnit.SECONDS.toMillis(5).toInt
val DefaultExecutionQueueing: Int = TimeUnit.SECONDS.toMillis(60).toInt
val DefaultTimeoutQueueing: Int = TimeUnit.SECONDS.toMillis(40).toInt
val MaxIngestRetryAttempts = 2
val MaxCommandsRetryAttempts = 4
val DefaultMaximumIngestionTime: FiniteDuration = FiniteDuration.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ object KustoDataSourceUtils {
case _: NoSuchElementException => throw new InvalidParameterException(s"No such WriteMode option: '${writeModeParam.get}'")
}
val userTempTableName = parameters.get(KustoSinkOptions.KUSTO_TEMP_TABLE_NAME)
if (userTempTableName.isDefined && tableCreation == SinkTableCreationMode.CreateIfNotExist || !isTransactionalMode) {
if (userTempTableName.isDefined && (tableCreation == SinkTableCreationMode.CreateIfNotExist || !isTransactionalMode)) {
throw new InvalidParameterException("tempTableName can't be used with CreateIfNotExist or Queued write mode.")
}
isAsync = parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
Expand Down Expand Up @@ -365,7 +365,7 @@ object KustoDataSourceUtils {
SinkParameters(writeOptions, sourceParameters)
}

def retryFunction[T](func: () => T, retryConfig: RetryConfig, retryName: String): T = {
def retryApplyFunction[T](func: () => T, retryConfig: RetryConfig, retryName: String): T = {
val retry = Retry.of(retryName, retryConfig)
val f: CheckedFunction0[T] = new CheckedFunction0[T]() {
override def apply(): T = func()
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>3.1.5</revision>
<revision>3.1.6</revision>

<!-- Spark dependencies -->
<scala.version.major>2.11</scala.version.major>
Expand Down

0 comments on commit a132cd1

Please sign in to comment.