
Commit

Merge branch 'master' of https://github.com/Azure/azure-kusto-spark into 2.4
asaharn committed Apr 3, 2023
2 parents bb42a2a + 6450ffe commit 2e94197
Showing 6 changed files with 75 additions and 32 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -32,14 +32,14 @@ link your application with the artifact below to use the Azure Data Explorer Connector
```
groupId = com.microsoft.azure.kusto
artifactId = kusto-spark_3.0_2.12
-version = 3.1.13
+version = 3.1.14
```

**In Maven**:

Look for the following coordinates:
```
-com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.13
+com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.14
```

Or clone this repository and build it locally to add it to your local Maven repository.
@@ -49,15 +49,15 @@ The jar can also be found under the [released package](https://github.com/Azure/
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-spark_3.0_2.12</artifactId>
-<version>3.1.13</version>
+<version>3.1.14</version>
</dependency>
```

**In SBT**:

```scala
libraryDependencies ++= Seq(
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.13"
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.14"
)
```

@@ -66,7 +66,7 @@ libraryDependencies ++= Seq(
Libraries -> Install New -> Maven -> copy the following coordinates:

```
-com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.13
+com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.14
```
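
Once the artifact is attached, reads go through the connector's `DataFrameReader` extension. A minimal sketch, assuming the `spark.read.kusto` extension and the `KustoSourceOptions` keys used in this repository's tests; the cluster, database, table, and AAD application values are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._

object ConnectorQuickstart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kusto-quickstart").getOrCreate()

    // Placeholder AAD application credentials -- replace with real values.
    val conf = Map(
      KustoSourceOptions.KUSTO_AAD_APP_ID -> "<application-id>",
      KustoSourceOptions.KUSTO_AAD_APP_SECRET -> "<application-secret>"
    )

    // Reads a table (or any KQL query) into a DataFrame.
    val df = spark.read.kusto("<cluster>", "<database>", "<table-or-query>", conf)
    df.show(20)
  }
}
```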

#### Building Samples Module
TransientStorageParameters.scala
@@ -19,6 +19,13 @@ class TransientStorageParameters(val storageCredentials: scala.Array[TransientStorageCredentials]
override def toString: String = {
storageCredentials.map(tsc => tsc.toString).mkString("[",System.lineSeparator(),s", domain: $endpointSuffix]")
}

def toInsecureString: String = {
new ObjectMapper().setVisibility(JsonMethod.FIELD, Visibility.ANY)
.writerWithDefaultPrettyPrinter
.writeValueAsString(this)
}

}

final case class TransientStorageCredentials() {
@@ -81,6 +88,7 @@ final case class TransientStorageCredentials() {
override def toString: String = {
s"BlobContainer: $blobContainer ,Storage: $storageAccountName , IsSasKeyDefined: $sasDefined"
}

}

object TransientStorageParameters {
@@ -90,5 +98,5 @@
}

object TransientStorageCredentials {
-private val SasPattern: Regex = raw"(?:https://)?([^.]+).blob.([^/]+)/([^?]+)?(.+)".r
+private val SasPattern: Regex = raw"https:\/\/([^.]+).blob.([^\/]+)\/([^?]+)(\?.+)".r
}
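
For illustration, a sketch of how the tightened `SasPattern` and the new `toInsecureString` fit together, assuming the single-argument constructors used in the tests below; the account, container, and token are placeholders, not real secrets:

```scala
import com.microsoft.kusto.spark.datasource.{TransientStorageCredentials, TransientStorageParameters}

// The new pattern requires the full shape https://<account>.blob.<endpoint-suffix>/<container>?<sas-token>
val sasUrl = "https://mystorageaccount.blob.core.windows.net/exportcontainer?sv=2020-08-04&sig=placeholder"
val credentials = new TransientStorageCredentials(sasUrl)
val storage = new TransientStorageParameters(Array(credentials))

// toString keeps secrets out of logs; toInsecureString serializes every field to JSON,
// which is the form the 'transientStorage' source option expects.
println(storage.toInsecureString)
```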
KustoSourceE2E.scala
@@ -7,19 +7,21 @@ import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
import com.microsoft.kusto.spark.KustoTestUtils.KustoConnectionOptions
import com.microsoft.kusto.spark.common.KustoDebugOptions
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode, SparkIngestionProperties}
-import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, ReadMode}
+import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, ReadMode, TransientStorageCredentials, TransientStorageParameters}
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.hadoop.util.ComparableVersion
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.joda.time.DateTime
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

-import java.time.{Instant}
+import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util
import scala.collection.immutable
import scala.util.{Failure, Success, Try}
import scala.util.Random
@@ -41,6 +43,7 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {

private val loggingLevel = Option(System.getProperty("logLevel"))
private var kustoAdminClient: Option[Client] = None
private var maybeKustoIngestClient: Option[Client] = None
if (loggingLevel.isDefined) KDSU.setLoggingLevel(loggingLevel.get)
override def beforeAll(): Unit = {
super.beforeAll()
@@ -49,7 +52,11 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(KDSU.getEngineUrlFromAliasIfNeeded(kustoConnectionOptions.cluster),
kustoConnectionOptions.appId, kustoConnectionOptions.appKey, kustoConnectionOptions.authority)
kustoAdminClient = Some(ClientFactory.createClient(engineKcsb))

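// Derive the DM (ingestion) endpoint by inserting "ingest-" right after "https://" (offset 8) in the engine URL.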
val ingestUrl = new StringBuffer(KDSU.getEngineUrlFromAliasIfNeeded(kustoConnectionOptions.cluster)).insert(8, "ingest-").toString
val ingestKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
ingestUrl,
kustoConnectionOptions.appId, kustoConnectionOptions.appKey, kustoConnectionOptions.authority)
maybeKustoIngestClient = Some(ClientFactory.createClient(ingestKcsb))
Try(kustoAdminClient.get.execute(kustoConnectionOptions.database, generateAlterIngestionBatchingPolicyCommand(
"database",
kustoConnectionOptions.database,
@@ -105,7 +112,7 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
KDSU.logInfo("e2e","running KustoConnector");
val crp = new ClientRequestProperties
crp.setTimeoutInMilliSec(2000)
-val ingestByTags = new java.util.ArrayList[String]
+val ingestByTags = new util.ArrayList[String]
val tag = "dummyTag"
ingestByTags.add(tag)
val sp = new SparkIngestionProperties()
@@ -136,39 +143,50 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
KustoSinkOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSinkOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey
)
validateRead(conf)
}

val minimalParquetWriterVersion: String = "3.3.0"
private def validateRead(conf: Map[String, String]) = {
val dfResult = spark.read.kusto(kustoConnectionOptions.cluster, kustoConnectionOptions.database, table, conf)
-val orig = dfOrig.select("name", "value","dec").rdd.map(x => (x.getString(0), x.getInt(1), x.getDecimal(2))).collect().sortBy(_._2)
-val result = dfResult.select("name", "value","dec").rdd.map(x => (x.getString(0), x.getInt(1),x.getDecimal(2))).collect().sortBy(_._2)
+val orig = dfOrig.select("name", "value", "dec").rdd.map(x => (x.getString(0), x.getInt(1), x.getDecimal(2))).collect().sortBy(_._2)
+val result = dfResult.select("name", "value", "dec").rdd.map(x => (x.getString(0), x.getInt(1), x.getDecimal(2))).collect().sortBy(_._2)
assert(orig.deep == result.deep)
}

"KustoSource" should "execute a read query on Kusto cluster in single mode" in {
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, table)

val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_READ_MODE -> ReadMode.ForceSingleMode.toString,
KustoSourceOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey
)

val df = spark.read.kusto(kustoConnectionOptions.cluster, kustoConnectionOptions.database, query, conf)
df.show()
validateRead(conf)
}

"KustoSource" should "execute a read query on Kusto cluster in distributed mode" in {
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, table)
// val blobSas: String = System.getProperty("blobSas")
// TODO - get sas from DM and set it yourself
// val storage = new TransientStorageParameters(Array(new TransientStorageCredentials(blobSas)))

val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_READ_MODE -> ReadMode.ForceDistributedMode.toString,
// KustoSourceOptions.KUSTO_TRANSIENT_STORAGE -> storage.toString,
KustoSourceOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey
)

spark.read.kusto(kustoConnectionOptions.cluster, kustoConnectionOptions.database, query, conf).show(20)
maybeKustoIngestClient match {
case Some(kustoIngestClient) =>
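// The first cell of the first result row is an export container URL, including its SAS token.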
val storageWithKey = kustoIngestClient.execute(kustoConnectionOptions.database,
generateGetExportContainersCommand()).getPrimaryResults.getData.get(0).get(0).toString
KDSU.logError(myName, s"storageWithKey: $storageWithKey")

val storage = new TransientStorageParameters(Array(new TransientStorageCredentials(storageWithKey)))

val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_READ_MODE -> ReadMode.ForceDistributedMode.toString,
KustoSourceOptions.KUSTO_TRANSIENT_STORAGE -> storage.toInsecureString,
KustoSourceOptions.KUSTO_AAD_APP_ID -> kustoConnectionOptions.appId,
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> kustoConnectionOptions.appKey
)
val supportNewParquetWriter = new ComparableVersion(spark.version)
  .compareTo(new ComparableVersion(minimalParquetWriterVersion)) > 0
// Spark versions above minimalParquetWriterVersion validate full row contents;
// older versions only check the row count.
if (supportNewParquetWriter) {
  validateRead(conf)
} else {
  val dfResult = spark.read.kusto(kustoConnectionOptions.cluster, kustoConnectionOptions.database, table, conf)
  assert(dfResult.count() == expectedNumberOfRows)
}
}
}

// TODO make this UT
KustoSourceTests.scala
@@ -17,6 +17,8 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

import java.security.InvalidParameterException

@RunWith(classOf[JUnitRunner])
class KustoSourceTests extends FlatSpec with MockFactory with Matchers with BeforeAndAfterAll {
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
@@ -100,6 +102,18 @@ class KustoSourceTests extends FlatSpec with MockFactory with Matchers with BeforeAndAfterAll {
assert(params.sasDefined.equals(true))
}

"KustoDataSource" should "fail in parsing with no sas key" in {
val sas = "https://storage.blob.core.customDom/upload/"
assertThrows[InvalidParameterException] { new TransientStorageCredentials(sas) }
}

"KustoDataSource" should "fail in parsing with wrong sas url format" in {
val sas = "https://storage.blob.core.customDom/?<secret>"
assertThrows[InvalidParameterException] {
new TransientStorageCredentials(sas)
}
}
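
// A possible happy-path counterpart (a sketch; the container and SAS token below are placeholders):
"KustoDataSource" should "parse a well-formed sas url" in {
  val sas = "https://storage.blob.core.customDom/container?sv=placeholder-sas-token"
  val params = new TransientStorageCredentials(sas)
  assert(params.sasDefined.equals(true))
}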

"KustoDataSource" should "match cluster default url pattern" in {
val ingestUrl = "https://ingest-ohbitton.dev.kusto.windows.net"
val engineUrl = "https://ohbitton.dev.kusto.windows.net"
5 changes: 4 additions & 1 deletion docs/KustoSource.md
@@ -124,8 +124,9 @@ deleting transient artifacts etc. KustoBlobStorageUtils module contains helper functions
coordinates and account credentials, or a full SAS URL with write, read and list permissions once the corresponding RDD is no longer needed. Each transaction stores transient blob
artifacts in a separate directory. This directory is captured as part of read-transaction information logs reported on the Spark Driver node.

-* ** KUSTO_TRANSIENT_STORAGE **:
+* **KUSTO_TRANSIENT_STORAGE**:
  `transientStorage`, e.g. `KustoSourceOptions.KUSTO_TRANSIENT_STORAGE -> new TransientStorageParameters(Array(new TransientStorageCredentials(blobSas)))`

```python
transientStorage = "{ \"storageCredentials\" : [ { \
\"storageAccountName\": \"1jdldsdke2etestcluster01\",\
Expand All @@ -136,6 +137,8 @@ transientStorage = "{ \"storageCredentials\" : [ { \
...
option("transientStorage", transientStorage). \
```
> Note: Creating the `transientStorage` string with `TransientStorageParameters.toString` will not work. Build the string manually or use `TransientStorageParameters.toInsecureString`.
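
A minimal Scala sketch of the supported path, mirroring the E2E test in this commit; `blobSas`, the AAD values, and the cluster coordinates are placeholders:

```scala
import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, ReadMode, TransientStorageCredentials, TransientStorageParameters}
import com.microsoft.kusto.spark.sql.extension.SparkExtension._

val blobSas = "https://<account>.blob.core.windows.net/<container>?<sas-token>" // placeholder
val storage = new TransientStorageParameters(Array(new TransientStorageCredentials(blobSas)))

val conf = Map(
  KustoSourceOptions.KUSTO_READ_MODE -> ReadMode.ForceDistributedMode.toString,
  // toString redacts the secrets; toInsecureString serializes the full parameters.
  KustoSourceOptions.KUSTO_TRANSIENT_STORAGE -> storage.toInsecureString,
  KustoSourceOptions.KUSTO_AAD_APP_ID -> "<application-id>",
  KustoSourceOptions.KUSTO_AAD_APP_SECRET -> "<application-secret>"
)
val df = spark.read.kusto("<cluster>", "<database>", "<table-or-query>", conf)
```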

### Examples

2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
-<revision>3.1.13</revision>
+<revision>3.1.14</revision>

<!-- Spark dependencies -->
<scala.version.major>2.11</scala.version.major>

