Random duplicates in Kusto table after ingestion from Spark (Azure Databricks) #412
Hi everyone! TL;DR We need to cache() the final dataframe AND repartition it AND sort it before we try to ingest it into Kusto (see the sketch after the detailed explanation below).
We also need to increase clientBatchingLimit from 100 to 1024.

Detailed explanation:

1) Why repartition and sort
When Spark fails a task (for example due to a network glitch, an I/O error, or a driver interruption), it retries that same "slice" of work. However, depending on how the DataFrame is partitioned and read, the retry can end up re-reading or re-sending data in a slightly different way if the partitioning is not deterministic (for example, if it was based on a random split or has no deterministic ordering). If a chunk of data is resent to Kusto during a retry, but the connector logic does not recognize it has already been ingested, rows can appear twice in the Kusto table. That's how random duplicates show up. Repartitioning ensures a deterministic grouping of rows (for example by date or some unique column), and sorting within each partition makes the order of rows deterministic as well. Essentially, you end up with a stable partition layout that doesn't shift between retries.

2) Why cache
Without caching, every time a task retries, it re-reads from the underlying source (the Delta table on ADLS). If reading from ADLS is slow, or if there's any chance the underlying data could look slightly different across reads (for instance, if something about the file listing or partition discovery changes on repeated reads), you can again end up with slightly different data in the tasks.

3) Why increase clientBatchingLimit
I'm not sure here, but I guess that with the default clientBatchingLimit of 100, Spark might send many small chunks of data to Kusto. If one chunk fails halfway through, it might get retried, potentially causing duplicates.

@ag-ramachandran Please correct me if I'm wrong. Hope it will help someone :)
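For reference, a minimal sketch of the workaround described above. All cluster, credential, and column names (ingestion_date, record_id, etc.) are placeholders, and the connector option names reflect my reading of the azure-kusto-spark docs; adapt them to your own job:

```python
# Sketch of the workaround: deterministic partitioning + ordering, cached input,
# and a larger clientBatchingLimit. Names and option values are illustrative only.
final_df = (
    source_df
    .repartition(64, "ingestion_date")   # deterministic grouping (column is a placeholder)
    .sortWithinPartitions("record_id")   # deterministic row order inside each partition
    .cache()                             # avoid re-reading ADLS on task retries
)
final_df.count()  # materialize the cache before writing

(final_df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", kusto_cluster)
    .option("kustoDatabase", kusto_database)
    .option("kustoTable", temp_kusto_table)
    .option("kustoAadAppId", aad_app_id)
    .option("kustoAadAppSecret", aad_app_secret)
    .option("kustoAadAuthorityID", aad_tenant_id)
    .option("clientBatchingLimit", "1024")  # raised from the default of 100
    .mode("Append")
    .save())
```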
Describe the bug
I have an external Delta table in Azure Databricks based on Azure ADLS Gen 2. I perform a full ingestion from this table into the related Kusto table daily. The data is first ingested into a temporary Kusto table, and then I perform a move extents operation between the temp and target tables in Kusto. During this operation, duplicates of some random rows appear in the target Kusto table, even though the Databricks table doesn't contain these duplicates. These specific duplicate rows can disappear after re-running the job, but other ones can appear.
To Reproduce
This is the full code snippet:
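(The original snippet is not reproduced here; below is a rough sketch of the flow described, assuming a full read of the Delta table, a write into a temporary Kusto table via the Spark connector, and a .move extents control command into the target table. All names and credentials are placeholders.)

```python
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Read the external Delta table and push everything into a temporary Kusto table.
source_df = spark.read.table("catalog.schema.source_table")  # placeholder table name

(source_df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", kusto_cluster)
    .option("kustoDatabase", kusto_database)
    .option("kustoTable", "TempTable")              # temporary staging table
    .option("kustoAadAppId", aad_app_id)
    .option("kustoAadAppSecret", aad_app_secret)
    .option("kustoAadAuthorityID", aad_tenant_id)
    .mode("Append")
    .save())

# Move the ingested extents from the temp table into the target table.
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    kusto_cluster_uri, aad_app_id, aad_app_secret, aad_tenant_id)
client = KustoClient(kcsb)
client.execute_mgmt(kusto_database,
                    ".move extents all from table TempTable to table TargetTable")
```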
Expected behavior
Target Kusto table contains exactly the same rows as Databricks source table.
Screenshots
[Screenshot 1: row count of the Databricks source table]
[Screenshot 2: row count of the target Kusto table after ingestion]
[Screenshot 3: a single row in the Databricks source table]
[Screenshot 4: the same row appearing twice in the target Kusto table]
Additional context
The first two screenshots show that the tables have different row counts after ingestion.
The last two screenshots show an example of a single row in the Databricks table that is duplicated in the target Kusto table after ingestion.