
Commit

Minor fixes, add the rows count and cache the RDD to avoid re-evaluation
ag-ramachandran committed Sep 25, 2024
1 parent 6eaa5ee commit dfaba24
Showing 1 changed file with 21 additions and 9 deletions.
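
For context, the core pattern in this change: cache the input RDD only when the ensureNoDupBlobs write option requires the data to stay stable across re-reads and the caller has not already cached it, then unpersist only what the writer itself cached. A minimal standalone sketch of that pattern (writeOnce and process are hypothetical names, not the connector's API):

import org.apache.spark.rdd.RDD

object CacheIfNeededSketch {
  // Cache only when needed; unpersist only what we cached ourselves.
  def writeOnce[T](rdd: RDD[T], ensureNoDupBlobs: Boolean)(process: Iterator[T] => Unit): Unit = {
    val isAlreadyCached = rdd.getStorageLevel.useMemory
    val maybeCachedRdd = if (ensureNoDupBlobs && !isAlreadyCached) rdd.cache() else rdd
    try maybeCachedRdd.foreachPartition(process)
    finally {
      // Caller-managed caching is left untouched.
      if (ensureNoDupBlobs && !isAlreadyCached) maybeCachedRdd.unpersist()
    }
  }
}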
@@ -150,7 +150,13 @@ object KustoWriter {
    }
    val cloudInfo = CloudInfo.retrieveCloudInfoForCluster(kustoClient.ingestKcsb.getClusterUrl)
    val rdd = data.queryExecution.toRdd
-   val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
+   val isRddAlreadyCached = rdd.getStorageLevel.useMemory
+   val maybeCachedRdd = if (writeOptions.ensureNoDupBlobs && !isRddAlreadyCached) {
+     rdd.cache()
+   } else {
+     rdd
+   }
+   val partitionsResults = maybeCachedRdd.sparkContext.collectionAccumulator[PartitionResult]
    val parameters = KustoWriteResource(
      authentication = authentication,
      coordinates = tableCoordinates,
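
The accumulator itself is unchanged in mechanism: each executor task appends its partition's result, and the driver reads the merged list once the job completes. A self-contained illustration (illustrative names, counting rows per partition rather than collecting ingestion results):

import org.apache.spark.sql.SparkSession

object AccumulatorSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("acc-sketch").getOrCreate()
  val partitionsResults = spark.sparkContext.collectionAccumulator[Int]("partitionsResults")
  spark.sparkContext.parallelize(1 to 8, numSlices = 4).foreachPartition { rows =>
    partitionsResults.add(rows.size) // each task records its partition's row count
  }
  println(partitionsResults.value) // read back on the driver, e.g. [2, 2, 2, 2]
  spark.stop()
}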
@@ -160,10 +166,12 @@
      cloudInfo = cloudInfo)
    val sinkStartTime = getCreationTime(stagingTableIngestionProperties)
    // Cache this RDD created so that it is not evaluated multiple times from source
-   val cachedRdd = rdd.cache()
+
+   val updatedParameters = parameters.copy(isAlreadyCached = isRddAlreadyCached)
+
    if (writeOptions.isAsync) {
-     val asyncWork = cachedRdd.foreachPartitionAsync { rows =>
-       ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
+     val asyncWork = maybeCachedRdd.foreachPartitionAsync { rows =>
+       ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, updatedParameters)
      }
      KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress")
      // This part runs back on the driver
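
The async branch leans on foreachPartitionAsync, which submits the job and immediately returns a FutureAction, so the driver thread can log progress and attach completion handling while executors ingest. A sketch of that flow (with println standing in for KDSU.logInfo):

import org.apache.spark.rdd.RDD
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

object AsyncIngestSketch {
  def asyncIngest[T: ClassTag](rdd: RDD[T])(ingest: Iterator[T] => Unit): Unit = {
    val asyncWork = rdd.foreachPartitionAsync(ingest) // returns a FutureAction right away
    println("asynchronous write in progress")         // driver continues while executors work
    asyncWork.onComplete {
      case Success(_) => println("asynchronous write completed")
      case Failure(e) => println(s"asynchronous write failed: ${e.getMessage}")
    }
  }
}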
@@ -178,7 +186,7 @@
          writeOptions,
          crp,
          tableExists,
-         rdd.sparkContext,
+         maybeCachedRdd.sparkContext,
          authentication,
          kustoClient,
          sinkStartTime)
@@ -199,8 +207,8 @@
      }
    } else {
      try
-       cachedRdd.foreachPartition { rows =>
-         ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
+       maybeCachedRdd.foreachPartition { rows =>
+         ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, updatedParameters)
        }
      catch {
        case exception: Exception =>
@@ -221,12 +229,15 @@
          writeOptions,
          crp,
          tableExists,
-         rdd.sparkContext,
+         maybeCachedRdd.sparkContext,
          authentication,
          kustoClient,
          sinkStartTime)
      }
    }
+   if (parameters.writeOptions.ensureNoDupBlobs && !parameters.isAlreadyCached) {
+     maybeCachedRdd.unpersist()
+   }
  }
}

@@ -536,6 +547,7 @@ final case class KustoWriteResource(
    schema: StructType,
    writeOptions: WriteOptions,
    tmpTableName: String,
-   cloudInfo: CloudInfo)
+   cloudInfo: CloudInfo,
+   isAlreadyCached: Boolean = false)

final case class PartitionResult(ingestionResult: IngestionResult, partitionId: Int)
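
Because the new isAlreadyCached field is defaulted, every existing construction of KustoWriteResource compiles unchanged, and the writer opts in through copy, as the updatedParameters line above shows. The same idea in miniature (illustrative Params type):

final case class Params(tmpTableName: String, isAlreadyCached: Boolean = false)

val parameters = Params("tmpTable")                             // existing call sites unchanged
val updatedParameters = parameters.copy(isAlreadyCached = true) // opt in where needed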
