Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Azure/azure-kusto-spark into 2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
asaharn committed Mar 13, 2023
2 parents e73e9bd + 5825346 commit 264dfb3
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 21 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ link your application with the artifact below to use the Azure Data Explorer Con
```
groupId = com.microsoft.azure.kusto
artifactId = kusto-spark_3.0_2.12
version = 3.1.11
version = 3.1.12
```

**In Maven**:

Look for the following coordinates:
```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.11
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.12
```

Or clone this repository and build it locally to add it to your local maven repository.
Expand All @@ -49,15 +49,15 @@ The jar can also be found under the [released package](https://github.com/Azure/
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-spark_3.0_2.12</artifactId>
<version>3.1.11</version>
<version>3.1.12</version>
</dependency>
```

**In SBT**:

```scala
libraryDependencies ++= Seq(
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.11"
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.12"
)
```

Expand All @@ -66,7 +66,7 @@ libraryDependencies ++= Seq(
Libraries -> Install New -> Maven -> copy the following coordinates:

```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.11
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.12
```

#### Building Samples Module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.microsoft.kusto.spark.utils.{ExtendedKustoClient, KustoClientCache, K
import org.apache.spark.SparkContext
import org.apache.spark.util.CollectionAccumulator

import java.time.Instant
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -31,7 +32,8 @@ object FinalizeHelper {
tableExists: Boolean,
sparkContext: SparkContext,
authentication: KustoAuthentication,
kustoClient: ExtendedKustoClient
kustoClient: ExtendedKustoClient,
sinkStartTime: Instant
): Unit = {
if (!kustoClient.shouldIngestData(coordinates, writeOptions.ingestionProperties, tableExists, crp)) {
KDSU.logInfo(myName, s"$IngestSkippedTrace '${coordinates.table}'")
Expand Down Expand Up @@ -77,7 +79,7 @@ object FinalizeHelper {
writeOptions.requestId
)
}
client.moveExtents(coordinates.database, tmpTableName, coordinates.table.get, crp, writeOptions)
client.moveExtents(coordinates.database, tmpTableName, coordinates.table.get, crp, writeOptions, sinkStartTime)
}
// Move data to real table
// Protect tmp table from merge/rebuild and move data to the table requested by customer. This operation is atomic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.zip.GZIPOutputStream
import java.util.{TimeZone, UUID}
import com.microsoft.kusto.spark.datasink.FinalizeHelper.finalizeIngestionWhenWorkersSucceeded

import java.time.Instant
import java.time.{Clock, Instant}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -128,6 +128,7 @@ object KustoWriter {
val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
val parameters = KustoWriteResource(authentication = authentication, coordinates = tableCoordinates,
schema = data.schema, writeOptions = rebuiltOptions, tmpTableName = tmpTableName)
val sinkStartTime = getCreationTime(stagingTableIngestionProperties, tableCoordinates)
if (writeOptions.isAsync) {
val asyncWork = rdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists,
partitionsResults,parameters) }
Expand All @@ -138,7 +139,7 @@ object KustoWriter {
asyncWork.onSuccess {
case _ => finalizeIngestionWhenWorkersSucceeded(
tableCoordinates, batchIdIfExists, tmpTableName, partitionsResults,
writeOptions, crp, tableExists, rdd.sparkContext, authentication, kustoClient)
writeOptions, crp, tableExists, rdd.sparkContext, authentication, kustoClient, sinkStartTime)
}
asyncWork.onFailure {
case exception: Exception =>
Expand Down Expand Up @@ -166,12 +167,21 @@ object KustoWriter {
if (writeOptions.isTransactionalMode) {
finalizeIngestionWhenWorkersSucceeded(
tableCoordinates, batchIdIfExists, tmpTableName, partitionsResults, writeOptions,
crp, tableExists, rdd.sparkContext, authentication, kustoClient)
crp, tableExists, rdd.sparkContext, authentication, kustoClient, sinkStartTime)
}
}
}
}

/**
  * Resolves the ingestion start time for the sink operation.
  *
  * Reads the optional "startTime" entry from the ingestion properties' additional
  * properties (built for the target database/table). When present it is parsed as an
  * ISO-8601 instant; otherwise the current UTC time is used.
  *
  * @param ingestionProperties Spark ingestion properties carrying additional properties.
  * @param tableCoordinates    Target database and table (table is assumed to be defined,
  *                            matching the surrounding write path's convention).
  * @return the parsed "startTime" instant, or `Instant.now(Clock.systemUTC())` if absent.
  */
def getCreationTime(ingestionProperties: SparkIngestionProperties, tableCoordinates: KustoCoordinates): Instant = {
  // Option(...) guards against a null return from the Java map lookup.
  Option(
    ingestionProperties
      .toIngestionProperties(tableCoordinates.database, tableCoordinates.table.get)
      .getAdditionalProperties
      .get("startTime"))
    .map(Instant.parse)
    .getOrElse(Instant.now(Clock.systemUTC()))
}

def ingestRowsIntoTempTbl(rows: Iterator[InternalRow], batchIdForTracing: String,
partitionsResults: CollectionAccumulator[PartitionResult],parameters: KustoWriteResource)
: Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.microsoft.kusto.spark.utils

import java.time.Instant
import com.microsoft.kusto.spark.datasource.{TransientStorageCredentials, TransientStorageParameters}
import java.util

private[kusto] object CslCommandsGenerator {
private final val defaultKeySet = Set("compressionType","namePrefix","sizeLimit","compressed","async")
Expand Down Expand Up @@ -81,21 +82,21 @@ private[kusto] object CslCommandsGenerator {
s""".show table ${tableName} details | project todynamic(ShardingPolicy).UseShardEngine"""
}

def generateTableMoveExtentsCommand(sourceTableName: String, destinationTableName: String, batchSize: Int,
def generateTableMoveExtentsCommand(sourceTableName: String, destinationTableName: String, timerange: Array[Instant], batchSize: Int,
isDestinationTableMaterializedViewSource: Boolean = false): String = {
val setNewIngestionTime: String = if (isDestinationTableMaterializedViewSource) "with(SetNewIngestionTime=true)" else ""
s""".move extents to table $destinationTableName $setNewIngestionTime <|
s""".move extents to table $destinationTableName $setNewIngestionTime with(extentCreatedOnFrom='${timerange(0)}', extentCreatedOnTo='${timerange(1)}') <|
.show table $sourceTableName extents with(extentsShowFilteringRuntimePolicy='{"MaximumResultsCount":$batchSize}');
$$command_results
| distinct ExtentId"""
}

def generateTableMoveExtentsAsyncCommand(sourceTableName: String, destinationTableName: String, batchSize: Option[Int],
def generateTableMoveExtentsAsyncCommand(sourceTableName: String, destinationTableName: String, timerange: Array[Instant], batchSize: Option[Int],
isDestinationTableMaterializedViewSource: Boolean = false): String
= {
val withClause = if (batchSize.isDefined) s"""with(extentsShowFilteringRuntimePolicy='{"MaximumResultsCount":${batchSize.get}}')""" else ""
val setNewIngestionTime: String = if (isDestinationTableMaterializedViewSource) "with(SetNewIngestionTime=true)" else ""
s""".move async extents to table $destinationTableName $setNewIngestionTime <|
s""".move async extents to table $destinationTableName $setNewIngestionTime with(extentCreatedOnFrom='${timerange(0)}', extentCreatedOnTo='${timerange(1)}') <|
.show table $sourceTableName extents $withClause;
"""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
}
}

def moveExtentsWithRetries(batchSize: Option[Int], totalAmount: Int, database: String, tmpTableName: String, targetTable: String,
def moveExtentsWithRetries(batchSize: Option[Int], totalAmount: Int, database: String, tmpTableName: String, targetTable: String, ingestionStartTime: Instant,
crp: ClientRequestProperties, writeOptions: WriteOptions): Unit = {
var extentsProcessed = 0
var retry = 0
Expand All @@ -222,8 +222,9 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
var failed = false
// Execute move batch and keep any transient error for handling
try {
val timeRange = Array[Instant](ingestionStartTime, Instant.now())
val operation = executeEngine(database, generateTableMoveExtentsAsyncCommand(tmpTableName,
targetTable, if (batchSize.isEmpty) None else Some(curBatchSize), useMaterializedViewFlag), crp).getPrimaryResults
targetTable, timeRange , if (batchSize.isEmpty) None else Some(curBatchSize), useMaterializedViewFlag), crp).getPrimaryResults
val operationResult = KDSU.verifyAsyncCommandCompletion(engineClient, database, operation, samplePeriod =
KustoConstants
.DefaultPeriodicSamplePeriod, writeOptions.timeout, s"move extents to destination table '$targetTable' ",
Expand Down Expand Up @@ -297,7 +298,7 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
}

def moveExtents(database: String, tmpTableName: String, targetTable: String, crp: ClientRequestProperties,
writeOptions: WriteOptions): Unit = {
writeOptions: WriteOptions, sinkStartTime: Instant): Unit = {
val extentsCountQuery = executeEngine(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
extentsCountQuery.next()
val extentsCount = extentsCountQuery.getInt(0)
Expand All @@ -306,10 +307,10 @@ class ExtendedKustoClient(val engineKcsb: ConnectionStringBuilder, val ingestKcs
nodeCountQuery.next()
val nodeCount = nodeCountQuery.getInt(0)
moveExtentsWithRetries(Some(nodeCount * writeOptions.minimalExtentsCountForSplitMerge), extentsCount, database,
tmpTableName, targetTable, crp, writeOptions)
tmpTableName, targetTable, sinkStartTime, crp, writeOptions)
} else {
moveExtentsWithRetries(None, extentsCount, database,
tmpTableName, targetTable, crp, writeOptions)
tmpTableName, targetTable, sinkStartTime, crp, writeOptions)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

import java.time.Instant
import java.time.{Instant}
import java.time.temporal.ChronoUnit
import scala.collection.immutable
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -110,6 +110,7 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
ingestByTags.add(tag)
val sp = new SparkIngestionProperties()
sp.ingestByTags = ingestByTags
sp.creationTime = DateTime.now()

dfOrig.write
.format("com.microsoft.kusto.spark.datasource")
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>3.1.11</revision>
<revision>3.1.12</revision>

<!-- Spark dependencies -->
<scala.version.major>2.11</scala.version.major>
Expand Down

0 comments on commit 264dfb3

Please sign in to comment.