Export containers (#102)
* use export containers
introduce READ_MODE to choose scale or lean mode

* use export storage

* use export storage

* Update KustoSourceE2E.scala

* fix

* multiple storage accounts

* multiple storage accounts

* fix build

* comments

* comments

* comment

* comment

* comment

* rename and docs

Co-authored-by: ohbitton <[email protected]>
ohadbitt and ohbitton authored Feb 18, 2020
1 parent 8968772 commit d32812e
Showing 20 changed files with 240 additions and 228 deletions.
1 change: 1 addition & 0 deletions connector/pom.xml
@@ -89,6 +89,7 @@
<artifactId>azure-keyvault-webkey</artifactId>
<version>1.1</version>
</dependency>
<!-- might need version 2.7 here for a local run -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
@@ -33,11 +33,6 @@ trait KustoOptions {

// An integer corresponding to the period in seconds after which the operation will time out. Default: '5400' (90 minutes)
val KUSTO_TIMEOUT_LIMIT: String = newOption("timeoutLimit")
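// e.g. (illustrative) .option(KustoSourceOptions.KUSTO_TIMEOUT_LIMIT, "1800") caps the operation at 30 minutes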

object SinkTableCreationMode extends Enumeration {
type SinkTableCreationMode = Value
val CreateIfNotExist, FailIfNotExist = Value
}
}

case class KustoCoordinates(cluster: String, database: String, table: Option[String] = None)
@@ -56,11 +51,6 @@ private[kusto] object KustoDebugOptions {
name
}

// Reading method is determined internally by the connector
// This option allows to override connector heuristics and force a specific mode.
// Recommended to use only for debug and testing purposes
// Supported values: Empty string (""), 'lean' (direct query), 'scale' (via blob). Default: empty
val KUSTO_DBG_FORCE_READ_MODE: String = newOption("dbgForceReadMode")
// When reading via blob storage, compresses the data upon export from Kusto to the blob
// This feature is experimental and exists to measure the performance impact with and without compression
// Default: 'true'
@@ -31,7 +31,6 @@ object KustoSinkOptions extends KustoOptions{
// partition. Kusto's ingestion also aggregates data; the default suggested by Kusto is 1GB, but here we suggest cutting
// it at 100MB to match the way Spark pulls data.
val KUSTO_CLIENT_BATCHING_LIMIT: String = newOption("clientBatchingLimit")
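// e.g. (illustrative) .option(KustoSinkOptions.KUSTO_CLIENT_BATCHING_LIMIT, "100") keeps batches at roughly 100MB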

}

object SinkTableCreationMode extends Enumeration {
@@ -185,7 +185,7 @@ object KustoWriter {
client: KustoClient): BlobWriteResource = {
val blobName = s"${tableCoordinates.database}_${tmpTableName}_${UUID.randomUUID.toString}_spark.csv.gz"

val containerAndSas = client.getNewTempBlobReference
val containerAndSas = client.getTempBlobForIngestion
val currentBlob = new CloudBlockBlob(new URI(containerAndSas.containerUrl + '/' + blobName + containerAndSas.sas))
val currentSas = containerAndSas.sas
val gzip: GZIPOutputStream = new GZIPOutputStream(currentBlob.openOutputStream())
@@ -7,6 +7,7 @@ import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.authentication.{KeyVaultAuthentication, KustoAuthentication}
import com.microsoft.kusto.spark.common.{KustoCoordinates, KustoDebugOptions}
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, KustoWriter}
import com.microsoft.kusto.spark.datasource.ReadMode.ReadMode
import com.microsoft.kusto.spark.utils.{KeyVaultUtils, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -81,6 +82,7 @@ class DefaultSource extends CreatableRelationProvider
}

if (authenticationParameters.isEmpty) {
// Parse the parameters if they have not been parsed before
val sourceParameters = KDSU.parseSourceParameters(parameters)
authenticationParameters = Some(sourceParameters.authenticationParameters)
kustoCoordinates = sourceParameters.kustoCoordinates
@@ -109,14 +111,20 @@
}

val timeout = new FiniteDuration(parameters.getOrElse(KustoSourceOptions.KUSTO_TIMEOUT_LIMIT, KCONST.defaultWaitingIntervalLongRunning).toLong, TimeUnit.SECONDS)
val readModeOption = parameters.get(KustoSourceOptions.KUSTO_READ_MODE)
val readMode: Option[ReadMode] = if (readModeOption.isDefined){
Some(ReadMode.withName(readModeOption.get))
} else {
None
}
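// ReadMode.withName expects the exact enum value name, i.e. "ForceSingleMode" or "ForceDistributedMode";
// any other value makes withName throw a NoSuchElementException.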

KDSU.logInfo(myName, "Finished serializing parameters for reading")

KustoRelation(
kustoCoordinates,
kustoAuthentication.get,
parameters.getOrElse(KustoSourceOptions.KUSTO_QUERY, ""),
KustoReadOptions(parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE, ""), shouldCompressOnExport, exportSplitLimitMb),
KustoReadOptions(readMode, shouldCompressOnExport, exportSplitLimitMb),
timeout,
numPartitions,
parameters.get(KustoDebugOptions.KUSTO_PARTITION_COLUMN),
@@ -1,11 +1,15 @@
package com.microsoft.kusto.spark.datasource

import java.net.URI
import java.security.InvalidParameterException
import java.util.UUID

import com.microsoft.azure.kusto.data.{Client, ClientRequestProperties}
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey
import com.microsoft.azure.storage.blob.CloudBlobContainer
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasource.ReadMode.ReadMode
import com.microsoft.kusto.spark.utils.{CslCommandsGenerator, KustoAzureFsSetupCache, KustoBlobStorageUtils, KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
@@ -37,16 +41,16 @@ private[kusto] case class KustoReadRequest(sparkSession: SparkSession,
timeout: FiniteDuration,
clientRequestProperties: Option[ClientRequestProperties])

private[kusto] case class KustoReadOptions(forcedReadMode: String = "",
private[kusto] case class KustoReadOptions(readMode: Option[ReadMode] = None,
shouldCompressOnExport: Boolean = true,
exportSplitLimitMb: Long = 1024)
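// e.g. KustoReadOptions(readMode = Some(ReadMode.ForceDistributedMode)) keeps the compression and split-size defaults above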

private[kusto] object KustoReader {
private val myName = this.getClass.getSimpleName

private[kusto] def leanBuildScan(kustoClient: Client,
request: KustoReadRequest,
filtering: KustoFiltering): RDD[Row] = {
private[kusto] def singleBuildScan(kustoClient: Client,
request: KustoReadRequest,
filtering: KustoFiltering): RDD[Row] = {

val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
val kustoResult = kustoClient.execute(request.kustoCoordinates.database,
@@ -57,12 +61,12 @@ private[kusto] object KustoReader {
request.sparkSession.createDataFrame(serializer.toRows, serializer.getSchema).rdd
}

private[kusto] def scaleBuildScan(kustoClient: Client,
request: KustoReadRequest,
storage: KustoStorageParameters,
partitionInfo: KustoPartitionParameters,
options: KustoReadOptions,
filtering: KustoFiltering): RDD[Row] = {
private[kusto] def distributedBuildScan(kustoClient: Client,
request: KustoReadRequest,
storage: Seq[KustoStorageParameters],
partitionInfo: KustoPartitionParameters,
options: KustoReadOptions,
filtering: KustoFiltering): RDD[Row] = {
KDSU.logInfo(myName, "Starting exporting data from Kusto to blob storage")

setupBlobAccess(request, storage)
@@ -81,12 +85,20 @@
request.clientRequestProperties)
}

KDSU.logInfo(myName, s"Finished exporting from Kusto to '${storage.account}/${storage.container}/$directory'" +
val directoryExists = (params: KustoStorageParameters) => {
val container = if (params.secretIsAccountKey) {
new CloudBlobContainer(new URI(s"https://${params.account}.blob.core.windows.net/${params.container}"), new StorageCredentialsAccountAndKey(params.account,params.secret))
} else {
new CloudBlobContainer(new URI(s"https://${params.account}.blob.core.windows.net/${params.container}?${params.secret}"))
}
container.getDirectoryReference(directory).listBlobsSegmented().getLength > 0
}
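// Keep only the containers that actually received exported blobs, so the parquet read below is not pointed at empty directories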
val paths = storage.filter(directoryExists).map(params => s"wasbs://${params.container}@${params.account}.blob.core.windows.net/$directory")
KDSU.logInfo(myName, s"Finished exporting from Kusto to '$paths'" +
s", will start parquet reading now")

val path = s"wasbs://${storage.container}@${storage.account}.blob.core.windows.net/$directory"
val rdd = try {
request.sparkSession.read.parquet(s"$path").rdd
request.sparkSession.read.parquet(paths:_*).rdd
} catch {
case ex: Exception =>
// Check whether the result is empty, causing an IO exception on reading empty parquet file
@@ -101,9 +113,7 @@
}
}

KDSU.logInfo(myName, "Transaction data written to blob storage account " +
storage.account + ", container " + storage.container + ", directory " + directory)

KDSU.logInfo(myName, "Transaction data written to blob storage, paths:" + paths)
rdd
}

@@ -117,22 +127,24 @@
}
}

private[kusto] def setupBlobAccess(request: KustoReadRequest, storage: KustoStorageParameters): Unit = {
private[kusto] def setupBlobAccess(request: KustoReadRequest, storageParameters: Seq[KustoStorageParameters]): Unit = {
val config = request.sparkSession.conf
val now = new DateTime(DateTimeZone.UTC)
if (storage.secretIsAccountKey) {
if (!KustoAzureFsSetupCache.updateAndGetPrevStorageAccountAccess(storage.account, storage.secret, now)) {
config.set(s"fs.azure.account.key.${storage.account}.blob.core.windows.net", s"${storage.secret}")
for(storage <- storageParameters) {
if (storage.secretIsAccountKey) {
if (!KustoAzureFsSetupCache.updateAndGetPrevStorageAccountAccess(storage.account, storage.secret, now)) {
config.set(s"fs.azure.account.key.${storage.account}.blob.core.windows.net", s"${storage.secret}")
}
}
}
else {
if (!KustoAzureFsSetupCache.updateAndGetPrevSas(storage.container, storage.account, storage.secret, now)) {
config.set(s"fs.azure.sas.${storage.container}.${storage.account}.blob.core.windows.net", s"${storage.secret}")
else {
if (!KustoAzureFsSetupCache.updateAndGetPrevSas(storage.container, storage.account, storage.secret, now)) {
config.set(s"fs.azure.sas.${storage.container}.${storage.account}.blob.core.windows.net", s"${storage.secret}")
}
}
}

if (!KustoAzureFsSetupCache.updateAndGetPrevNativeAzureFs(now)) {
config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
if (!KustoAzureFsSetupCache.updateAndGetPrevNativeAzureFs(now)) {
config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
}
}
}
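// The keys set above follow the standard hadoop-azure (wasbs) configuration pattern:
// fs.azure.account.key.<account>.blob.core.windows.net for account keys and
// fs.azure.sas.<container>.<account>.blob.core.windows.net for SAS tokens.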

@@ -156,14 +168,14 @@
}
}

private[kusto] class KustoReader(client: Client, request: KustoReadRequest, storage: KustoStorageParameters) {
private[kusto] class KustoReader(client: Client, request: KustoReadRequest, storage: Seq[KustoStorageParameters]) {
private val myName = this.getClass.getSimpleName

// Export a single partition from Kusto to transient Blob storage.
// Returns the directory path for these blobs
private[kusto] def exportPartitionToBlob(partition: KustoPartition,
request: KustoReadRequest,
storage: KustoStorageParameters,
storage: Seq[KustoStorageParameters],
directory: String,
options: KustoReadOptions,
filtering: KustoFiltering,
Expand All @@ -173,12 +185,9 @@ private[kusto] class KustoReader(client: Client, request: KustoReadRequest, stor

val exportCommand = CslCommandsGenerator.generateExportDataCommand(
KustoFilter.pruneAndFilter(request.schema, request.query, filtering),
storage.account,
storage.container,
directory,
storage.secret,
storage.secretIsAccountKey,
partition.idx,
storage,
partition.predicate,
limit,
isCompressed = options.shouldCompressOnExport
@@ -53,46 +53,65 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
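// Read-mode selection, as implemented below: an explicit readMode option wins; otherwise the row count is estimated -
// an empty result short-circuits, a small count uses a single direct query, and a large or timed-out estimate falls
// back to a distributed export through transient blob storage.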
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val kustoClient = KustoClientCache.getClient(kustoCoordinates.cluster, authentication).engineClient
var timedOutCounting = false
var count = 0
try {
count = KDSU.estimateRowsCount(kustoClient, query, kustoCoordinates.database)
}catch {
// Assume count is high if estimation took more than 10 seconds
case _: Exception => timedOutCounting = true
}
if (count == 0 && !timedOutCounting) {
sparkSession.emptyDataFrame.rdd
val forceSingleMode = readOptions.readMode.isDefined && readOptions.readMode.get == ReadMode.ForceSingleMode
var useSingleMode = forceSingleMode
var res: Option[RDD[Row]] = None
if (readOptions.readMode.isEmpty){
var count = 0
try {
count = KDSU.estimateRowsCount(kustoClient, query, kustoCoordinates.database)
}catch {
// Assume the count is high if the estimation timed out
case e: Exception =>
if (forceSingleMode) {
// Rethrow if the user forced single mode
throw e
}
// By default we fall back to distributed mode
timedOutCounting = true
}
if (count == 0 && !timedOutCounting) {
res = Some(sparkSession.emptyDataFrame.rdd)
} else {
// Use distributed mode if the count is high or the estimation timed out
useSingleMode = !(timedOutCounting || count > KustoConstants.directQueryUpperBoundRows)
}
}
else {
val useLeanMode = KDSU.shouldUseLeanReadMode(count, storageParameters.isDefined, readOptions.forcedReadMode, timedOutCounting)
var exception: Option[Exception] = None
var res: Option[RDD[Row]] = None
if (useLeanMode) {

var exception: Option[Exception] = None
if(res.isEmpty) {
if (useSingleMode) {
try {
res = Some(KustoReader.leanBuildScan(
res = Some(KustoReader.singleBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
KustoFiltering(requiredColumns, filters)))
} catch {
case ex: Exception => exception = Some(ex)
}
}
if(!useLeanMode || (exception.isDefined && storageParameters.isDefined)) {
res = Some(KustoReader.scaleBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
storageParameters.get,
KustoPartitionParameters(numPartitions, getPartitioningColumn, getPartitioningMode),
readOptions,
KustoFiltering(requiredColumns, filters))

if (!useSingleMode || exception.isDefined) {
if(exception.isDefined){
KDSU.logError("KustoRelation",s"Failed with lean mode, falling back to distributed mode. Exception : ${exception.get.getMessage}")
}
res = Some(KustoReader.distributedBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
if (storageParameters.isDefined) Seq(storageParameters.get) else
KustoClientCache.getClient(kustoCoordinates.cluster, authentication).getTempBlobsForExport,
KustoPartitionParameters(numPartitions, getPartitioningColumn, getPartitioningMode),
readOptions,
KustoFiltering(requiredColumns, filters))
)
}

if(res.isEmpty && exception.isDefined){
throw exception.get
} else {
res.get
}
}

res.get
}

private def getSchema: StructType = {
@@ -6,12 +6,13 @@ object KustoSourceOptions extends KustoOptions {
val KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES: String = newOption("customSchema")

val KUSTO_QUERY: String = newOption("kustoQuery")
// TODO - implement retries?
val KUSTO_QUERY_RETRY_TIMES: String = newOption("kustoQueryRetryTimes")

// A JSON representation of the ClientRequestProperties object used for reading from Kusto
var KUSTO_CLIENT_REQUEST_PROPERTIES_JSON: String = newOption("clientRequestPropertiesJson")

// Blob Storage access parameters for source connector when working in 'scale' mode (read)
// Blob Storage access parameters for the source connector when working in 'distributed' mode (read)
// These parameters will not be needed once we move to automatic blob provisioning

// Transient storage account when reading from Kusto
Expand All @@ -23,4 +24,19 @@ object KustoSourceOptions extends KustoOptions {
val KUSTO_BLOB_STORAGE_SAS_URL: String = newOption("blobStorageSasUrl")
// Blob container name
val KUSTO_BLOB_CONTAINER: String = newOption("blobContainer")

// By default, an estimation of the row count is made first. If the count is lower than 5000 records, a simple
// ('single' mode) query is made; otherwise, if storage parameters were provided they are used for 'distributed' reading,
// and if not, the connector tries to use storage provided by the Kusto ingest service.
// This option allows overriding these connector heuristics.
// By default, if single mode was chosen and failed, the connector falls back to 'distributed' mode.
// See https://docs.microsoft.com/en-us/azure/kusto/concepts/querylimits#limit-on-result-set-size-result-truncation
// for the hard limit on result-set size when using single mode
val KUSTO_READ_MODE: String = newOption("readMode")
}

object ReadMode extends Enumeration {
type ReadMode = Value
val ForceSingleMode, ForceDistributedMode = Value

}
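For orientation, here is a minimal read sketch that exercises the new option. It is illustrative only: spark is assumed to be an existing SparkSession, the format string is assumed to resolve to the connector's DefaultSource, and the cluster, database and authentication options (not part of this change) are elided.

import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, ReadMode}

val df = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  // ... cluster, database and AAD authentication options from KustoSourceOptions go here (not shown in this diff)
  .option(KustoSourceOptions.KUSTO_QUERY, "MyTable | where Value > 0")
  // Optional: force the export-to-blob path; omit the option to let the row-count heuristic decide,
  // or pass ReadMode.ForceSingleMode.toString for a direct query.
  .option(KustoSourceOptions.KUSTO_READ_MODE, ReadMode.ForceDistributedMode.toString)
  .load()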