
Commit

Minor fixes, add the rows count and cache the RDD to avoid re-evaluation
ag-ramachandran committed Sep 25, 2024
1 parent 37e5feb commit 6eaa5ee
Showing 2 changed files with 46 additions and 37 deletions.
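
The commit does two things: adds a written-rows count to the CSV writer and caches the source RDD so that the write paths below do not re-evaluate it. As a minimal, self-contained sketch of the caching point (illustrative names only, not the connector's code), every Spark action re-runs an uncached RDD's lineage from its source, so consuming the same RDD twice without cache() computes it twice:

import org.apache.spark.sql.SparkSession

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()

    // Stand-in for an expensive transformation read from a source.
    val rdd = spark.sparkContext.parallelize(1 to 1000000).map(i => i * 2)

    val cachedRdd = rdd.cache() // mark for caching; populated by the first action

    // First action computes the partitions and stores them.
    cachedRdd.foreachPartition(rows => rows.foreach(_ => ()))
    // Second action is served from the cache; the map is not re-run.
    println(s"rows: ${cachedRdd.count()}")

    spark.stop()
  }
}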
@@ -12,7 +12,7 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas
import com.microsoft.azure.kusto.ingest.result.IngestionResult
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties}
import com.microsoft.azure.kusto.ingest.IngestionProperties
import com.microsoft.azure.storage.blob.{BlobRequestOptions, CloudBlockBlob}
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.common.KustoCoordinates
@@ -35,25 +35,23 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CollectionAccumulator

import java.io._
import java.io.{BufferedWriter, IOException, OutputStreamWriter}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.security.InvalidParameterException
import java.time.{Clock, Duration, Instant}
import java.util
import java.util.zip.GZIPOutputStream
import java.util.{TimeZone, UUID}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters.asScalaBufferConverter

object KustoWriter {
private val className = this.getClass.getSimpleName
val LegacyTempIngestionTablePrefix = "_tmpTable"
val TempIngestionTablePrefix = "sparkTempTable_"
val DelayPeriodBetweenCalls: Int = KCONST.DefaultPeriodicSamplePeriod.toMillis.toInt
private val GzipBufferSize: Int = 1000 * KCONST.DefaultBufferSize
@@ -161,46 +159,47 @@ object KustoWriter {
tmpTableName = tmpTableName,
cloudInfo = cloudInfo)
val sinkStartTime = getCreationTime(stagingTableIngestionProperties)
// Cache this RDD created so that it is not evaluated multiple times from source
val cachedRdd = rdd.cache()
if (writeOptions.isAsync) {
val asyncWork = rdd.foreachPartitionAsync { rows =>
val asyncWork = cachedRdd.foreachPartitionAsync { rows =>
ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
}
KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress")
// This part runs back on the driver

if (writeOptions.isTransactionalMode) {
asyncWork.onSuccess { case _ =>
finalizeIngestionWhenWorkersSucceeded(
tableCoordinates,
batchIdIfExists,
tmpTableName,
partitionsResults,
writeOptions,
crp,
tableExists,
rdd.sparkContext,
authentication,
kustoClient,
sinkStartTime)
}
asyncWork.onFailure { case exception: Exception =>
if (writeOptions.userTempTableName.isEmpty) {
kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp)
}
KDSU.reportExceptionAndThrow(
className,
exception,
"writing data",
tableCoordinates.clusterUrl,
tableCoordinates.database,
table,
shouldNotThrow = true)
KDSU.logError(className, "The exception is not visible in the driver since we're in async mode")
asyncWork.onComplete {
case Success(_) =>
finalizeIngestionWhenWorkersSucceeded(
tableCoordinates,
batchIdIfExists,
tmpTableName,
partitionsResults,
writeOptions,
crp,
tableExists,
rdd.sparkContext,
authentication,
kustoClient,
sinkStartTime)
case Failure(exception) =>
if (writeOptions.userTempTableName.isEmpty) {
kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp)
}
KDSU.reportExceptionAndThrow(
className,
exception,
"writing data",
tableCoordinates.clusterUrl,
tableCoordinates.database,
table,
shouldNotThrow = true)
KDSU.logError(className, "The exception is not visible in the driver since we're in async mode")
}
}
} else {
try
rdd.foreachPartition { rows =>
cachedRdd.foreachPartition { rows =>
ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
}
catch {
@@ -322,7 +321,7 @@ object KustoWriter {
parameters.coordinates.ingestionUrl,
parameters.coordinates.clusterAlias)
val ingestClient = clientCache.ingestClient
CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo);
CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo)
val reqRetryOpts = new RequestRetryOptions(
RetryPolicyType.FIXED,
KCONST.QueueRetryAttempts,
@@ -387,14 +386,15 @@ object KustoWriter {
val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) { case (blobWriter, row) =>
RowCSVWriterUtils.writeRowAsCSV(row._1, parameters.schema, timeZone, blobWriter.csvWriter)
val count = blobWriter.csvWriter.getCounter
val rowsWritten = blobWriter.csvWriter.getRowsWritten
val shouldNotCommitBlockBlob = count < maxBlobSize
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
s"blob number ${row._2}, blobname ${blobWriter.blob.getName} with size $count.Rows written: $rowsWritten")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
@@ -420,6 +420,12 @@
s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'")
finalizeBlobWrite(lastBlobWriter)
if (lastBlobWriter.csvWriter.getCounter > 0) {
val count = lastBlobWriter.csvWriter.getCounter
val rowsWritten = lastBlobWriter.csvWriter.getRowsWritten
KDSU.logInfo(
className,
s"Flushing final blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob name ${lastBlobWriter.blob.getName}, with size $count.Rows written: $rowsWritten")
ingest(
lastBlobWriter,
parameters,
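
This file's change also moves the driver-side callbacks from the separate asyncWork.onSuccess/onFailure handlers to a single asyncWork.onComplete that matches on Success and Failure; onSuccess and onFailure have been deprecated since Scala 2.12. A minimal sketch of that pattern follows, with illustrative names standing in for the connector's actual finalize/cleanup calls:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object OnCompleteSketch extends App {
  // Stand-in for the FutureAction returned by rdd.foreachPartitionAsync.
  val asyncWork: Future[Unit] = Future {
    // per-partition ingestion work would run here
  }

  // A single onComplete callback replaces the deprecated onSuccess/onFailure pair.
  asyncWork.onComplete {
    case Success(_)  => println("workers succeeded: finalize ingestion from the temp table")
    case Failure(ex) => println(s"workers failed: clean up the temp table and report ${ex.getMessage}")
  }

  Thread.sleep(500) // demo only: keep the JVM alive long enough for the callback to run
}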
@@ -19,10 +19,12 @@ case class CountingWriter(out: java.io.Writer) extends Writer {
// new sun.security.action.GetPropertyAction("line.separator"))
private val newLineSepLength: Int = newLineSep.length
private var bytesCounter: Long = 0L
private var rowsCounter: Long = 0L

def newLine(): Unit = {
out.write(newLineSep)
bytesCounter += newLineSepLength
rowsCounter += 1
}

def write(c: Char): Unit = {
@@ -53,6 +55,7 @@ case class CountingWriter(out: java.io.Writer) extends Writer {

def getCounter: Long = bytesCounter

def getRowsWritten: Long = rowsCounter
def resetCounter(): Unit = {
bytesCounter = 0
}
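
The second file adds a row counter to the byte-counting writer: newLine() now bumps a rowsCounter and getRowsWritten exposes it for the log lines above. A self-contained sketch of that counting-writer pattern (not the connector's CountingWriter, just an illustration of the same idea):

import java.io.{StringWriter, Writer}

class CountingSketchWriter(out: Writer) extends Writer {
  private val newLineSep = System.lineSeparator()
  private var bytesCounter: Long = 0L
  private var rowsCounter: Long = 0L

  // A "row" is whatever ends with an explicit newLine() call.
  def newLine(): Unit = {
    out.write(newLineSep)
    bytesCounter += newLineSep.length
    rowsCounter += 1
  }

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    out.write(cbuf, off, len)
    bytesCounter += len
  }
  override def flush(): Unit = out.flush()
  override def close(): Unit = out.close()

  def getCounter: Long = bytesCounter
  def getRowsWritten: Long = rowsCounter
}

object CountingSketch extends App {
  val writer = new CountingSketchWriter(new StringWriter())
  writer.write("a,b,c"); writer.newLine()
  writer.write("1,2,3"); writer.newLine()
  // 10 payload chars plus two line separators; 2 rows written
  println(s"bytes=${writer.getCounter}, rows=${writer.getRowsWritten}")
}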
