Skip to content

Commit

Permalink
* Add change for unique tag names
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Sep 19, 2024
1 parent 91efd91 commit 734bd0c
Showing 1 changed file with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasink.FinalizeHelper.finalizeIngestionWhenWorkersSucceeded
import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTableGetSchemaAsRowsCommand
import com.microsoft.kusto.spark.utils.KustoConstants.{IngestSkippedTrace, MaxIngestRetryAttempts}
import com.microsoft.kusto.spark.utils.{ExtendedKustoClient, KustoClientCache, KustoIngestionUtils, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
import com.microsoft.kusto.spark.utils.{
ExtendedKustoClient,
KustoClientCache,
KustoIngestionUtils,
KustoQueryUtils,
KustoConstants => KCONST,
KustoDataSourceUtils => KDSU
}
import io.github.resilience4j.retry.RetryConfig
import org.apache.spark.TaskContext
import org.apache.spark.sql.DataFrame
Expand Down Expand Up @@ -255,8 +262,7 @@ object KustoWriter {
parameters.coordinates.database,
if (parameters.writeOptions.isTransactionalMode) {
parameters.tmpTableName
}
else {
} else {
parameters.coordinates.table.get
})

Expand Down Expand Up @@ -368,8 +374,8 @@ object KustoWriter {
parameters.authentication,
parameters.coordinates.ingestionUrl,
parameters.coordinates.clusterAlias)
val blobIdMap = new ConcurrentHashMap[String,Int]()
blobIdMap.put(parameters.writeOptions.requestId,0)
val blobIdMap = new ConcurrentHashMap[String, Int]()
blobIdMap.put(parameters.writeOptions.requestId, 0)
val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte
var curBlobUUID = UUID.randomUUID().toString
// This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto
Expand All @@ -384,8 +390,8 @@ object KustoWriter {
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0)
blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1)
val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0)
blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1)
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
Expand All @@ -399,7 +405,8 @@ object KustoWriter {
curBlobUUID,
kustoClient,
partitionsResults,
batchIdForTracing,blobIndexInBatch)
batchIdForTracing,
blobIndexInBatch)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
Expand All @@ -415,8 +422,8 @@ object KustoWriter {
s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'")
finalizeBlobWrite(lastBlobWriter)
if (lastBlobWriter.csvWriter.getCounter > 0) {
val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0)
blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1)
val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0)
blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1)
ingest(
lastBlobWriter,
parameters,
Expand All @@ -425,7 +432,8 @@ object KustoWriter {
curBlobUUID,
kustoClient,
partitionsResults,
batchIdForTracing,blobIndexInBatch)
batchIdForTracing,
blobIndexInBatch)
}
}

Expand All @@ -437,7 +445,8 @@ object KustoWriter {
blobUUID: String,
kustoClient: ExtendedKustoClient,
partitionsResults: CollectionAccumulator[PartitionResult],
batchIdForTracing: String,blobIndexInBatch:Int): Unit = {
batchIdForTracing: String,
blobIndexInBatch: Int): Unit = {
val size = blobResource.csvWriter.getCounter
val sas = blobResource.sas
val partitionId = TaskContext.getPartitionId
Expand All @@ -449,8 +458,11 @@ object KustoWriter {
}
if (parameters.writeOptions.ensureNoDupBlobs) {
// The Key change is here
val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, blobIndexInBatch.toString)
val tag = pref + blobUUID
val tag = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, s"${blobIndexInBatch.toString}_$partitionId")
KDSU.logInfo(
className,
s"With ensureNoDupBlobs in ${TaskContext.getPartitionId.toString} " +
s"for requestId: '${parameters.writeOptions.requestId}, the tag is $tag")
val ingestIfNotExist = new util.ArrayList[String]
ingestIfNotExist.addAll(props.getIngestIfNotExists)
val ingestBy: util.List[String] = new util.ArrayList[String]
Expand Down

0 comments on commit 734bd0c

Please sign in to comment.