diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index f2bd5b5f999c..245c3c0212df 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -30,13 +30,17 @@
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.parquet.Strings;
 
 /**
  * Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -140,15 +144,22 @@ public static Table createTable(Configuration conf, Properties props) {
     Map<String, String> map = filterIcebergTableProperties(props);
 
     Optional<Catalog> catalog = loadCatalog(conf, catalogName);
-
+    SortOrder sortOrder = getSortOrder(props, schema);
     if (catalog.isPresent()) {
       String name = props.getProperty(NAME);
       Preconditions.checkNotNull(name, "Table identifier not set");
-      return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map);
+      return catalog.get().buildTable(TableIdentifier.parse(name), schema).withPartitionSpec(spec)
+          .withLocation(location).withProperties(map).withSortOrder(sortOrder).create();
     }
 
     Preconditions.checkNotNull(location, "Table location not set");
-    return new HadoopTables(conf).create(schema, spec, map, location);
+    return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
+  }
+
+  private static SortOrder getSortOrder(Properties props, Schema schema) {
+    String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
+    return Strings.isNullOrEmpty(sortOrderJsonString) ?
+        SortOrder.unsorted() : SortOrderParser.fromJson(schema, sortOrderJsonString);
   }
 
   /**
@@ -215,9 +226,9 @@ public static Table registerTable(Configuration conf, Properties props, String m
       Preconditions.checkNotNull(name, "Table identifier not set");
       return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
     }
 
-    Preconditions.checkNotNull(location, "Table location not set");
-    return new HadoopTables(conf).create(schema, spec, map, location);
+    SortOrder sortOrder = getSortOrder(props, schema);
+    return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
   }
 
   public static void renameTable(Configuration conf, Properties props, TableIdentifier to) {
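
The change above threads an optional write order through Catalogs: getSortOrder() looks up TableProperties.DEFAULT_SORT_ORDER, falls back to SortOrder.unsorted(), and both the catalog path (via buildTable().withSortOrder()) and the HadoopTables path pass the result into table creation. A minimal caller-side sketch (not part of the patch; the schema, column names, and values are illustrative) of producing that property with the Iceberg API:

import java.util.Properties;

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.types.Types;

public class SortOrderPropertyExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "text", Types.StringType.get()));

    // The same order the q-test declares: id DESC NULLS FIRST, text ASC NULLS LAST.
    SortOrder order = SortOrder.builderFor(schema)
        .desc("id", NullOrder.NULLS_FIRST)
        .asc("text", NullOrder.NULLS_LAST)
        .build();

    // Catalogs.getSortOrder() reads this property back with SortOrderParser.fromJson().
    Properties props = new Properties();
    props.setProperty(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(order));
    // props would then be handed to Catalogs.createTable(conf, props) together with the
    // NAME, schema, and partition-spec properties (omitted here).
  }
}
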
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index b82143954d69..b2a8080e56a1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
@@ -53,6 +54,8 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -82,6 +85,7 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -89,6 +93,9 @@
 import org.apache.iceberg.PartitionsTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
@@ -116,6 +123,7 @@
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -137,6 +145,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
   public static final Map<String, String> COMMON_HMS_PROPERTIES = ImmutableMap.of(
       BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
   );
 
@@ -271,6 +280,32 @@ public void preCreateTable(CreateTableRequest request) {
     setOrcOnlyFilesParam(hmsTable);
     // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
     request.setPrimaryKeys(null);
+    setSortOrder(hmsTable, schema, catalogProperties);
+  }
+
+  private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
+      Properties properties) {
+    String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
+    SortFields sortFields = null;
+    if (!Strings.isNullOrEmpty(sortOrderJSONString)) {
+      try {
+        sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
+      } catch (Exception e) {
+        LOG.warn("Cannot read write order JSON: {}", sortOrderJSONString, e);
+        return;
+      }
+      if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
+        SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
+        sortFields.getSortFields().forEach(fieldDesc -> {
+          NullOrder nullOrder = fieldDesc.getNullOrder() == SortFieldDesc.NullOrder.NULLS_FIRST ?
+              NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+          SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
+              SortDirection.ASC : SortDirection.DESC;
+          sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
+        });
+        properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
+      }
+    }
   }
 
   @Override
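
The hook reads the Hive-side JSON written by the DDL layer (see the SortFields POJOs and SemanticAnalyzer#getSortOrderJson further down), rebuilds it as an Iceberg SortOrder, and re-serializes it with SortOrderParser so that Catalogs#getSortOrder can consume it at create time. A condensed, standalone sketch of that translation (not part of the patch; the JSON literal is illustrative of the Hive-side shape, assuming Jackson's default bean mapping, which the patch itself relies on):

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;

public class SortFieldsToIcebergExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()));

    // Approximate shape produced by SemanticAnalyzer#getSortOrderJson for
    // "write locally ordered by id desc nulls first".
    String json = "{\"sortFields\":[{\"columnName\":\"id\","
        + "\"nullOrder\":\"NULLS_FIRST\",\"direction\":\"DESC\"}]}";
    SortFields sortFields = new ObjectMapper().reader().readValue(json, SortFields.class);

    // Same enum-to-enum mapping that setSortOrder performs.
    SortOrder.Builder builder = SortOrder.builderFor(schema);
    sortFields.getSortFields().forEach(f -> builder.sortBy(
        f.getColumnName(),
        f.getDirection() == SortFieldDesc.SortDirection.ASC ? SortDirection.ASC : SortDirection.DESC,
        f.getNullOrder() == SortFieldDesc.NullOrder.NULLS_FIRST ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST));
    System.out.println(builder.build()); // prints the Iceberg order, e.g. [ id DESC NULLS FIRST ]
  }
}
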
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_ordered_table.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_ordered_table.q
new file mode 100644
index 000000000000..3ababa1ab4e1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_ordered_table.q
@@ -0,0 +1,44 @@
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask the totalSize value as it can have slight variability, causing test flakiness
+--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask iceberg version
+--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+create table ice_orc (id int, text string) stored by iceberg stored as orc;
+
+insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");
+
+describe formatted ice_orc;
+describe extended ice_orc;
+set hive.fetch.task.conversion=more;
+select * from ice_orc;
+
+create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc;
+
+insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");
+
+describe formatted ice_orc_sorted;
+describe extended ice_orc_sorted;
+set hive.fetch.task.conversion=more;
+select * from ice_orc_sorted;
+
+drop table ice_orc;
+drop table ice_orc_sorted;
+
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; + +create table ice_orc (id int, text string) stored by iceberg stored as orc; + +insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a"); + +describe formatted ice_orc; +describe extended ice_orc; +set hive.fetch.task.conversion=more; +select * from ice_orc; + +create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc; + +insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a"); + +describe formatted ice_orc_sorted; +describe extended ice_orc_sorted; +set hive.fetch.task.conversion=more; +select * from ice_orc_sorted; + +drop table ice_orc; +drop table ice_orc_sorted; + diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out new file mode 100644 index 000000000000..391e5b12a02d --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out @@ -0,0 +1,207 @@ +PREHOOK: query: create table ice_orc (id int, text string) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc +POSTHOOK: query: create table ice_orc (id int, text string) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: describe formatted ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe formatted ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +# col_name data_type comment +id int +text string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"text\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"text\",\"required\":false,\"type\":\"string\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"9\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + current-snapshot-timestamp-ms #Masked# + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was 
here #### + numFiles 1 + numRows 9 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 1 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: describe extended ice_orc +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc +POSTHOOK: query: describe extended ice_orc +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc +id int +text string + +#### A masked pattern was here #### +PREHOOK: query: select * from ice_orc +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +3 3 +2 2 +4 4 +5 5 +1 1 +2 3 +3 NULL +2 NULL +NULL a +PREHOOK: query: create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc_sorted +POSTHOOK: query: create table ice_orc_sorted (id int, text string) write locally ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc_sorted +PREHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc_sorted +POSTHOOK: query: insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc_sorted +PREHOOK: query: describe formatted ice_orc_sorted +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc_sorted +POSTHOOK: query: describe formatted ice_orc_sorted +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc_sorted +# col_name data_type comment +id int +text string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"text\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"text\",\"required\":false,\"type\":\"string\"}]} + current-snapshot-id #Masked# + current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"9\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + 
current-snapshot-timestamp-ms #Masked# + default-sort-order {\"order-id\":1,\"fields\":[{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 1 + numRows 9 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 1 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [FieldSchema(name:id, type:int, comment:Transform: identity, Sort direction: DESC, Null sort order: NULLS_FIRST), FieldSchema(name:text, type:string, comment:Transform: identity, Sort direction: ASC, Null sort order: NULLS_LAST)] +PREHOOK: query: describe extended ice_orc_sorted +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc_sorted +POSTHOOK: query: describe extended ice_orc_sorted +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc_sorted +id int +text string + +#### A masked pattern was here #### +PREHOOK: query: select * from ice_orc_sorted +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc_sorted +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc_sorted +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc_sorted +#### A masked pattern was here #### +NULL a +5 5 +4 4 +3 3 +3 NULL +2 2 +2 3 +2 NULL +1 1 +PREHOOK: query: drop table ice_orc +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ice_orc +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc +POSTHOOK: query: drop table ice_orc +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ice_orc +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc +PREHOOK: query: drop table ice_orc_sorted +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ice_orc_sorted +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc_sorted +POSTHOOK: query: drop table ice_orc_sorted +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ice_orc_sorted +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc_sorted diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index d39f90c34e68..d7c686fb6d5b 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -419,6 +419,7 @@ iceberg.llap.query.files=\ iceberg_bucket_map_join_7.q,\ iceberg_bucket_map_join_8.q,\ iceberg_clustered.q,\ + iceberg_create_locally_ordered_table.q,\ iceberg_merge_delete_files.q,\ iceberg_merge_files.q,\ llap_iceberg_read_orc.q,\ @@ -455,6 +456,7 @@ iceberg.llap.only.query.files=\ iceberg_bucket_map_join_7.q,\ iceberg_bucket_map_join_8.q,\ iceberg_clustered.q,\ + iceberg_create_locally_ordered_table.q,\ iceberg_merge_delete_files.q,\ iceberg_merge_files.q,\ llap_iceberg_read_orc.q,\ diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/CreateDDLParser.g 
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/CreateDDLParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/CreateDDLParser.g
index 97f04f8dc1f5..70b20472ff8e 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/CreateDDLParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/CreateDDLParser.g
@@ -64,6 +64,7 @@ createTableStatement
       tableComment?
       createTablePartitionSpec?
       tableBuckets?
+      tableWriteLocallyOrdered?
       tableSkewed?
       tableRowFormat?
       tableFileFormat?
@@ -77,6 +78,7 @@ createTableStatement
       tableComment?
       createTablePartitionSpec?
       tableBuckets?
+      tableWriteLocallyOrdered?
       tableSkewed?
       tableRowFormat?
       tableFileFormat?
@@ -94,6 +96,7 @@ createTableStatement
       tableComment?
       createTablePartitionSpec?
       tableBuckets?
+      tableWriteLocallyOrdered?
       tableSkewed?
       tableRowFormat?
       tableFileFormat?
@@ -107,6 +110,7 @@ createTableStatement
       tableComment?
       createTablePartitionSpec?
       tableBuckets?
+      tableWriteLocallyOrdered?
       tableSkewed?
       tableRowFormat?
       tableFileFormat?
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index 078cd561b1b4..3d8b4ab7741a 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -37,6 +37,8 @@ KW_DESC : 'DESC';
 KW_NULLS : 'NULLS';
 KW_LAST : 'LAST';
 KW_ORDER : 'ORDER';
+KW_ORDERED : 'ORDERED';
+KW_LOCALLY : 'LOCALLY';
 KW_GROUP : 'GROUP';
 KW_BY : 'BY';
 KW_HAVING : 'HAVING';
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index f05aa897f3af..497d2928a3bf 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -514,6 +514,7 @@
 TOK_AS_OF_TIME;
 TOK_AS_OF_VERSION;
 TOK_FROM_VERSION;
 TOK_AS_OF_TAG;
+TOK_WRITE_LOCALLY_ORDERED;
 }
@@ -558,6 +559,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
     xlateMap.put("KW_NULLS", "NULLS");
     xlateMap.put("KW_LAST", "LAST");
     xlateMap.put("KW_ORDER", "ORDER");
+    xlateMap.put("KW_ORDERED", "ORDERED");
+    xlateMap.put("KW_LOCALLY", "LOCALLY");
     xlateMap.put("KW_BY", "BY");
     xlateMap.put("KW_GROUP", "GROUP");
    xlateMap.put("KW_WHERE", "WHERE");
@@ -1840,6 +1843,14 @@ tableImplBuckets
     -> ^(TOK_ALTERTABLE_BUCKETS $num)
     ;
 
+tableWriteLocallyOrdered
+@init { pushMsg("table sorted specification", state); }
+@after { popMsg(state); }
+    :
+      KW_WRITE KW_LOCALLY KW_ORDERED KW_BY sortCols=columnNameOrderList
+    -> ^(TOK_WRITE_LOCALLY_ORDERED $sortCols?)
+    ;
+
 tableSkewed
 @init { pushMsg("table skewed specification", state); }
 @after { popMsg(state); }
@@ -2201,6 +2212,8 @@ columnNameOrder
        ^(TOK_TABSORTCOLNAMEDESC ^(TOK_NULLS_LAST identifier))
     -> {$orderSpec.tree.getType()==HiveParser.KW_ASC}?
        ^(TOK_TABSORTCOLNAMEASC ^($nullSpec identifier))
+    -> {$orderSpec.tree.getType()==HiveParser.KW_DESC}?
+       ^(TOK_TABSORTCOLNAMEDESC ^($nullSpec identifier))
     -> ^(TOK_TABSORTCOLNAMEDESC ^($nullSpec identifier))
     ;
 
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index deeb707ef446..cef19520f846 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -997,6 +997,8 @@ nonReserved
     | KW_PKFK_JOIN
     | KW_THAN
     | KW_TIMESTAMPLOCALTZ
+    | KW_ORDERED
+    | KW_LOCALLY
     ;
 
 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
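
Taken together, the grammar changes accept an optional WRITE LOCALLY ORDERED BY clause in all four CREATE TABLE variants (between the bucketing and skew clauses), add ORDERED and LOCALLY as non-reserved keywords, and give columnNameOrder an explicit DESC branch so a DESC column with an explicit null order keeps that null order in the AST. Illustrative DDL the new rule accepts (table names are hypothetical; AST shapes follow the rewrite rules above, and the two-column form mirrors the q-test):

// Illustrative only -- not part of the patch.
public class WriteOrderSyntaxExamples {
  // Lowers to TOK_WRITE_LOCALLY_ORDERED(TOK_TABSORTCOLNAMEDESC(TOK_NULLS_FIRST id)).
  static final String SINGLE_COLUMN =
      "create table t1 (id int) write locally ordered by id desc nulls first stored by iceberg";

  // Two sort columns with mixed directions:
  // TOK_WRITE_LOCALLY_ORDERED(TOK_TABSORTCOLNAMEDESC(TOK_NULLS_FIRST id),
  //                           TOK_TABSORTCOLNAMEASC(TOK_NULLS_LAST text))
  static final String TWO_COLUMNS =
      "create table t2 (id int, text string) write locally ordered by id desc nulls first, "
          + "text asc nulls last stored by iceberg stored as orc";
}
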
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortFieldDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortFieldDesc.java
new file mode 100644
index 000000000000..c110f5c970c0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortFieldDesc.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.ddl.misc.sortoder;
+
+public class SortFieldDesc {
+
+  private String columnName;
+  private NullOrder nullOrder;
+  private SortDirection direction;
+
+  public SortFieldDesc() {
+  }
+
+  public SortFieldDesc(String columnName, SortDirection direction, NullOrder nullOrder) {
+    this.columnName = columnName;
+    this.direction = direction;
+    this.nullOrder = nullOrder;
+  }
+
+  public enum NullOrder {
+    NULLS_FIRST, NULLS_LAST;
+  }
+
+  public enum SortDirection {
+    ASC,
+    DESC;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public NullOrder getNullOrder() {
+    return nullOrder;
+  }
+
+  public SortDirection getDirection() {
+    return direction;
+  }
+
+  public void setColumnName(String columnName) {
+    this.columnName = columnName;
+  }
+
+  public void setNullOrder(NullOrder nullOrder) {
+    this.nullOrder = nullOrder;
+  }
+
+  public void setDirection(SortDirection direction) {
+    this.direction = direction;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{columnName:%s,direction:%s,nullOrder:%s}", columnName, direction, nullOrder);
+  }
+}
+ */ + +package org.apache.hadoop.hive.ql.ddl.misc.sortoder; + +import java.util.LinkedList; +import java.util.List; + +public class SortFields { + + private List sortFieldDescs; + + public SortFields() { + this.sortFieldDescs = new LinkedList<>(); + } + + public SortFields(List sortFields) { + if (sortFields == null) { + this.sortFieldDescs = new LinkedList<>(); + } else { + this.sortFieldDescs = sortFields; + } + } + + public List getSortFields() { + return sortFieldDescs; + } + + public void setSortFields(List sortFields) { + this.sortFieldDescs = sortFields; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index d30e8afaf320..763bc7615ac1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -65,6 +65,8 @@ import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.antlr.runtime.ClassicToken; import org.antlr.runtime.CommonToken; import org.antlr.runtime.Token; @@ -122,6 +124,8 @@ import org.apache.hadoop.hive.ql.ddl.DDLDescWithTableProperties; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.misc.hooks.InsertCommitHookDesc; +import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc; +import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields; import org.apache.hadoop.hive.ql.ddl.table.constraint.ConstraintsUtils; import org.apache.hadoop.hive.ql.ddl.table.convert.AlterTableConvertOperation; import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc; @@ -332,7 +336,6 @@ import com.google.common.collect.Sets; import com.google.common.math.IntMath; import com.google.common.math.LongMath; - /** * Implementation of the semantic analyzer. It generates the query plan. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d30e8afaf320..763bc7615ac1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -65,6 +65,8 @@
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.antlr.runtime.ClassicToken;
 import org.antlr.runtime.CommonToken;
 import org.antlr.runtime.Token;
@@ -122,6 +124,8 @@
 import org.apache.hadoop.hive.ql.ddl.DDLDescWithTableProperties;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.misc.hooks.InsertCommitHookDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
 import org.apache.hadoop.hive.ql.ddl.table.constraint.ConstraintsUtils;
 import org.apache.hadoop.hive.ql.ddl.table.convert.AlterTableConvertOperation;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
@@ -332,7 +336,6 @@
 import com.google.common.collect.Sets;
 import com.google.common.math.IntMath;
 import com.google.common.math.LongMath;
-
 /**
  * Implementation of the semantic analyzer. It generates the query plan.
  * There are other specific semantic analyzers for some hive operations such as
@@ -481,7 +484,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   };
 
   private int subQueryExpressionAliasCounter = 0;
-
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
   static class Phase1Ctx {
     String dest;
     int nextNum;
@@ -14003,7 +14006,7 @@ ASTNode analyzeCreateTable(
     boolean partitionTransformSpecExists = false;
     String likeFile = null;
     String likeFileFormat = null;
-
+    String sortOrder = null;
     RowFormatParams rowFormatParams = new RowFormatParams();
     StorageFormat storageFormat = new StorageFormat(conf);
 
@@ -14135,6 +14138,9 @@ ASTNode analyzeCreateTable(
         numBuckets = Integer.parseInt(child.getChild(2).getText());
       }
         break;
+      case HiveParser.TOK_WRITE_LOCALLY_ORDERED:
+        sortOrder = getSortOrderJson((ASTNode) child.getChild(0));
+        break;
       case HiveParser.TOK_TABLEROWFORMAT:
         rowFormatParams.analyzeRowFormat(child);
         break;
@@ -14263,7 +14269,9 @@ ASTNode analyzeCreateTable(
       isExt = isExternalTableChanged(tblProps, isTransactional, isExt, isDefaultTableTypeChanged);
       addDbAndTabToOutputs(new String[] {qualifiedTabName.getDb(), qualifiedTabName.getTable()},
           TableType.MANAGED_TABLE, isTemporary, tblProps, storageFormat);
-
+      if (!Strings.isNullOrEmpty(sortOrder)) {
+        tblProps.put("default-sort-order", sortOrder);
+      }
       CreateTableDesc crtTblDesc = new CreateTableDesc(qualifiedTabName, isExt, isTemporary, cols,
           partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim,
@@ -16097,7 +16105,26 @@ protected void addPartitionColsToInsert(List<FieldSchema> partCols,
         rewrittenQueryStr.append(")");
       }
     }
-
+
+  private String getSortOrderJson(ASTNode ast) {
+    List<SortFieldDesc> sortFieldDescList = new ArrayList<>();
+    SortFields sortFields = new SortFields(sortFieldDescList);
+    for (int i = 0; i < ast.getChildCount(); i++) {
+      ASTNode child = (ASTNode) ast.getChild(i);
+      SortFieldDesc.SortDirection sortDirection = child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEDESC ?
+          SortFieldDesc.SortDirection.DESC : SortFieldDesc.SortDirection.ASC;
+      child = (ASTNode) child.getChild(0);
+      String name = unescapeIdentifier(child.getChild(0).getText()).toLowerCase();
+      SortFieldDesc.NullOrder nullOrder = child.getToken().getType() == HiveParser.TOK_NULLS_FIRST ?
+          SortFieldDesc.NullOrder.NULLS_FIRST : SortFieldDesc.NullOrder.NULLS_LAST;
+      sortFieldDescList.add(new SortFieldDesc(name, sortDirection, nullOrder));
+    }
+    try {
+      return JSON_OBJECT_MAPPER.writer().writeValueAsString(sortFields);
+    } catch (JsonProcessingException e) {
+      LOG.warn("Cannot create write order JSON.", e);
+      return null;
+    }
+  }
 
   @Override
   public WriteEntity getAcidAnalyzeTable() {
     return acidAnalyzeTable;
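
For the q-test's DDL, the pieces compose end to end: the parser produces TOK_WRITE_LOCALLY_ORDERED, getSortOrderJson serializes it into table properties, and HiveIcebergMetaHook#setSortOrder rewrites it into Iceberg's representation at create time. A sketch of the two JSON documents involved (the Hive-side form is approximate, derived from the bean getters; the Iceberg-side form is the exact default-sort-order value shown in the q.out above):

// Illustrative only -- not part of the patch.
public class WriteOrderEndToEnd {
  // 1. What SemanticAnalyzer#getSortOrderJson writes into tblProps for
  //    "write locally ordered by id desc nulls first, text asc nulls last" (approximate):
  static final String HIVE_SIDE =
      "{\"sortFields\":[{\"columnName\":\"id\",\"nullOrder\":\"NULLS_FIRST\",\"direction\":\"DESC\"},"
          + "{\"columnName\":\"text\",\"nullOrder\":\"NULLS_LAST\",\"direction\":\"ASC\"}]}";

  // 2. What HiveIcebergMetaHook#setSortOrder stores under default-sort-order
  //    (exact value from the q.out; source-ids are the Iceberg schema field ids):
  static final String ICEBERG_SIDE =
      "{\"order-id\":1,\"fields\":["
          + "{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},"
          + "{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]}";
}
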