Skip to content

Commit

Permalink
write-straight (#80)
Browse files Browse the repository at this point in the history
* logs all

* logs all

* write straight to buffered writer

* write straight to buffered writer

* tests

* csvWriter

* csvWriter

* csvWriter

* naming

* fix write

* fix csv writing

* fix csv writing

* fix csv writing

* fix
  • Loading branch information
ohadbitt authored Aug 21, 2019
1 parent af1cf7e commit 657f383
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 185 deletions.
19 changes: 13 additions & 6 deletions connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<artifactId>jackson-core</artifactId>
<version>[2.9.6, 2.10.0)</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.version}</artifactId>
<version>[2.9.4,2.9.9]</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>adal4j</artifactId>
Expand Down Expand Up @@ -89,11 +94,6 @@
<artifactId>hadoop-azure</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.version}</artifactId>
<version>[2.9.4,2.9.9]</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
Expand Down Expand Up @@ -185,7 +185,6 @@
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
Expand Down Expand Up @@ -295,6 +294,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait KustoOptions {
val KEY_VAULT_APP_ID = "keyVaultAppId"
val KEY_VAULT_APP_KEY = "keyVaultAppKey"


// AAD application identifier of the client
val KUSTO_AAD_CLIENT_ID: String = newOption("kustoAADClientID")
// AAD authentication authority
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.microsoft.kusto.spark.datasink

import java.io.Writer

case class CountingCsvWriter(out: Writer) {
val newLineSep: String = java.security.AccessController.doPrivileged(
new sun.security.action.GetPropertyAction("line.separator"))
val newLineSepLength: Int = newLineSep.length
var bytsCounter: Long = 0L

def newLine(): Unit = {
out.write(newLineSep)
bytsCounter += newLineSepLength
}

def write(c: Char): Unit ={
out.write(c)
bytsCounter += 1
}
def write(str: String): Unit = {
out.write(str)
bytsCounter += str.length
}

def writeStringField(str: String, nested: Boolean) {
if (str.length > 0) {

out.write('"')
if(nested){
out.write('"')
bytsCounter += 2
}

bytsCounter += 2

for (c <- str) {
if (c == '"') {
out.write("\"\"")
bytsCounter += 1
}
else {
out.write(c)
}
}

out.write('"')
if(nested){
out.write('"')
}

bytsCounter += str.length
}
}

def getCounter: Long = bytsCounter

def resetCounter(): Unit = {
bytsCounter = 0
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ object KustoSinkOptions extends KustoOptions{
val KUSTO_WRITE_RESULT_LIMIT: String = newOption("writeResultLimit")

// A json representation of the SparkIngestionProperties object used for writing to Kusto
var KUSTO_SPARK_INGESTION_PROPERTIES_JSON: String = newOption("sparkIngestionPropertiesJson")
val KUSTO_SPARK_INGESTION_PROPERTIES_JSON: String = newOption("sparkIngestionPropertiesJson")

val KUSTO_OMIT_NULLS: String = newOption("omitNulls")

val NONE_RESULT_LIMIT = "none"
}
Expand All @@ -37,4 +39,5 @@ case class WriteOptions(tableCreateOptions: SinkTableCreationMode.SinkTableCreat
isAsync: Boolean = false,
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
timeZone: String = "UTC", timeout: FiniteDuration,
IngestionProperties: Option[String] = None)
IngestionProperties: Option[String] = None,
omitNulls: Boolean = true)
Loading

0 comments on commit 657f383

Please sign in to comment.