Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4733] feat(paimon-spark-connector): support hive backend catalog in paimon spark connector #5858

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ed8e5b7
basic ddl
Dec 1, 2024
e4a11d1
basic ddl
Dec 1, 2024
a16e949
basic dml
Dec 1, 2024
e28d323
support partition
Dec 1, 2024
6de61c2
basic dml
Dec 1, 2024
17690df
basic schema ddl
Dec 1, 2024
dae7ccf
Merge branch 'support-paimon-connector-ddl' of github.com:caican00/gr…
Dec 2, 2024
7c6013c
fix
Dec 2, 2024
0915488
fix
Dec 2, 2024
4190fd9
Merge branch 'support-paimon-connector-ddl' into support-paimon-conne…
Dec 2, 2024
01bf325
Merge branch 'main' of github.com:apache/gravitino into support-paimo…
Dec 2, 2024
a83a406
fix
Dec 2, 2024
8846dc9
support partition management
Dec 2, 2024
c114d1c
support partition management
Dec 2, 2024
af0b32a
fix
Dec 3, 2024
79403e2
fix
Dec 3, 2024
1aa58cd
fix
Dec 3, 2024
33c6956
Merge branch 'support-paimon-connector-dml' of github.com:caican00/gr…
Dec 3, 2024
3512934
support hive backend
Dec 4, 2024
4499371
support hive backend
Dec 4, 2024
84f48f0
support hive backend
Dec 4, 2024
659338f
fix
Dec 2, 2024
069b748
fix
Dec 6, 2024
d354e2c
Merge branch 'support-paimon-connector-ddl' of github.com:caican00/gr…
Dec 9, 2024
658f687
support paimon connector dml together
Dec 9, 2024
03f1a2d
Merge branch 'main' into support-paimon-connector-ddl
caican00 Dec 9, 2024
f0bfd60
fix
Dec 9, 2024
22de28e
fix
Dec 9, 2024
3195e8c
Merge branch 'support-paimon-connector-ddl' of github.com:caican00/gr…
Dec 9, 2024
e337024
fix
Dec 9, 2024
ce0d591
fix
Dec 9, 2024
c77b2f4
Merge branch 'support-paimon-connector-ddl' of github.com:caican00/gr…
Dec 9, 2024
1ce5e4e
fix
Dec 9, 2024
a5413d1
Merge branch 'support-paimon-connector-partition-management' of githu…
Dec 9, 2024
4f92ddd
Merge branch 'support-paimon-connector-partition-management' of githu…
Dec 9, 2024
245dbfb
fix
Dec 9, 2024
f8220e5
fix
Dec 9, 2024
3c51336
Merge branch 'support-paimon-connector-ddl' of github.com:caican00/gr…
Dec 15, 2024
556937e
fix
Dec 15, 2024
59c9456
Merge branch 'main' of github.com:apache/gravitino into support-paimo…
Dec 24, 2024
ff12574
fix
Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,22 @@ protected void testCreateAndLoadSchema() {
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
// The database of the Paimon filesystem backend does not store any properties.
Assertions.assertFalse(databaseMeta.containsKey("ID"));
Assertions.assertFalse(databaseMeta.containsKey("comment"));
}

@Test
@Override
protected void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
sql(
"CREATE DATABASE "
+ testDatabaseName
+ " COMMENT 'db comment' WITH DBPROPERTIES (ID=001);");
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
// The database of the Paimon filesystem backend does not store any properties.
Assertions.assertFalse(databaseMeta.containsKey("ID"));
Assertions.assertFalse(databaseMeta.get("Properties").contains("(ID,001)"));
Assertions.assertFalse(databaseMeta.containsKey("Comment"));

// The Paimon filesystem backend does not support the alter database operation.
Assertions.assertThrows(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.gravitino.spark.connector.integration.test.paimon;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the Paimon Spark connector using Apache Paimon's HiveCatalog as the
 * backend catalog. Inherits the common suite from {@link SparkPaimonCatalogIT} and overrides
 * cases whose expected behavior differs between the Hive and filesystem backends.
 */
@Tag("gravitino-docker-test")
public abstract class SparkPaimonCatalogHiveBackendIT extends SparkPaimonCatalogIT {

  /**
   * Builds the catalog configuration that points the Paimon catalog at a Hive Metastore backend.
   * {@code hiveMetastoreUri} and {@code warehouse} are supplied by the base IT class.
   */
  @Override
  protected Map<String, String> getCatalogConfigs() {
    Map<String, String> catalogProperties = Maps.newHashMap();
    catalogProperties.put(
        PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
        PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_HIVE);
    catalogProperties.put(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI, hiveMetastoreUri);
    catalogProperties.put(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE, warehouse);
    return catalogProperties;
  }

  /**
   * Verifies that the Hive backend persists database properties and comments on CREATE DATABASE,
   * and that ALTER DATABASE is rejected.
   */
  @Test
  @Override
  protected void testAlterSchema() {
    String testDatabaseName = "t_alter";
    dropDatabaseIfExists(testDatabaseName);
    sql(
        "CREATE DATABASE "
            + testDatabaseName
            + " COMMENT 'db comment' WITH DBPROPERTIES (ID=001);");
    Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
    // Unlike the filesystem backend, the Hive backend stores database properties and the
    // comment, so both must round-trip through CREATE DATABASE.
    Assertions.assertTrue(databaseMeta.get("Properties").contains("(ID,001)"));
    Assertions.assertEquals("db comment", databaseMeta.get("Comment"));

    // Altering a database is expected to throw UnsupportedOperationException. (The original
    // comment said "filesystem backend", but this test runs against the Hive backend.)
    Assertions.assertThrows(
        UnsupportedOperationException.class,
        () ->
            sql(
                String.format(
                    "ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName)));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,38 @@ void testPaimonPartitions() {
checkDirExists(partitionPath);
}

@Test
void testPaimonPartitionManagement() {
  // Replace, add, and load partition operations are currently unsupported in Paimon, so the
  // Paimon Spark runtime only supports the list and drop partition operations; exercise those.
  testPaimonListAndDropPartition();
}

/**
 * Exercises partition listing and dropping through Spark SQL: inserts rows into a table
 * partitioned by {@code name}, lists the resulting partitions, drops one partition, and checks
 * the listing again.
 */
private void testPaimonListAndDropPartition() {
  String tableName = "test_paimon_drop_partition";
  dropTableIfExists(tableName);
  // Create the partitioned table in one statement.
  sql(getCreatePaimonSimpleTableString(tableName) + " PARTITIONED BY (name);");

  sql(
      String.format(
          "INSERT into %s values(1,'a','beijing'), (2,'b','beijing'), (3,'c','beijing');",
          tableName));
  List<String> queryResult = getTableData(tableName);
  Assertions.assertEquals(3, queryResult.size());

  // One partition per distinct `name` value is expected.
  String showPartitionsSql = String.format("show partitions %s", tableName);
  List<String> partitions = getQueryData(showPartitionsSql);
  Assertions.assertEquals(3, partitions.size());
  Assertions.assertEquals("name=a;name=b;name=c", String.join(";", partitions));

  // Drop one partition and verify only the remaining two are listed.
  sql(String.format("ALTER TABLE %s DROP PARTITION (`name`='a')", tableName));
  List<String> remaining = getQueryData(showPartitionsSql);
  Assertions.assertEquals(2, remaining.size());
  Assertions.assertEquals("name=b;name=c", String.join(";", remaining));
}

private String getCreatePaimonSimpleTableString(String tableName) {
return String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', address STRING COMMENT '') USING paimon",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.gravitino.spark.connector.integration.test.paimon;

import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/** Spark 3.3 variant of the Paimon Hive-backend integration tests. */
public class SparkPaimonCatalogHiveBackendIT33 extends SparkPaimonCatalogHiveBackendIT {

  /** The registered Spark catalog implementation must be the Spark 3.3 Paimon catalog class. */
  @Test
  void testCatalogClassName() {
    String configuredImpl =
        getSparkSession().sparkContext().conf().get("spark.sql.catalog." + getCatalogName());
    Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), configuredImpl);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.spark.connector.integration.test.paimon;

import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SparkPaimonCatalogHiveBackendIT34 extends SparkPaimonCatalogHiveBackendIT {

@Test
void testCatalogClassName() {
String catalogClass =
getSparkSession().sparkContext().conf().get("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), catalogClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.spark.connector.integration.test.paimon;

import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SparkPaimonCatalogHiveBackendIT35 extends SparkPaimonCatalogHiveBackendIT {

@Test
void testCatalogClassName() {
String catalogClass =
getSparkSession().sparkContext().conf().get("spark.sql.catalog." + getCatalogName());
Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), catalogClass);
}
}
Loading