Use mapping and flushImmediately options on the staging table if set (#95)
* Flush immediately if blob larger than 100 MB
Set max blob size to 100 MB
Use mapping and flushImmediately options on the staging table if set by the user

* Limit assignable and docs

* log

* Apply suggestions from code review

Co-Authored-By: Daniel Dubovski <[email protected]>

* comments

* spelling

* normalize name

* comments
ohadbitt authored Nov 12, 2019
1 parent 86b3269 commit 6da979b
Showing 9 changed files with 111 additions and 29 deletions.
@@ -26,6 +26,12 @@ object KustoSinkOptions extends KustoOptions{
val KUSTO_SPARK_INGESTION_PROPERTIES_JSON: String = newOption("sparkIngestionPropertiesJson")

val NONE_RESULT_LIMIT = "none"

// A limit indicating the size in MB of the aggregated data before it is ingested to Kusto. Note that the limit is
// applied per partition. Kusto's ingestion service also aggregates data; the default suggested by Kusto is 1 GB, but
// here we cut at 100 MB to better match the way Spark pulls data.
val KUSTO_CLIENT_BATCHING_LIMIT: String = newOption("clientBatchingLimit")

}

object SinkTableCreationMode extends Enumeration {
@@ -37,4 +43,5 @@ case class WriteOptions(tableCreateOptions: SinkTableCreationMode.SinkTableCreat
isAsync: Boolean = false,
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
timeZone: String = "UTC", timeout: FiniteDuration,
IngestionProperties: Option[String] = None)
IngestionProperties: Option[String] = None,
batchLimit: Int = 100)
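For context, here is a minimal, hedged sketch (not part of this diff) of how a user could opt into the new batching limit when writing a DataFrame; the format string, the cluster/database/table option keys, and the `df` value are assumed from the connector's standard sink usage.

```scala
import org.apache.spark.sql.DataFrame
import com.microsoft.kusto.spark.datasink.KustoSinkOptions

// Illustrative sketch only: `df` and the non-batching option keys are assumptions.
def writeWithBatchingLimit(df: DataFrame): Unit = {
  df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
    .option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
    .option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
    // Aggregate at most 50 MB per partition before each blob is handed to Kusto ingestion
    // (the default introduced in this change is 100 MB).
    .option(KustoSinkOptions.KUSTO_CLIENT_BATCHING_LIMIT, "50")
    .mode("Append")
    .save()
}
```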
@@ -25,26 +25,26 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType, _}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.CollectionAccumulator
import org.apache.spark.{FutureAction, TaskContext}
import org.apache.spark.TaskContext

import scala.collection.Iterator
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.{Await, Future, TimeoutException}

object KustoWriter {
private val myName = this.getClass.getSimpleName
val TempIngestionTablePrefix = "_tmpTable"
val delayPeriodBetweenCalls: Int = KCONST.defaultPeriodicSamplePeriod.toMillis.toInt
val GZIP_BUFFER_SIZE: Int = KCONST.defaultBufferSize
val maxBlobSize: Int = KCONST.oneGiga - KCONST.oneMega
var maxBlobSize: Int = 100 * KCONST.oneMega

private[kusto] def write(batchId: Option[Long],
data: DataFrame,
tableCoordinates: KustoCoordinates,
authentication: KustoAuthentication,
writeOptions: WriteOptions): Unit = {

maxBlobSize = writeOptions.batchLimit * KCONST.oneMega
val batchIdIfExists = batchId.filter(_ != 0).map(_.toString).getOrElse("")
val kustoClient = KustoClientCache.getClient(tableCoordinates.cluster, authentication)

@@ -67,11 +67,16 @@ object KustoWriter {
kustoClient.createTmpTableWithSameSchema(tableCoordinates, tmpTableName, writeOptions.tableCreateOptions, data.schema)
KDSU.logInfo(myName, s"Successfully created temporary table $tmpTableName, will be deleted after completing the operation")

val stagingTableIngestionProperties = getIngestionProperties(writeOptions, parameters)
kustoClient.setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties, table)
if (stagingTableIngestionProperties.getFlushImmediately) {
KDSU.logWarn(myName, "It is not recommended to set flushImmediately to true")
}
val rdd = data.queryExecution.toRdd
val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]

if (writeOptions.isAsync) {
val asyncWork: FutureAction[Unit] = rdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
val asyncWork = rdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
KDSU.logInfo(myName, s"asynchronous write to Kusto table '$table' in progress")
// This part runs back on the driver
asyncWork.onSuccess {
@@ -83,11 +88,10 @@
KDSU.reportExceptionAndThrow(myName, exception, "writing data", tableCoordinates.cluster, tableCoordinates.database, table, shouldNotThrow = true)
KDSU.logError(myName, "The exception is not visible in the driver since we're in async mode")
}
}
else {
try {
} else {
try
rdd.foreachPartition { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
} catch {
catch {
case exception: Exception =>
kustoClient.cleanupIngestionByproducts(tableCoordinates.database, kustoClient.engineClient, tmpTableName)
throw exception
@@ -138,12 +142,7 @@
(implicit parameters: KustoWriteResource): Unit = {
import parameters._

val ingestionProperties = if (writeOptions.IngestionProperties.isDefined) {
SparkIngestionProperties.fromString(writeOptions.IngestionProperties.get).toIngestionProperties(coordinates.database, tmpTableName)
} else {
new IngestionProperties(coordinates.database, tmpTableName)
}

val ingestionProperties = getIngestionProperties(writeOptions, parameters)
ingestionProperties.setDataFormat(DATA_FORMAT.csv.name)
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table)
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses)
@@ -152,7 +151,19 @@

KDSU.logWarn(myName, s"Ingesting using ingest client - partition: ${TaskContext.getPartitionId()}")

tasks.asScala.foreach(t => Await.result(t, KCONST.defaultIngestionTaskTime))
tasks.asScala.foreach(t => try {
Await.result(t, KCONST.defaultIngestionTaskTime)
} catch {
case _: TimeoutException => KDSU.logWarn(myName, "Timed out trying to ingest; no need to fail, as the ingestion might still succeed")
})
}

private def getIngestionProperties(writeOptions: WriteOptions, parameters: KustoWriteResource): IngestionProperties = {
if (writeOptions.IngestionProperties.isDefined) {
SparkIngestionProperties.fromString(writeOptions.IngestionProperties.get).toIngestionProperties(parameters.coordinates.database, parameters.tmpTableName)
} else {
new IngestionProperties(parameters.coordinates.database, parameters.tmpTableName)
}
}

private def ingestToTemporaryTableByWorkers(
@@ -196,12 +207,18 @@
ingestionProperties: IngestionProperties,
partitionsResults: CollectionAccumulator[PartitionResult]): util.ArrayList[Future[Unit]]
= {
def ingest(blob: CloudBlockBlob, size: Long, sas: String): Future[Unit] = {
def ingest(blob: CloudBlockBlob, size: Long, sas: String, flushImmediately: Boolean = false): Future[Unit] = {
Future {
var props = ingestionProperties
if(!ingestionProperties.getFlushImmediately && flushImmediately){
// Need to copy the ingestionProperties so that only this one will be flushed immediately
props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
props.setFlushImmediately(true)
}
val blobUri = blob.getStorageUri.getPrimaryUri.toString
val blobPath = blobUri + sas
val blobSourceInfo = new BlobSourceInfo(blobPath, size)
partitionsResults.add(PartitionResult(ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties), TaskContext.getPartitionId))
partitionsResults.add(PartitionResult(ingestClient.ingestFromBlob(blobSourceInfo, props), TaskContext.getPartitionId))
}
}

Expand All @@ -224,8 +241,9 @@ object KustoWriter {
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
KDSU.logInfo(myName, s"Sealing blob in partition ${TaskContext.getPartitionId}, number ${ingestionTasks.size}")
finalizeBlobWrite(blobWriter)
val task = ingest(blobWriter.blob, blobWriter.csvWriter.getCounter, blobWriter.sas)
val task = ingest(blobWriter.blob, blobWriter.csvWriter.getCounter, blobWriter.sas, flushImmediately = true)
ingestionTasks.add(task)
createBlobWriter(schema, coordinates, tmpTableName, kustoClient)
}
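To summarize the batching behavior implemented above: the user-facing limit (in MB) becomes the byte threshold `maxBlobSize`, and once a partition's CSV blob crosses it the blob is sealed, ingested with flushImmediately set on a cloned copy of the ingestion properties, and a new blob writer is started. A simplified, hedged sketch of that flow, with illustrative names that are not the connector's actual fields:

```scala
// Simplified per-partition flow (sketch; names and types are illustrative).
val oneMega = 1024 * 1024
val batchLimitMb = 100                    // KUSTO_CLIENT_BATCHING_LIMIT, default 100
val maxBlobSize = batchLimitMb * oneMega  // byte threshold used when deciding to seal a blob

final case class BlobState(bytesWritten: Long, sealedBlobs: Int)

// Called after each row is appended to the current CSV blob.
def afterRowWritten(state: BlobState, rowBytes: Long): BlobState = {
  val written = state.bytesWritten + rowBytes
  if (written > maxBlobSize) {
    // Here the connector seals the blob, ingests it with flushImmediately = true on a
    // *cloned* IngestionProperties, and starts a fresh blob writer for the remaining rows.
    BlobState(bytesWritten = 0L, sealedBlobs = state.sealedBlobs + 1)
  } else {
    state.copy(bytesWritten = written)
  }
}
```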
@@ -267,9 +285,9 @@
// This method does not check for null at the current row idx and should be checked before !
private def writeField(row: SpecializedGetters, fieldIndexInRow: Int, dataType: DataType, dateFormat: FastDateFormat, csvWriter: CountingCsvWriter, nested: Boolean): Unit = {
dataType match {
case StringType => GetStringFromUTF8(row.getUTF8String(fieldIndexInRow), nested, csvWriter)
case DateType => csvWriter.writeStringField(DateTimeUtils.toJavaDate(row.getInt(fieldIndexInRow)).toString, nested)
case TimestampType => csvWriter.writeStringField(dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(fieldIndexInRow))), nested)
case StringType => GetStringFromUTF8(row.getUTF8String(fieldIndexInRow), nested, csvWriter)
case BooleanType => csvWriter.write(row.getBoolean(fieldIndexInRow).toString)
case structType: StructType => convertStructToCsv(row.getStruct(fieldIndexInRow, structType.length), structType, dateFormat, csvWriter, nested)
case arrType: ArrayType => convertArrayToCsv(row.getArray(fieldIndexInRow), arrType.elementType, dateFormat, csvWriter, nested)
@@ -69,6 +69,19 @@ class SparkIngestionProperties(var flushImmediately: Boolean = false,
}

object SparkIngestionProperties {
def cloneIngestionProperties(ingestionProperties: IngestionProperties): IngestionProperties = {
val cloned = new IngestionProperties(ingestionProperties.getDatabaseName, ingestionProperties.getTableName)
cloned.setReportLevel(ingestionProperties.getReportLevel)
cloned.setReportMethod(ingestionProperties.getReportMethod)
cloned.setAdditionalTags(ingestionProperties.getAdditionalTags)
cloned.setDropByTags(ingestionProperties.getDropByTags)
cloned.setIngestByTags(ingestionProperties.getIngestByTags)
cloned.setIngestIfNotExists(ingestionProperties.getIngestIfNotExists)
cloned.setDataFormat(ingestionProperties.getDataFormat)
cloned.setIngestionMapping(ingestionProperties.getIngestionMapping)
cloned
}

private[kusto] def fromString(json: String): SparkIngestionProperties = {
new ObjectMapper().setVisibility(JsonMethod.FIELD, Visibility.ANY).readValue(json, classOf[SparkIngestionProperties])
}
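A small, hedged usage sketch tying these helpers together (database, table, and JSON values are invented, and the JSON field name is assumed to match the class's `flushImmediately` field; `fromString` is package-private, so this only compiles inside the connector's package): parse the user-supplied ingestion properties, convert them, then clone before mutating so that only the per-blob copy is flushed immediately.

```scala
// Illustrative only — see KustoWriter.ingest for the real call site.
val sparkProps = SparkIngestionProperties.fromString("""{"flushImmediately": false}""")
val shared = sparkProps.toIngestionProperties("MyDatabase", "_tmpTableMyTable")

// Clone before mutating, so the shared instance used for the other blobs is left untouched.
val perBlob = SparkIngestionProperties.cloneIngestionProperties(shared)
perBlob.setFlushImmediately(true)
```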
@@ -29,7 +29,7 @@ private[kusto] object CslCommandsGenerator {
// Note: we could project-away Type, but this would result in an exception for non-existing tables,
// and we rely on getting an empty result in this case
def generateTableGetSchemaAsRowsCommand(table: String): String = {
".show table " + table + " schema as json | project ColumnsJson=todynamic(Schema).OrderedColumns" +
".show table " + KustoQueryUtils.normalizeTableName(table) + " schema as json | project ColumnsJson=todynamic(Schema).OrderedColumns" +
"| mv-expand ColumnsJson | evaluate bag_unpack(ColumnsJson)"
}

@@ -46,7 +46,7 @@
}

def generateTableAlterMergePolicyCommand(table: String, allowMerge: Boolean, allowRebuild: Boolean): String = {
s""".alter table $table policy merge @'{"AllowMerge":"$allowMerge", "AllowRebuild":"$allowRebuild"}'"""
s""".alter table ${KustoQueryUtils.normalizeTableName(table)} policy merge @'{"AllowMerge":"$allowMerge", "AllowRebuild":"$allowRebuild"}'"""
}

def generateOperationsShowCommand(operationId: String): String = {
@@ -94,4 +94,12 @@
def generateTableAlterRetentionPolicy(tmpTableName: String, period: String, recoverable: Boolean): String = {
s""".alter table $tmpTableName policy retention '{ "SoftDeletePeriod": "$period", "Recoverability":"${if (recoverable) "Enabled" else "Disabled"}" }' """
}

def generateShowTableMappingsCommand(tableName: String, kind: String): String = {
s""".show table ${KustoQueryUtils.normalizeTableName(tableName)} ingestion $kind mappings"""
}

def generateCreateTableMappingCommand(tableName: String, kind: String, name: String, mappingAsJson: String): String = {
s""".create table $tableName ingestion $kind mapping "$name" @"$mappingAsJson""""
}
}
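For reference, a hedged sketch of the command strings the two new generators produce; the table, kind, and mapping values are invented, and the exact quoting of the table name in the show command depends on KustoQueryUtils.normalizeTableName.

```scala
// Hypothetical example; the object is private[kusto], so this only compiles inside the connector's package.
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._

val showCmd = generateShowTableMappingsCommand("MyTable", "csv")
// roughly: .show table MyTable ingestion csv mappings   (name possibly escaped by normalizeTableName)

val createCmd = generateCreateTableMappingCommand("_tmpTableMyTable", "csv", "myMapping",
  "[{'column':'ColA','Properties':{'Ordinal':'0'}}]")
// => .create table _tmpTableMyTable ingestion csv mapping "myMapping" @"[{'column':'ColA','Properties':{'Ordinal':'0'}}]"
```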
@@ -5,26 +5,26 @@ import java.util.concurrent.TimeUnit

import com.microsoft.azure.kusto.data.{Client, ClientFactory, ConnectionStringBuilder}
import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory}
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory, IngestionProperties}
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasink.KustoWriter.delayPeriodBetweenCalls
import com.microsoft.kusto.spark.datasink.SinkTableCreationMode.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.{PartitionResult, SinkTableCreationMode}
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils.extractSchemaFromResultTable
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CollectionAccumulator
import org.joda.time.{DateTime, DateTimeZone, Period}
import shaded.parquet.org.codehaus.jackson.map.ObjectMapper

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}

class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuilder, val ingestKcsb: ConnectionStringBuilder) {

val engineClient: Client = ClientFactory.createClient(engineKcsb)

// Reading process does not require ingest client to start working
Expand Down Expand Up @@ -175,6 +175,27 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
}
}

private[kusto] def setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties: IngestionProperties, originalTable: String): Unit = {
val mapping = stagingTableIngestionProperties.getIngestionMapping
val mappingReferenceName = mapping.getIngestionMappingReference
if (StringUtils.isNotBlank(mappingReferenceName)) {
val mappingKind = mapping.getIngestionMappingKind.toString
val cmd = generateShowTableMappingsCommand(originalTable, mappingKind)
val mappings = engineClient.execute(stagingTableIngestionProperties.getDatabaseName, cmd).getValues
val it = mappings.iterator()
var found = false
while (it.hasNext && !found) {
val existingMapping = it.next()
if (existingMapping.get(0).equals(mappingReferenceName)) {
val mappingAsJson = existingMapping.get(2).replace("\"", "'")
val createMappingCmd = generateCreateTableMappingCommand(stagingTableIngestionProperties.getTableName, mappingKind, mappingReferenceName, mappingAsJson)
engineClient.execute(stagingTableIngestionProperties.getDatabaseName, createMappingCmd)
found = true
}
}
}
}

private def readIngestionResult(statusRecord: IngestionStatus): String = {
new ObjectMapper()
.writerWithDefaultPrettyPrinter
@@ -6,7 +6,7 @@ object KustoConstants {
// Setting high value to have no timeout on Await commands
val defaultWaitingIntervalLongRunning: String = (2 days).toSeconds.toString
val defaultPeriodicSamplePeriod: FiniteDuration = 1 seconds
val defaultIngestionTaskTime: FiniteDuration = 10 seconds
val defaultIngestionTaskTime: FiniteDuration = 20 seconds
val clientName: String = KustoDataSourceUtils.ClientName
val defaultBufferSize: Int = 16 * 1024
val storageExpiryMinutes: Int = 120
@@ -20,6 +20,8 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.StructType
import java.util.Properties

import shaded.parquet.org.codehaus.jackson.map.ObjectMapper

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.util.matching.Regex
@@ -152,12 +154,14 @@
var tableCreationParam: Option[String] = None
var isAsync: Boolean = false
var isAsyncParam: String = ""
var batchLimit: Int = 0

try {
isAsyncParam = parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false")
isAsync = parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
tableCreationParam = parameters.get(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS)
tableCreation = if (tableCreationParam.isEmpty) SinkTableCreationMode.FailIfNotExist else SinkTableCreationMode.withName(tableCreationParam.get)
batchLimit = parameters.getOrElse(KustoSinkOptions.KUSTO_CLIENT_BATCHING_LIMIT, "100").trim.toInt
} catch {
case _: NoSuchElementException => throw new InvalidParameterException(s"No such SinkTableCreationMode option: '${tableCreationParam.get}'")
case _: java.lang.IllegalArgumentException => throw new InvalidParameterException(s"KUSTO_WRITE_ENABLE_ASYNC is expecting either 'true' or 'false', got: '$isAsyncParam'")
@@ -173,14 +177,20 @@
parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_RESULT_LIMIT, "1"),
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, "UTC"),
timeout,
ingestionPropertiesAsJson
ingestionPropertiesAsJson,
batchLimit
)

val sourceParameters = parseSourceParameters(parameters)

if (sourceParameters.kustoCoordinates.table.isEmpty) {
throw new InvalidParameterException("KUSTO_TABLE parameter is missing. Must provide a destination table name")
}

logInfo("parseSinkParameters", s"Parsed write options for sink: {'timeout': ${writeOptions.timeout}, 'async': ${writeOptions.isAsync}, " +
s"'tableCreationMode': ${writeOptions.tableCreateOptions}, 'writeLimit': ${writeOptions.writeResultLimit}, 'batchLimit': ${writeOptions.batchLimit}" +
s", 'timeout': ${writeOptions.timeout}, 'timezone': ${writeOptions.timeZone}, 'ingestionProperties': $ingestionPropertiesAsJson}")

SinkParameters(writeOptions, sourceParameters)
}

@@ -21,7 +21,7 @@ object KustoQueryUtils {
def isQuery(query: String): Boolean = !isCommand(query)

def simplifyName(name: String): String = {
name.replaceAll("-", "_").replaceAll("\\s", "")
normalizeTableName(name.replaceAll("-", "_").replaceAll("\\s", ""))
}

def normalizeTableName(table: String): String = {
5 changes: 5 additions & 0 deletions docs/KustoSink.md
@@ -104,6 +104,11 @@ that is using it. Please verify the following before using Kusto connector:

- flushImmediately: Boolean - use with caution - flushes the data immediately upon ingestion, without aggregation.

* **KUSTO_CLIENT_BATCHING_LIMIT**:
A limit, in MB, on the size of the data aggregated per partition before it is ingested to Kusto. Kusto's ingestion
service also aggregates data; the default suggested by Kusto is 1 GB, but the connector defaults to 100 MB to better
match the way Spark pulls data.

>**Note:**
For both synchronous and asynchronous operation, 'write' is an atomic transaction, i.e.
either all data is written to Kusto, or no data is written.