Skip to content

Commit

Permalink
Merge pull request ClickHouse#56810 from Avogar/iceberg-metadata-files
Browse files Browse the repository at this point in the history
Iceberg metadata files
  • Loading branch information
Avogar authored Nov 22, 2023
2 parents 3463d9a + 4e6f265 commit 9fb1acc
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ MutableColumns parseAvro(

/**
* Each version of table metadata is stored in a `metadata` directory and
* has format: v<V>.metadata.json, where V - metadata version.
* has one of 2 formats:
* 1) v<V>.metadata.json, where V - metadata version.
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
*/
std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration)
{
Expand All @@ -322,7 +324,14 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
for (const auto & path : metadata_files)
{
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
String version_str;
/// v<V>.metadata.json
if (file_name.starts_with('v'))
version_str = String(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
/// <V>-<random-uuid>.metadata.json
else
version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));

if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
Expand Down
34 changes: 34 additions & 0 deletions tests/integration/test_storage_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pytest
import time
import glob
import uuid
import os

from pyspark.sql.types import (
StructType,
Expand Down Expand Up @@ -515,3 +517,35 @@ def test_metadata_file_selection(started_cluster, format_version):
create_iceberg_table(instance, TABLE_NAME)

assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500


@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_format_with_uuid(started_cluster, format_version):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata_selection_with_uuid_" + format_version

spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
)

for i in range(50):
spark.sql(
f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
)

for i in range(50):
os.rename(
f"/iceberg_data/default/{TABLE_NAME}/metadata/v{i + 1}.metadata.json",
f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{uuid.uuid4()}.metadata.json",
)

files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)

create_iceberg_table(instance, TABLE_NAME)

assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500

0 comments on commit 9fb1acc

Please sign in to comment.