Skip to content

Commit

Permalink
HIVE-28586: Iceberg: Support for write order in iceberg tables at CREATE TABLE
Browse files Browse the repository at this point in the history
…ATE TABLE (Zoltan Ratkai, reviewed by Ayush Saxena, Butao Zhang, Denys Kuzmenko, Shohei Okumiya)

Closes #5541
  • Loading branch information
zratkai authored Jan 21, 2025
1 parent 5146dba commit bc87c4d
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.Strings;

/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
Expand Down Expand Up @@ -140,15 +144,22 @@ public static Table createTable(Configuration conf, Properties props) {
Map<String, String> map = filterIcebergTableProperties(props);

Optional<Catalog> catalog = loadCatalog(conf, catalogName);

SortOrder sortOrder = getSortOrder(props, schema);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map);
return catalog.get().buildTable(TableIdentifier.parse(name), schema).withPartitionSpec(spec)
.withLocation(location).withProperties(map).withSortOrder(sortOrder).create();
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

/**
 * Builds the {@link SortOrder} for a table from the sort order JSON stored in the table
 * properties, if any.
 *
 * @param props table properties, optionally carrying a serialized sort order under
 *              {@link TableProperties#DEFAULT_SORT_ORDER}
 * @param schema the table schema the sort order fields are resolved against
 * @return the parsed sort order, or {@link SortOrder#unsorted()} when the property is
 *         absent or empty
 */
private static SortOrder getSortOrder(Properties props, Schema schema) {
  String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
  // Plain null/empty check: avoids depending on org.apache.parquet.Strings (a transitive
  // Parquet utility) for something the JDK covers directly.
  if (sortOrderJsonString == null || sortOrderJsonString.isEmpty()) {
    return SortOrder.unsorted();
  }
  return SortOrderParser.fromJson(schema, sortOrderJsonString);
}

/**
Expand Down Expand Up @@ -215,9 +226,9 @@ public static Table registerTable(Configuration conf, Properties props, String m
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
SortOrder sortOrder = getSortOrder(props, schema);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

public static void renameTable(Configuration conf, Properties props, TableIdentifier to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.mr.hive;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
Expand Down Expand Up @@ -53,6 +54,8 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
Expand Down Expand Up @@ -82,13 +85,17 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
Expand Down Expand Up @@ -116,6 +123,7 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
Expand All @@ -137,6 +145,7 @@

public class HiveIcebergMetaHook implements HiveMetaHook {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
public static final Map<String, String> COMMON_HMS_PROPERTIES = ImmutableMap.of(
BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
);
Expand Down Expand Up @@ -271,6 +280,32 @@ public void preCreateTable(CreateTableRequest request) {
setOrcOnlyFilesParam(hmsTable);
// Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
request.setPrimaryKeys(null);
setSortOrder(hmsTable, schema, catalogProperties);
}

/**
 * Translates the Hive write-order JSON stored on the HMS table under
 * {@link TableProperties#DEFAULT_SORT_ORDER} into Iceberg's sort order JSON and writes the
 * result into the given catalog properties. When the property is missing, empty, unparsable,
 * or yields no sort fields, the properties are left untouched; a parse failure is only
 * logged so that an invalid write order cannot fail table creation.
 */
private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
    Properties properties) {
  String sortOrderJsonString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
  if (Strings.isNullOrEmpty(sortOrderJsonString)) {
    return;
  }
  SortFields sortFields;
  try {
    sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJsonString, SortFields.class);
  } catch (Exception e) {
    LOG.warn("Can not read write order json: {}", sortOrderJsonString, e);
    return;
  }
  if (sortFields == null || sortFields.getSortFields().isEmpty()) {
    return;
  }
  SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
  for (SortFieldDesc fieldDesc : sortFields.getSortFields()) {
    NullOrder nullOrder = fieldDesc.getNullOrder() == SortFieldDesc.NullOrder.NULLS_FIRST ?
        NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
    SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
        SortDirection.ASC : SortDirection.DESC;
    sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
  }
  properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (id int, text string) stored by iceberg stored as orc;

insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");

describe formatted ice_orc;
describe extended ice_orc;
set hive.fetch.task.conversion=more;
select * from ice_orc;

create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc;

insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");

describe formatted ice_orc_sorted;
describe extended ice_orc_sorted;
set hive.fetch.task.conversion=more;
select * from ice_orc_sorted;

drop table ice_orc;
drop table ice_orc_sorted;

Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
PREHOOK: query: create table ice_orc (id int, text string) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc
POSTHOOK: query: create table ice_orc (id int, text string) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: describe formatted ice_orc
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc
POSTHOOK: query: describe formatted ice_orc
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc
# col_name data_type comment
id int
text string

# Detailed Table Information
Database: default
#### A masked pattern was here ####
Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"text\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"text\",\"required\":false,\"type\":\"string\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"9\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 1
numRows 9
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 1
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
#### A masked pattern was here ####
uuid #Masked#
write.delete.mode merge-on-read
write.format.default orc
write.merge.mode merge-on-read
write.update.mode merge-on-read

# Storage Information
SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: describe extended ice_orc
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc
POSTHOOK: query: describe extended ice_orc
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc
id int
text string

#### A masked pattern was here ####
PREHOOK: query: select * from ice_orc
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: select * from ice_orc
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
#### A masked pattern was here ####
3 3
2 2
4 4
5 5
1 1
2 3
3 NULL
2 NULL
NULL a
PREHOOK: query: create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc_sorted
POSTHOOK: query: create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc_sorted
PREHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc_sorted
POSTHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a")
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc_sorted
PREHOOK: query: describe formatted ice_orc_sorted
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc_sorted
POSTHOOK: query: describe formatted ice_orc_sorted
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc_sorted
# col_name data_type comment
id int
text string

# Detailed Table Information
Database: default
#### A masked pattern was here ####
Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"text\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"text\",\"required\":false,\"type\":\"string\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"9\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-sort-order {\"order-id\":1,\"fields\":[{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]}
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 1
numRows 9
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 1
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
#### A masked pattern was here ####
uuid #Masked#
write.delete.mode merge-on-read
write.format.default orc
write.merge.mode merge-on-read
write.update.mode merge-on-read

# Storage Information
SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: [FieldSchema(name:id, type:int, comment:Transform: identity, Sort direction: DESC, Null sort order: NULLS_FIRST), FieldSchema(name:text, type:string, comment:Transform: identity, Sort direction: ASC, Null sort order: NULLS_LAST)]
PREHOOK: query: describe extended ice_orc_sorted
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc_sorted
POSTHOOK: query: describe extended ice_orc_sorted
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc_sorted
id int
text string

#### A masked pattern was here ####
PREHOOK: query: select * from ice_orc_sorted
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc_sorted
#### A masked pattern was here ####
POSTHOOK: query: select * from ice_orc_sorted
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc_sorted
#### A masked pattern was here ####
NULL a
5 5
4 4
3 3
3 NULL
2 2
2 3
2 NULL
1 1
PREHOOK: query: drop table ice_orc
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@ice_orc
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc
POSTHOOK: query: drop table ice_orc
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc
PREHOOK: query: drop table ice_orc_sorted
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@ice_orc_sorted
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc_sorted
POSTHOOK: query: drop table ice_orc_sorted
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@ice_orc_sorted
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc_sorted
Loading

0 comments on commit bc87c4d

Please sign in to comment.