Commit

Merge branch 'master' of https://github.com/Azure/azure-kusto-spark into 2.4

asaharn committed Mar 29, 2023
2 parents 264dfb3 + 325236b commit bb42a2a
Showing 10 changed files with 128 additions and 16 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -32,14 +32,14 @@ link your application with the artifact below to use the Azure Data Explorer Con
```
groupId = com.microsoft.azure.kusto
artifactId = kusto-spark_3.0_2.12
version = 3.1.12
version = 3.1.13
```

**In Maven**:

Look for the following coordinates:
```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.12
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.13
```

Or clone this repository and build it locally to add it to your local Maven repository.
@@ -49,15 +49,15 @@ The jar can also be found under the [released package](https://github.com/Azure/
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-spark_3.0_2.12</artifactId>
<version>3.1.12</version>
<version>3.1.13</version>
</dependency>
```

**In SBT**:

```scala
libraryDependencies ++= Seq(
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.12"
"com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "3.1.13"
)
```

@@ -66,7 +66,7 @@ libraryDependencies ++= Seq(
Libraries -> Install New -> Maven -> copy the following coordinates:

```
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.12
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.1.13
```

#### Building Samples Module
41 changes: 41 additions & 0 deletions SECURITY.md
@@ -0,0 +1,41 @@
<!-- BEGIN MICROSOFT SECURITY.MD V0.0.8 BLOCK -->

## Security

Microsoft takes the security of our software products and services seriously, which includes all source code repositories managed through our GitHub organizations, which include [Microsoft](https://github.com/microsoft), [Azure](https://github.com/Azure), [DotNet](https://github.com/dotnet), [AspNet](https://github.com/aspnet), [Xamarin](https://github.com/xamarin), and [our GitHub organizations](https://opensource.microsoft.com/).

If you believe you have found a security vulnerability in any Microsoft-owned repository that meets [Microsoft's definition of a security vulnerability](https://aka.ms/opensource/security/definition), please report it to us as described below.

## Reporting Security Issues

**Please do not report security vulnerabilities through public GitHub issues.**

Instead, please report them to the Microsoft Security Response Center (MSRC) at [https://msrc.microsoft.com/create-report](https://aka.ms/opensource/security/create-report).

If you prefer to submit without logging in, send email to [[email protected]](mailto:[email protected]). If possible, encrypt your message with our PGP key; please download it from the [Microsoft Security Response Center PGP Key page](https://aka.ms/opensource/security/pgpkey).

You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Additional information can be found at [microsoft.com/msrc](https://aka.ms/opensource/security/msrc).

Please include the requested information listed below (as much as you can provide) to help us better understand the nature and scope of the possible issue:

* Type of issue (e.g. buffer overflow, SQL injection, cross-site scripting, etc.)
* Full paths of source file(s) related to the manifestation of the issue
* The location of the affected source code (tag/branch/commit or direct URL)
* Any special configuration required to reproduce the issue
* Step-by-step instructions to reproduce the issue
* Proof-of-concept or exploit code (if possible)
* Impact of the issue, including how an attacker might exploit the issue

This information will help us triage your report more quickly.

If you are reporting for a bug bounty, more complete reports can contribute to a higher bounty award. Please visit our [Microsoft Bug Bounty Program](https://aka.ms/opensource/security/bounty) page for more details about our active programs.

## Preferred Languages

We prefer all communications to be in English.

## Policy

Microsoft follows the principle of [Coordinated Vulnerability Disclosure](https://aka.ms/opensource/security/cvd).

<!-- END MICROSOFT SECURITY.MD BLOCK -->
@@ -1,5 +1,7 @@
package com.microsoft.kusto.spark.authentication

import com.microsoft.kusto.spark.utils.KustoConstants

import java.util.concurrent.Callable

trait KustoAuthentication {
@@ -10,7 +12,9 @@ trait KustoAuthentication {
case _ => false
}

override def hashCode(): Int = this.hashCode
override def toString: String = KustoConstants.EmptyString

override def hashCode(): Int = this.hashCode()
}

abstract class KeyVaultAuthentication(uri: String, authority: String) extends KustoAuthentication
@@ -23,7 +27,7 @@ case class AadApplicationAuthentication(ID: String, password: String, authority:
case _ => false
}

override def hashCode(): Int = ID.hashCode + (if (authority == null) 0 else (authority.hashCode))
override def hashCode(): Int = ID.hashCode + (if (authority == null) 0 else authority.hashCode)
}

case class ManagedIdentityAuthentication(clientId : Option[String]) extends KustoAuthentication {
@@ -49,7 +53,8 @@ case class AadApplicationCertificateAuthentication(appId: String, certFilePath:
override def hashCode(): Int = appId.hashCode + certFilePath.hashCode + certPassword.hashCode()
}

case class KeyVaultAppAuthentication(uri: String, keyVaultAppID: String, keyVaultAppKey: String, authority: String) extends KeyVaultAuthentication(uri, authority) {
final case class KeyVaultAppAuthentication(uri: String, keyVaultAppID: String, keyVaultAppKey: String, authority: String)
extends KeyVaultAuthentication(uri, authority) {
def canEqual(that: Any): Boolean = that.isInstanceOf[KeyVaultAppAuthentication]

override def equals(that: Any): Boolean = that match {
@@ -60,7 +65,8 @@ case class KeyVaultAppAuthentication(uri: String, keyVaultAppID: String, keyVaul
override def hashCode(): Int = uri.hashCode + keyVaultAppID.hashCode
}

case class KeyVaultCertificateAuthentication(uri: String, pemFilePath: String, pemFilePassword: String, authority: String) extends KeyVaultAuthentication(uri, authority) {
final case class KeyVaultCertificateAuthentication(uri: String, pemFilePath: String, pemFilePassword: String, authority: String)
extends KeyVaultAuthentication(uri, authority) {
def canEqual(that: Any): Boolean = that.isInstanceOf[KeyVaultCertificateAuthentication]

override def equals(that: Any): Boolean = that match {
@@ -17,13 +17,11 @@ class TransientStorageParameters(val storageCredentials: scala.Array[TransientSt
}

override def toString: String = {
new ObjectMapper().setVisibility(JsonMethod.FIELD, Visibility.ANY)
.writerWithDefaultPrettyPrinter
.writeValueAsString(this)
storageCredentials.map(tsc => tsc.toString).mkString("[",System.lineSeparator(),s", domain: $endpointSuffix]")
}
}

case class TransientStorageCredentials() {
final case class TransientStorageCredentials() {
var blobContainer: String = _
var storageAccountName: String = _
var storageAccountKey: String = _
@@ -79,6 +77,10 @@ case class TransientStorageCredentials() {
)
}
}

override def toString: String = {
s"BlobContainer: $blobContainer ,Storage: $storageAccountName , IsSasKeyDefined: $sasDefined"
}
}

object TransientStorageParameters {
@@ -88,5 +90,5 @@ object TransientStorageParameters {
}

object TransientStorageCredentials {
val SasPattern: Regex = raw"(?:https://)?([^.]+).blob.([^/]+)/([^?]+)?(.+)".r
private val SasPattern: Regex = raw"(?:https://)?([^.]+).blob.([^/]+)/([^?]+)?(.+)".r
}
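The `SasPattern` regex above splits a SAS URL into storage account, endpoint suffix, blob container, and SAS key. A rough Python rendering of the same parse (an illustrative sketch, not the connector's API; note the dots around `blob` are escaped here, whereas the Scala raw string leaves them as wildcards):

```python
import re

# Python rendering of the Scala SasPattern regex (sketch).
SAS_PATTERN = re.compile(r"(?:https://)?([^.]+)\.blob\.([^/]+)/([^?]+)?(.+)")

def parse_sas(url):
    """Split a SAS URL into its account, domain, container, and key parts."""
    m = SAS_PATTERN.match(url)
    if m is None:
        raise ValueError("Invalid SAS url")
    account, suffix, container, sas_key = m.groups()
    return {"storageAccountName": account, "endpointSuffix": suffix,
            "blobContainer": container, "sasKey": sas_key}

parsed = parse_sas("https://ateststorage2.blob.core.windows.net/kusto2?sp=r&sig=xxxxxx")
print(parsed["storageAccountName"], parsed["endpointSuffix"], parsed["blobContainer"])
# ateststorage2 core.windows.net kusto2
```

Because `[^.]+` cannot cross a dot, the first group stops at the account name and the `([^/]+)` group absorbs the full endpoint suffix (e.g. `core.windows.net`).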
@@ -29,6 +29,7 @@ object KustoConstants {
val DefaultTimeoutQueueing: Int = TimeUnit.SECONDS.toMillis(40).toInt
val MaxIngestRetryAttempts = 2
val MaxCommandsRetryAttempts = 4
val EmptyString = ""
val DefaultMaximumIngestionTime: FiniteDuration = FiniteDuration.apply(
MaxIngestRetryAttempts * (DefaultExecutionQueueing + DefaultTimeoutQueueing) + 2000,"millis")
}
@@ -1,6 +1,9 @@
package com.microsoft.kusto.spark

import com.microsoft.kusto.spark.datasource.{KustoSourceOptions, TransientStorageCredentials}
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.authentication.KustoAccessTokenAuthentication
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.datasource.{KustoRelation, KustoSourceOptions, TransientStorageCredentials, TransientStorageParameters}
import com.microsoft.kusto.spark.utils.KustoClientCache.ClusterAndAuth
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import org.apache.spark.SparkContext
@@ -11,6 +14,9 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

@RunWith(classOf[JUnitRunner])
class KustoSourceTests extends FlatSpec with MockFactory with Matchers with BeforeAndAfterAll {
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
@@ -68,6 +74,22 @@ class KustoSourceTests extends FlatSpec with MockFactory with Matchers with Befo
assert(df.schema.equals(expected))
}

"KustoDataSource" should "fail with credentials in plain text" in {
val ksr = KustoRelation(
KustoCoordinates(cluster, "", database, table = Option("tablename"), ingestionUrl = Option("")),
KustoAccessTokenAuthentication("token1"),
"",
KDSU.getReadParameters(Map[String, String](), null),
Duration(20, TimeUnit.SECONDS),
Option(""),
Option(new TransientStorageParameters(Array(new TransientStorageCredentials("https://storage.blob.core.windows.net/someplace-0?sp=r&st=2023-03-15T17:05:53Z&se=2023-03-16T01:05:53Z&spr=https&sv=2021-12-02&sr=c&sig=123456789")))),
Option(new ClientRequestProperties),
"reqid"
)(sqlContext.sparkSession)
assert(!ksr.toString.contains("token1"))
assert(ksr.toString.contains("[BlobContainer: someplace-0 ,Storage: storage , IsSasKeyDefined: true, domain: core.windows.net]"))
}

"KustoDataSource" should "parse sas" in {
val sas = "https://storage.blob.core.customDom/upload/?<secret>"
val params = new TransientStorageCredentials(sas)
@@ -57,6 +57,16 @@ class kustoAuthenticationTests extends FlatSpec {
assert(kata1 != kata2)
}

"KustoAuthentication Equals" should "Check token not getting printed" in {
val kata1 = KustoAccessTokenAuthentication("token1")
val kvaa11 = KeyVaultAppAuthentication("uri1", "appId1", "pass1", null)
val kvca11 = KeyVaultCertificateAuthentication("uri1", "path1", "pass1", null)

assert(kata1.toString == "")
assert(kvaa11.toString == "")
assert(kvca11.toString == "")
}
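The assertions above rely on every authentication holder stringifying to an empty string, so access tokens and passwords never reach logs or exception messages. A hedged Python sketch of the same pattern (hypothetical class, not the connector's API):

```python
class KustoAccessTokenAuthentication:
    """Sketch: an authentication holder that stringifies to "" so secrets never leak."""

    def __init__(self, token):
        self._token = token

    def __str__(self):
        return ""  # never expose the access token

    # repr() is also used by logging and debuggers, so redact it too
    __repr__ = __str__

auth = KustoAccessTokenAuthentication("token1")
print(f"auth={auth}")  # auth=
```

Interpolating the object into a log line then yields nothing sensitive, which is exactly what the test verifies.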

"KustoAccessTokenAuthentication Equals" should "Verify that different types of authentication won't equal" in {
val kvaa11 = KeyVaultAppAuthentication("uri1", "appId1", "pass1", null)
val kvca11 = KeyVaultCertificateAuthentication("uri1", "path1", "pass1", null)
@@ -0,0 +1,29 @@
package com.microsoft.kusto.spark.datasource

import org.scalatest.Matchers.convertToAnyShouldWrapper
import org.scalatest.FlatSpec

import java.security.InvalidParameterException

class TransientStorageParametersTest extends FlatSpec {

"TransientStorageParameters ToString" should "check token not getting printed" in {
val transientStorage = "{\"storageCredentials\":[{\"storageAccountName\":\"ateststorage\"," +
"\"blobContainer\":\"kusto\",\"sasUrl\":\"https://ateststorage.blob.core.windows.net/kusto\"," +
"\"sasKey\":\"?sp=racwdlmeop&st=2020-03-15T04:26:19Z&se=2020-03-16T12:26:19Z&spr=https&sv=2019-12-02" +
"&sr=c&sig=xxxxxx\"},{\"storageAccountName\":\"ateststorage2\"," +
"\"blobContainer\":\"kusto2\",\"sasUrl\":\"https://ateststorage2.blob.core.windows.net/kusto2\"," +
"\"sasKey\":\"?sp=racwdlmeop&st=2020-03-15T04:26:19Z&se=2020-03-16T12:26:19Z&spr=https&sv=2019-12-02" +
"&sr=c&sig=yyyyyyyyy\"}],\"endpointSuffix\":\"core.windows.net\"}"
val transientStorageParameters = TransientStorageParameters.fromString(transientStorage)
val tsString = transientStorageParameters.toString()
transientStorageParameters.storageCredentials.length shouldEqual 2

tsString shouldEqual s"[BlobContainer: kusto ,Storage: ateststorage , IsSasKeyDefined: true${System.lineSeparator()}BlobContainer: kusto2 ,Storage: ateststorage2 , IsSasKeyDefined: true, domain: core.windows.net]"
}
"TransientStorageCredentials ToString" should "parse SAS and not print tokens " in {
val transientStorageCredentials = new TransientStorageCredentials("https://ateststorage2.blob.core.windows.net/kusto2" +
"?sp=racwdlmeop&st=2020-03-15T04:26:19Z&se=2020-03-16T12:26:19Z&spr=https&sv=2019-12-02&sr=c&sig=xxxxxx")
transientStorageCredentials.toString shouldEqual "BlobContainer: kusto2 ,Storage: ateststorage2 , IsSasKeyDefined: true"
}
}
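The format asserted in these tests comes from the new `toString` on `TransientStorageCredentials`, which replaces full JSON serialization (it would have included the SAS key) with a redacted summary. A minimal Python sketch of the idea, with field names mirroring the Scala class (illustrative only):

```python
class TransientStorageCredentials:
    """Sketch of the redacted-toString pattern from the Scala class."""

    def __init__(self, blob_container, storage_account_name, sas_defined):
        self.blob_container = blob_container
        self.storage_account_name = storage_account_name
        self.sas_defined = sas_defined
        # A real credential would also hold the SAS key; it is deliberately
        # left out of __str__ so logs never leak it.

    def __str__(self):
        sas = "true" if self.sas_defined else "false"
        return (f"BlobContainer: {self.blob_container} ,"
                f"Storage: {self.storage_account_name} , IsSasKeyDefined: {sas}")

print(TransientStorageCredentials("kusto2", "ateststorage2", True))
# BlobContainer: kusto2 ,Storage: ateststorage2 , IsSasKeyDefined: true
```

Reporting only a boolean (`IsSasKeyDefined`) keeps the string useful for debugging while guaranteeing the secret itself cannot appear in output.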
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>3.1.12</revision>
<revision>3.1.13</revision>

<!-- Spark dependencies -->
<scala.version.major>2.11</scala.version.major>
@@ -0,0 +1 @@
New sample notebook (Synapse PySpark kernel). The raw `.ipynb` JSON is condensed here to its code cells:

```python
# --- Cell 1 ---
# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "green"
blob_sas_token = r""

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
print('Remote blob path: ' + wasbs_path)

# SPARK read parquet, note that it won't load any data yet by now
df = spark.read.parquet(wasbs_path)
# Display top 10 rows
print('Displaying top 10 rows: ')
df.printSchema

# --- Cell 2 ---
# The target where this data will be written to
import trident_token_library_wrapper
kustoUri = "https://tridr7n160xjhpumzqnns9.z0.kusto.data.microsoft.com"
database = "Stocks"
table = "GreenTaxiData"

# --- Cell 3 ---
# This is an example of writing data to Kusto. The source data is read as a blob into a
# dataframe from Azure Open Data for GreenTaxi / Limousines in NYC. The access token is
# created using the user's credential and is used to write the data to the Kusto table
# GreenTaxiData; the user therefore needs 'user' privileges or above on the target
# database, and table 'admin' privileges if the table already exists. If the table does
# not exist, it will be created with the DataFrame schema.
df.write.format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoTable", table) \
    .option("accessToken", trident_token_library_wrapper.PyTridentTokenLibrary.get_access_token(kustoUri)) \
    .option("tableCreateOptions", "CreateIfNotExist").mode("Append").save()

# --- Cell 4 ---
# This is an example of reading data from Kusto. The query retrieves the max/min fares
# and distances the taxis recorded every month from 2014 to 2020.
kustoQuery = ("GreenTaxiData | where puYear between (2014 .. 2020) "
              "| summarize MaxDistance=max(tripDistance), MaxFare=max(fareAmount), "
              "MinDistance=min(tripDistance), MinFare=min(fareAmount) by puYear, puMonth "
              "| order by puYear, puMonth desc")
kustoDf = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("accessToken", trident_token_library_wrapper.PyTridentTokenLibrary.get_access_token(kustoUri)) \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoQuery", kustoQuery).load()

# --- Cell 5 ---
kustoDf.show()
```
