Commit: new clientRequestID per call
ohadbitt committed Jan 28, 2025
1 parent 6684d15 commit cebc2b5
Showing 7 changed files with 96 additions and 70 deletions.

connector/scalastyle_config.xml (2 changes: 1 addition & 1 deletion)
@@ -86,7 +86,7 @@
 </check>
 <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
  <parameters>
-   <parameter name="maxLength"><![CDATA[70]]></parameter>
+   <parameter name="maxLength"><![CDATA[120]]></parameter>
  </parameters>
 </check>
 <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">

@@ -103,6 +103,7 @@ object FinalizeHelper {
         tmpTableName,
         allowMerge = false,
         allowRebuild = false),
+      "alterMergePolicyCommand",
       crp)
     // Drop dedup tags
     if (writeOptions.ensureNoDupBlobs) {

@@ -112,6 +113,7 @@ object FinalizeHelper {
       crp,
       writeOptions.timeout,
       s"drops extents from temp table '$tmpTableName' ",
+      "extentsDrop",
       writeOptions.requestId)
     }
     client.moveExtents(
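
In every hunk of this commit, the call sites gain one extra argument: a short activity name ("alterMergePolicyCommand", "extentsDrop", and so on) passed alongside the command itself. Read together with the commit title, the intent is that each engine or DM call can be tagged with its own client request ID instead of reusing one ID for the whole operation. The sketch below illustrates the idea only; the helper name, prefix, and ID format are assumptions, not taken from the connector source.

    import java.util.UUID

    // Illustrative sketch: mint a fresh, self-describing client request ID per call.
    // The "KustoSparkConnector" prefix and the separator format are made up here.
    object ClientRequestIdSketch {
      def newClientRequestId(activityName: String): String =
        s"KustoSparkConnector;$activityName;${UUID.randomUUID()}"
    }

    // Each call site gets a distinct, recognizable ID:
    //   newClientRequestId("alterMergePolicyCommand")
    //   newClientRequestId("extentsDrop")

With per-call IDs, the activity name makes it possible to tell apart, say, the drop-extents command from the merge-policy command when tracing a single write on the service side.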

@@ -101,6 +101,7 @@ object KustoWriter {
       .executeEngine(
         tableCoordinates.database,
         generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get),
+        "schemaShow",
         crp)
       .getPrimaryResults


@@ -131,6 +132,7 @@ object KustoWriter {
       .executeEngine(
         tableCoordinates.database,
         generateTableGetSchemaAsRowsCommand(writeOptions.userTempTableName.get),
+        "schemaShow",
         crp)
       .getPrimaryResults
       .count() <= 0 ||

@@ -449,7 +451,7 @@ object KustoWriter {
       streamingClient: ManagedStreamingIngestClient,
       inputStreamLastIdx: Int): Unit = {
     KDSU.retryApplyFunction(
-      () => {
+      i => {
         val inputStream = new ByteArrayInputStream(bytes, 0, inputStreamLastIdx)
         // The SDK will compress the stream by default.
         val streamSourceInfo = new StreamSourceInfo(inputStream)

@@ -463,7 +465,8 @@ object KustoWriter {
s"details: ${ingestionStatus.details}, " +
s"activityId: ${ingestionStatus.activityId}, " +
s"errorCode: ${ingestionStatus.errorCode}, " +
s"errorCodeString: ${ingestionStatus.errorCodeString}" +
s"errorCodeString: ${ingestionStatus.errorCodeString}," +
s"retry: $i" +
"}")
})
case Failure(e: Throwable) =>

@@ -583,7 +586,7 @@ object KustoWriter {
     }
     // write the data here
     val partitionsResult = KDSU.retryApplyFunction(
-      () => {
+      i => {
         Try(
           ingestClient.ingestFromBlob(
             new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()),

@@ -599,7 +602,7 @@ object KustoWriter {
           KDSU.reportExceptionAndThrow(
             className,
             e,
-            "Queueing blob for ingestion in partition " +
+            s"Queueing blob for ingestion, retry number '$i', in partition " +
               s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
           val blobUrlWithSas =
             s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}"
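
The two lambda changes above (`() => { ... }` becoming `i => { ... }`) imply that KDSU.retryApplyFunction now passes the current attempt index into the retried closure, which the updated log strings interpolate as `retry: $i` and `retry number '$i'`. The helper itself is not part of the visible hunks, so the following is only a sketch of one plausible shape, with made-up parameter names and no delay or backoff.

    import scala.annotation.tailrec
    import scala.util.{Failure, Success, Try}

    // Sketch only: a retry helper whose retried function receives the attempt index.
    object RetrySketch {
      @tailrec
      def retryApplyFunction[T](func: Int => T, maxAttempts: Int = 3, attempt: Int = 0): T =
        Try(func(attempt)) match {
          case Success(result) => result
          case Failure(_) if attempt < maxAttempts - 1 =>
            // Retry with the next attempt index so callers can log which attempt failed.
            retryApplyFunction(func, maxAttempts, attempt + 1)
          case Failure(e) => throw e
        }
    }

    // Usage mirroring the call sites above:
    //   RetrySketch.retryApplyFunction(i => ingestOnce(i))  // 'i' shows up in the failure logs

Passing the index into the closure is what lets the ingestion-status and exception messages above report which retry produced them.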

@@ -84,6 +84,7 @@ private[kusto] object KustoReader {
       .executeEngine(
         request.kustoCoordinates.database,
         filteredQuery,
+        "executeQuery",
         request.clientRequestProperties.orNull)
       .getPrimaryResults


@@ -329,6 +330,7 @@ private[kusto] class KustoReader(client: ExtendedKustoClient) {
       .executeEngine(
         request.kustoCoordinates.database,
         exportCommand,
+        "exportPartitionToBlob",
         request.clientRequestProperties.orNull)
       .getPrimaryResults
     KDSU.verifyAsyncCommandCompletion(

@@ -83,7 +83,7 @@ class ContainerProvider(

   private def refresh(exportContainer: Boolean = false): ContainerAndSas = {
     if (exportContainer) {
-      Try(client.executeDM(command, None, Some(retryConfigExportContainers))) match {
+      Try(client.executeDM(command, None, "refreshContainers", Some(retryConfigExportContainers))) match {
         case Success(res) =>
           val storage = res.getPrimaryResults.getData.asScala.map(row => {
             val parts = row.get(0).toString.split('?')
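
The DM (ingestion service) path gets the same treatment: executeDM now takes an activity name, here "refreshContainers", between the optional request properties and the optional retry configuration. The stub below only mirrors the argument order visible in the hunk; the types and behavior are placeholders, not the connector's ExtendedKustoClient.

    // Placeholder types; only the argument order is taken from the hunk above.
    final case class RetryConfigStub(maxAttempts: Int)

    class DmClientStub {
      def executeDM(
          command: String,
          maybeCrp: Option[String],
          activityName: String,
          retryConfig: Option[RetryConfigStub] = None): String = {
        // In the real client this is where a fresh client request ID tagged with
        // activityName would be attached before the command is sent to the DM endpoint.
        s"executed '$command' as activity '$activityName'"
      }
    }

    // Call shape mirroring the diff:
    //   Try(client.executeDM(command, None, "refreshContainers", Some(retryConfigExportContainers)))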