
Random duplicates in Kusto table after ingestion from Spark (Azure Databricks) #412

Open
edobrynin-dodo opened this issue Dec 23, 2024 · 1 comment


Describe the bug
I have an external Delta table in Azure Databricks backed by Azure ADLS Gen2. I perform a full ingestion from this table into the related Kusto table daily. The data is first ingested into a temporary Kusto table, and then I perform a move-extents operation from the temp table to the target table in Kusto. During this operation, duplicates of some random rows appear in the target Kusto table, even though the Databricks table does not contain these duplicates. The specific duplicated rows can disappear after re-running the job, but other ones can appear.

To Reproduce
This is the full code snippet:

import json
import time
from pyspark.sql import SparkSession, functions as F
from pyspark.dbutils import DBUtils
from typing import Dict, Type

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoError
from pyspark.sql import DataFrame
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    BooleanType,
    ByteType,
    DataType,
    DateType,
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    MapType,
    ShortType,
    StringType,
    StructType,
    TimestampType,
)

reader_options = {}
source_path = "abfss://path_to_source_table"
kusto_connection_string = "azure_secret_with_connection"
kusto_table = "metrics_MP_CR_to_Menu"
table_name = source_path.split("/")[-1]

ingestion_mode = None
clientBatchingLimit = None

SparkTypeToKustoTypeMap: Dict[Type, str] = {
    StringType: "string",
    BooleanType: "bool",
    DateType: "datetime",
    TimestampType: "datetime",
    DecimalType: "decimal",
    DoubleType: "real",
    FloatType: "real",
    ByteType: "int",
    BinaryType: "string",
    IntegerType: "int",
    LongType: "long",
    ShortType: "int",
}


def create_kusto_client(kusto_conf: dict):
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_conf["kustoCluster"],
        kusto_conf["kustoAadAppId"],
        kusto_conf["kustoAadAppSecret"],
        kusto_conf["kustoAadAuthorityID"],
    )
    return KustoClient(kcsb)


def replace_content(client, database, target_table, source_table):
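    # Atomically swap the target table's data: .replace extents drops the target's
    # current extents and moves in all extents from the source (temp) table.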
    params = {"target_table": target_table, "source_table": source_table}
    query = """.replace extents in table {target_table} <|
             {{ .show table {target_table} extents }},
             {{ .show table {source_table} extents }}""".format(**params)

    client.execute(database, query)


def table_exists(client, database, table_name):
    try:
        res = client.execute(database, ".show tables")[0]
    except KustoError:
        return False

    tables = {table[0] for table in res}
    return table_name in tables


def drop_table(client: KustoClient, database: str, table_name: str):
    query = f".drop table {table_name} ifexists"
    client.execute(database, query)


def create_table(
    client: KustoClient,
    database: str,
    source: DataFrame,
    table_name: str,
    hidden: bool = False,
):
    table_schema_builder = []
    for field in source.schema.fields:
        field_type = get_spark_type_to_kusto_type_map(field.dataType)
        table_schema_builder.append(f"['{field.name}']:{field_type}")

    tmp_table_schema = ",".join(table_schema_builder)
    query = f".create table {table_name} ({tmp_table_schema})"
    if hidden:
        query += " with (hidden=true)"
    client.execute(database, query)


def get_spark_type_to_kusto_type_map(field_type: DataType):
    if isinstance(field_type, DecimalType):
        return "decimal"
    if isinstance(field_type, (ArrayType, StructType, MapType)):
        return "dynamic"
    return SparkTypeToKustoTypeMap[type(field_type)]


def get_max_sink_timestamp(client, database, table_name, overlap_column):
    query = f""" {table_name}
              | summarize LatestRecordTimestamp=max({overlap_column})
          """
    try:
        res = client.execute(database, query)
        return res.primary_results[0][0][0]
    except KustoError:
        return None


def get_kusto_options():
    dbutils = DBUtils(spark)
    connection = json.loads(dbutils.secrets.get(scope="keyvault", key=kusto_connection_string))

    return {
        "kustoCluster": connection["cluster"],
        "kustoDatabase": connection["database"],
        "kustoTable": "metrics_MP_CR_to_Menu",
        "kustoAadAppId": connection["aadAppId"],
        "kustoAadAppSecret": connection["aadAppSecret"],
        "kustoAadAuthorityID": connection["aadAuthorityID"],
        "clientBatchingLimit": clientBatchingLimit if clientBatchingLimit else "100",
        "writeEnableAsync": False,
        "tableCreateOptions": "CreateIfNotExist"
    }


spark = (
    SparkSession.builder.appName("serving.metrics_MP_CR_to_Menu")
    .getOrCreate()
)

options = get_kusto_options()
database = options["kustoDatabase"]
origin_table = options["kustoTable"]


try:
    print("Job has been started.")

    client = create_kusto_client(options)
    temp_table = f"_temp_{origin_table}_{int(time.time())}"

    source = (
        spark
        .read
        .format("delta")
        .options(**reader_options)
        .load(source_path)
        .withColumn("LoadDateTime", F.current_timestamp())
        .withColumn("LoadDate", F.current_date())
    )

    create_table(client, database, source, temp_table, hidden=True)

    (
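        # Ingest the DataFrame into the temporary Kusto table via the Kusto Spark connector.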
        source
        .write
        .format("com.microsoft.kusto.spark.datasource")
        .options(**options)
        .option("kustoTable", temp_table)
        .mode("Append")
        .save()
    )

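    # Recreate the target table from the Spark schema, then atomically move the
    # temp table's extents into it and drop the temp table.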
    drop_table(client, database, origin_table)
    create_table(client, database, source, origin_table)

    replace_content(client, database, origin_table, temp_table)
    drop_table(client, database, temp_table)

except KeyboardInterrupt:
    print("Job was cancelled by the user or because of re-deployment")
except BaseException as exc:
    import traceback
    tbc = traceback.format_exc()
    print("Job failed with an error:\n%s\n\n%s", exc, tbc)
    raise exc

Expected behavior
Target Kusto table contains exactly the same rows as Databricks source table.

Screenshots
[4 screenshots attached; described under Additional context below]

Desktop (please complete the following information):

  • Databricks cluster: Driver: Standard_D4ads_v5 · Workers: Standard_D4ads_v5 · 1-8 workers · DBR: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
  • kusto-spark_3.0_2.12-5.0.6-jar-with-dependencies.jar
  • azure-kusto-data==4.2.0

Additional context
The first two screenshots show that the tables have a different number of rows after ingestion.
The last two screenshots show an example of a single row in the Databricks table that has duplicates in the target Kusto table after ingestion.


edobrynin-dodo commented Jan 30, 2025

Hi everyone!
Here is a workaround which helps to fix this issue.

TL;DR

We need to cache() the final dataframe AND repartition it AND sort it before we try to ingest it into Kusto, just like this:

from pyspark.sql.functions import col

cachedDataFrame = source.repartition(col("LoadDate")).orderBy(col("LoadDateTime")).cache()

We also need to increase clientBatchingLimit from 100 to 1024.
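For reference, here is a minimal sketch of how both changes slot into the job from the issue; source, options and temp_table are the names from the original snippet, and the column choices and the 1024 value are just this workaround's settings, to be tuned per workload:

from pyspark.sql import functions as F

# Deterministic layout: group rows by LoadDate, order them by LoadDateTime,
# then cache so a retried task re-sends exactly the same rows in the same order.
cachedDataFrame = (
    source
    .repartition(F.col("LoadDate"))
    .orderBy(F.col("LoadDateTime"))
    .cache()
)
cachedDataFrame.count()  # optional: materialize the cache up front

(
    cachedDataFrame
    .write
    .format("com.microsoft.kusto.spark.datasource")
    .options(**{**options, "clientBatchingLimit": "1024"})  # raised from the default "100"
    .option("kustoTable", temp_table)
    .mode("Append")
    .save()
)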

Detailed explanation:

1) Why partition and order

When Spark fails a task (for example due to a network glitch, I/O error, or driver interruption), it will retry that same “slice” of work. However, depending on how the DataFrame is partitioned and read, the retry can end up re-reading or re-sending data in a slightly different way if the partitioning is not deterministic (for example, if it was based on a random split or has no deterministic ordering).

If a chunk of data is resent to Kusto during a retry, but the connector logic does not recognize it’s already ingested, rows can appear twice in the Kusto table. That’s how random duplicates show up.

Repartition ensures a deterministic grouping of rows (for example by date or some unique column).
Each partition is turned into a chunk of data that is always the same set of rows whenever Spark processes that partition.

Sort within each partition ensures that the order of rows is also deterministic.
This means that if Spark fails and retries that chunk, it will read and send precisely the same records in the same order, so ingestion is more predictable.

Essentially, you end up with a stable partition layout that doesn’t shift between retries.
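A sketch of just this step; sortWithinPartitions is a lighter alternative to a global orderBy when per-partition determinism is enough (column names are the ones from the job above):

from pyspark.sql import functions as F

# The same set of rows always lands in the same partition (grouped by LoadDate)...
stable = source.repartition(F.col("LoadDate"))
# ...and within each partition the rows always come out in the same order.
stable = stable.sortWithinPartitions(F.col("LoadDateTime"))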

2) Why cache

Without caching, every time a task retries, it re-reads from the underlying source (the Delta table on ADLS). If reading from ADLS is slow, or if there’s any chance the underlying data could look slightly different across reads (for instance, if something about the file listing or partition discovery changes on repeated reads), you can again end up with slightly different data in the tasks.
Caching the DataFrame in memory (or on disk, if memory is not sufficient) means that after the first read/transform/partitioning/sorting step, Spark can serve exactly the same data from the cache if a task has to be retried. This effectively guarantees the data is stable, eliminating any chance that a new read produces a subtly different set or order of rows.
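A sketch of the caching step, assuming stableDataFrame is the already repartitioned and sorted DataFrame (the name is illustrative); persist() with an explicit storage level just makes the memory-or-disk fallback visible, and an action materializes the cache:

from pyspark import StorageLevel

# Keep what fits in memory, spill the rest to disk.
stableDataFrame = stableDataFrame.persist(StorageLevel.MEMORY_AND_DISK)
# An action forces materialization, so retried tasks read from the cache
# instead of going back to the Delta files on ADLS.
stableDataFrame.count()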

3) Why increase clientBatchingLimit

I'm not sure here, but I guess that with the default clientBatchingLimit of 100, Spark might send many small chunks of data to Kusto. If one chunk fails halfway through, it might get retried, potentially causing duplicates.
A larger batching limit (1024 MB) makes it less likely that a single batch fails mid-send or gets chunked up in unpredictable ways.
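In terms of the snippet in the issue, the change is just the value that feeds get_kusto_options() (clientBatchingLimit is the option name already used there):

# Raise the batching limit from the script's "100" default to "1024".
clientBatchingLimit = "1024"

options = get_kusto_options()  # now carries "clientBatchingLimit": "1024"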

@ag-ramachandran Please correct me if I'm wrong.

Hope this helps someone :)
