Bug fix: distributed read could fail with Unsupported encoding: DELTA_BYTE_ARRAY (#281)

* Bug fix:
Disable the new parquet writer on distributed reads if the Spark version is lower than 3.3.0

* comments

Co-authored-by: Ohad Bitton <[email protected]>
ohadbitt and ohbitton authored Jan 17, 2023
1 parent a8c2f0b commit 13f4235
Showing 7 changed files with 40 additions and 19 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -32,14 +32,14 @@ link your application with the artifact below to use the Azure Data Explorer Con
```
groupId = com.microsoft.azure.kusto
artifactId = kusto-spark_3.0_2.12
version = 3.1.7
version = 3.1.10
```

**In Maven**:

Look for the following coordinates:
```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.7
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.10
```

Or clone this repository and build it locally to add it to your local Maven repository.
@@ -49,15 +49,15 @@ The jar can also be found under the [released package](https://github.com/Azure/
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-spark_3.0_2.12</artifactId>
<version>3.1.7</version>
<version>3.1.10</version>
</dependency>
```

**In SBT**:

```scala
libraryDependencies ++= Seq(
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.7"
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.10"
)
```

@@ -66,7 +66,7 @@ libraryDependencies ++= Seq(
Libraries -> Install New -> Maven -> copy the following coordinates:

```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.7
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.10
```

#### Building Samples Module
6 changes: 6 additions & 0 deletions connector/pom.xml
@@ -248,6 +248,12 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- TODO Remove maven-artifact when moving to java 11 and use java.lang.module.ModuleDescriptor.Version -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
<version>3.8.7</version>
</dependency>

<!-- Test Scope -->
<dependency>
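The TODO above references java.lang.module.ModuleDescriptor.Version as the eventual replacement for maven-artifact. A minimal sketch of that alternative, assuming a Java 11+ runtime (the object and method names here are illustrative, not part of this commit):

```scala
// Hypothetical sketch only: what the TODO above could look like once the connector moves
// to Java 11, replacing maven-artifact's ComparableVersion with the JDK's built-in type.
import java.lang.module.ModuleDescriptor

object SparkVersionGate {
  private val MinimalParquetWriterVersion = ModuleDescriptor.Version.parse("3.3.0")

  // True when the given Spark version is at or above the 3.3.0 minimum; the connector's
  // current ComparableVersion-based check may differ in how it treats the boundary.
  def supportsNativeParquetWriter(sparkVersion: String): Boolean =
    ModuleDescriptor.Version.parse(sparkVersion).compareTo(MinimalParquetWriterVersion) >= 0
}
```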
KustoReader.scala
@@ -6,7 +6,8 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasource.ReadMode.ReadMode
import com.microsoft.kusto.spark.utils.{CslCommandsGenerator, KustoAzureFsSetupCache, KustoBlobStorageUtils, ExtendedKustoClient, KustoDataSourceUtils => KDSU}
import com.microsoft.kusto.spark.utils.{CslCommandsGenerator, ExtendedKustoClient, KustoAzureFsSetupCache, KustoBlobStorageUtils, KustoDataSourceUtils => KDSU}
import org.apache.hadoop.util.ComparableVersion
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.Filter
@@ -50,7 +51,12 @@ private[kusto] object KustoReader {
private val myName = this.getClass.getSimpleName
private val distributedReadModeTransientCache: concurrent.Map[DistributedReadModeTransientCacheKey,Seq[String]] =
new concurrent.TrieMap()

/*
A new native Parquet writer that uses new encoding schemes was rolled out on the ADX (Kusto) side. It uses DELTA_BYTE_ARRAY encoding for strings and other byte-array-based Parquet types (the default in Parquet V2, which most modern Parquet readers support).
To avoid breaking existing applications, if the Spark runtime version is lower than 3.3.0 we explicitly set the ADX export not to use the native writer (useNativeParquetWriter=false).
TODO - add test
*/
private val minimalParquetWriterVersion = "3.3.0"
private[kusto] def singleBuildScan(kustoClient: ExtendedKustoClient,
request: KustoReadRequest,
filtering: KustoFiltering): RDD[Row] = {
@@ -229,13 +235,16 @@ private[kusto] class KustoReader(client: ExtendedKustoClient) {
directory: String,
options: KustoReadOptions,
filtering: KustoFiltering): Unit = {
val supportNewParquetWriter = new ComparableVersion(request.sparkSession.version)
.compareTo(new ComparableVersion(KustoReader.minimalParquetWriterVersion)) > 0
val exportCommand = CslCommandsGenerator.generateExportDataCommand(
query=KustoFilter.pruneAndFilter(request.schema, request.query, filtering),
directory=directory,
partitionId=partition.idx,
storageParameters=storage,
partitionPredicate=partition.predicate,
additionalExportOptions=options.additionalExportOptions
additionalExportOptions=options.additionalExportOptions,
supportNewParquetWriter=supportNewParquetWriter
)

val commandResult: KustoResultSetTable = client.executeEngine(request.kustoCoordinates.database,
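For context, a standalone sketch of how the version gate above behaves for a couple of Spark versions; only ComparableVersion and the 3.3.0 threshold come from the change itself, the object and function names are illustrative:

```scala
// Standalone illustration of the version gate added above; ComparableVersion (here the
// Hadoop copy used by the connector) orders dotted version strings numerically, so
// "3.10.1" sorts above "3.9.0".
import org.apache.hadoop.util.ComparableVersion

object ParquetWriterGateDemo extends App {
  val minimalParquetWriterVersion = "3.3.0"

  def supportsNewParquetWriter(sparkVersion: String): Boolean =
    new ComparableVersion(sparkVersion)
      .compareTo(new ComparableVersion(minimalParquetWriterVersion)) > 0

  println(supportsNewParquetWriter("3.2.1")) // false -> the export gets useNativeParquetWriter=false
  println(supportsNewParquetWriter("3.4.0")) // true  -> export options are left as provided
}
```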
KustoSourceOptions.scala
@@ -32,7 +32,8 @@ object KustoSourceOptions extends KustoOptions {
val KUSTO_QUERY_FILTER_PUSH_DOWN: String = newOption("queryFilterPushDown")
// When a large dataset has to be exported with Kusto as a source (or) when forcing a distributed mode read (or) when
// query limits are hit, the connector uses the export option to export data (.export data). With newer options being
// rolled-out for export, this additional parameter can be used as options for the export
// rolled-out for export, this additional parameter can be used as options for the export.
// Setting useNativeParquetWriter=true will fail for Spark versions < 3.3.0
val KUSTO_EXPORT_OPTIONS_JSON: String = newOption("kustoExportOptionsJson")
}

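A hedged usage sketch of passing these export options from the application side; cluster, database and query values are placeholders, authentication options are omitted, and the option constants are assumed to match the connector's source options:

```scala
// Hedged example: a distributed read that forwards extra options to the .export command
// through kustoExportOptionsJson. Cluster, database and query values are placeholders and
// authentication options are omitted for brevity.
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kusto-export-options-example").getOrCreate()

val df = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_CLUSTER, "mycluster.westeurope")
  .option(KustoSourceOptions.KUSTO_DATABASE, "MyDatabase")
  .option(KustoSourceOptions.KUSTO_QUERY, "MyTable | where Timestamp > ago(30d)")
  .option(KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON, """{"distribution":"per_node"}""")
  .load()
```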
CslCommandsGenerator.scala
@@ -4,9 +4,6 @@ import java.time.Instant
import com.microsoft.kusto.spark.datasource.{TransientStorageCredentials, TransientStorageParameters}

private[kusto] object CslCommandsGenerator {
// We want to process these export options on a case to case basis. We want a default of snappy for compression
// and also want to ignore namePrefix because the files get exported with this name and then we need to read
// them with the same name in downstream processing.
private final val defaultKeySet = Set("compressionType","namePrefix","sizeLimit","compressed","async")
def generateFetchTableIngestByTagsCommand(table: String): String = {
s""".show table $table extents;
@@ -123,14 +120,18 @@ private[kusto] object CslCommandsGenerator {
s".show operations $operationId"
}

// Export data to blob
// Export data to blob container
// We want to process these export options on a case to case basis. We want a default of snappy for compression
// and also want to ignore namePrefix because the files get exported with this name and then we need to read
// them with the same name in downstream processing.
def generateExportDataCommand(
query: String,
directory: String,
partitionId: Int,
storageParameters: TransientStorageParameters,
partitionPredicate: Option[String] = None,
additionalExportOptions: Map[String,String] = Map.empty[String,String]
additionalExportOptions: Map[String,String] = Map.empty[String,String],
supportNewParquetWriter: Boolean = true
): String = {
val getFullUrlFromParams = (storage: TransientStorageCredentials) => {
val secretString = if (!storage.sasDefined) s""";" h@"${storage.storageAccountKey}"""" else if
@@ -148,10 +149,12 @@
val compressionFormat = additionalExportOptions.getOrElse("compressionType", "snappy")
val namePrefix = s"${directory}part$partitionId"
val sizeLimitOverride = additionalExportOptions.get("sizeLimit").map(size => s"sizeLimit=${size.toLong * 1024 * 1024} ,").getOrElse("")
val nativeParquetString = additionalExportOptions.get("useNativeParquetWriter")
.map(b => s"useNativeParquetWriter=$b, ").getOrElse(if (!supportNewParquetWriter) "useNativeParquetWriter=false, " else "")

var command =
s""".export async $compress to parquet ("${storageParameters.storageCredentials.map(getFullUrlFromParams).reduce((s, s1) => s + ",\"" + s1)})""" +
s""" with ($sizeLimitOverride namePrefix="$namePrefix", compressionType="$compressionFormat"$additionalOptionsString) <| $query"""
s""" with ($sizeLimitOverride$nativeParquetString namePrefix="$namePrefix", compressionType="$compressionFormat"$additionalOptionsString) <| $query"""

command
}
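To make the precedence concrete, the snippet below is a minimal extraction of the flag derivation added above (the standalone function name is illustrative):

```scala
// Minimal extraction of the logic added above: an explicit useNativeParquetWriter value in
// the export options always wins; otherwise the flag is forced off only when the Spark
// runtime does not support the new writer.
def nativeParquetString(additionalExportOptions: Map[String, String],
                        supportNewParquetWriter: Boolean): String =
  additionalExportOptions.get("useNativeParquetWriter")
    .map(b => s"useNativeParquetWriter=$b, ")
    .getOrElse(if (!supportNewParquetWriter) "useNativeParquetWriter=false, " else "")

// nativeParquetString(Map.empty, supportNewParquetWriter = false)
//   -> "useNativeParquetWriter=false, "   (injected into the .export with(...) clause)
// nativeParquetString(Map("useNativeParquetWriter" -> "true"), supportNewParquetWriter = false)
//   -> "useNativeParquetWriter=true, "    (an explicit user choice is respected)
```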
8 changes: 5 additions & 3 deletions docs/KustoSource.md
@@ -109,10 +109,12 @@ If set to 'true', query executed on kusto cluster will include the filters.
'true' by default if KUSTO_DISTRIBUTED_READ_MODE_TRANSIENT_CACHE=false

* **KUSTO_EXPORT_OPTIONS_JSON**:
'kustoExportOptionsJson' - JSON that provides the list of [export options](https://learn.microsoft.com/azure/data-explorer/kusto/management/data-export/export-data-to-storage) in case of distributed read (either because of query limits getting hit or user request for ForceDistributed mode). The export options do not support the _OutputDataFormat_ which is defaulted to _parquet_, _namePrefix_ which is defaulted based on the
partition of the export. _compressionType_ is defaulted to snappy and the command also specifies _compressed_ (to create .snappy.gz files), to turn extra compression off - it can be set to _none_ (**not recommended**)
'kustoExportOptionsJson' - JSON that provides the list of [export options](https://learn.microsoft.com/azure/data-explorer/kusto/management/data-export/export-data-to-storage) used for a distributed read (either because query limits were hit or because ForceDistributed mode was requested).
The export options do not allow overriding _OutputDataFormat_, which is fixed to _parquet_, or _namePrefix_, which is set to a new directory created specifically for the current read.
_compressionType_ defaults to snappy and the command also specifies _compressed_ (to create .snappy.gz files); to turn the extra compression off it can be set to _none_ (**not recommended**),
e.g. .option("kustoExportOptionsJson", "{\"distribution\":\"per_node\"}")
*
>Note: Connector versions >= 3.1.10 automatically set useNativeParquetWriter=false when the Spark version is lower than 3.3.0, because the Kusto service now uses the vectorized Parquet writer introduced in that Spark version. Do not set it to true on lower Spark versions, as the read will fail.
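A hedged sketch of overriding that default explicitly through the export-options JSON; it assumes a SparkSession named spark and the connector's standard source-option constants (including KUSTO_READ_MODE), with cluster, database and authentication options omitted:

```scala
// Hypothetical override of the automatic behaviour described in the note above:
// explicitly disabling the native parquet writer for a forced distributed read.
// Cluster, database and authentication options are omitted for brevity.
import com.microsoft.kusto.spark.datasource.KustoSourceOptions

val df = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_READ_MODE, "ForceDistributedMode")
  .option(KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON, """{"useNativeParquetWriter":"false"}""")
  .load()
```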
#### Transient Storage Parameters
When reading data from Kusto in 'distributed' mode, the data is exported from Kusto into blob storage every time the corresponding RDD is materialized.
If the user doesn't specify storage parameters and a 'Distributed' read mode is chosen, the storage used will be provided by the Kusto ingest service.
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>3.1.9</revision>
<revision>3.1.10</revision>

<!-- Spark dependencies -->
<scala.version.major>2.12</scala.version.major>
