From 08a1899dd7ac1912b5bda0e03e083777c46f0111 Mon Sep 17 00:00:00 2001
From: Tomoyuki MORITA
Date: Mon, 10 Feb 2025 09:29:52 -0800
Subject: [PATCH] [v3.0.0] Remove SparkSQL support (#3306)

* Remove SparkSQL support

Signed-off-by: Tomoyuki Morita

* Remove a dependency

Signed-off-by: Tomoyuki Morita

---------

Signed-off-by: Tomoyuki Morita
---
 DEVELOPER_GUIDE.rst                           |   2 -
 async-query-core/build.gradle                 |   2 +-
 .../sql/spark/data/type/SparkDataType.java    |   0
 .../sql/spark/data/value/SparkExprValue.java  |   0
 ...DefaultSparkSqlFunctionResponseHandle.java |   0
 .../SparkSqlFunctionResponseHandle.java       |   0
 .../spark/data/type/SparkDataTypeTest.java    |  34 ++
 .../spark/data/value/SparkExprValueTest.java  |  40 +++
 ...ultSparkSqlFunctionResponseHandleTest.java |  63 ++++
 .../validator/PPLQueryValidatorTest.java      |   4 +-
 .../validator/SQLQueryValidatorTest.java      |  12 +-
 .../spark_execution_result_test.json          |  79 +++++
 .../query_execution_request_mapping.yml       |   0
 .../query_execution_request_settings.yml      |   0
 .../sql/datasource/model/DataSourceType.java  |   3 +-
 .../datasource/model/DataSourceTypeTest.java  |   1 -
 .../ppl/admin/connectors/spark_connector.rst  |  92 ------
 plugin/build.gradle                           |   1 -
 .../org/opensearch/sql/plugin/SQLPlugin.java  |   2 -
 settings.gradle                               |   1 -
 spark/build.gradle                            |  88 ------
 .../sql/spark/client/EmrClientImpl.java       | 128 --------
 .../sql/spark/client/SparkClient.java         |  20 --
 .../SparkSqlFunctionImplementation.java       | 106 -------
 .../SparkSqlTableFunctionResolver.java        |  81 -----
 .../SparkSqlFunctionTableScanBuilder.java     |  32 --
 .../SparkSqlFunctionTableScanOperator.java    |  69 -----
 .../sql/spark/helper/FlintHelper.java         |  51 ---
 .../sql/spark/request/SparkQueryRequest.java  |  16 -
 .../sql/spark/response/SparkResponse.java     | 102 ------
 .../sql/spark/storage/SparkScan.java          |  50 ---
 .../sql/spark/storage/SparkStorageEngine.java |  32 --
 .../spark/storage/SparkStorageFactory.java    | 130 --------
 .../sql/spark/storage/SparkTable.java         |  62 ----
 .../sql/spark/client/EmrClientImplTest.java   | 158 ----------
 .../sql/spark/constants/TestConstants.java    |  26 --
 .../spark/data/type/SparkDataTypeTest.java    |  19 --
 .../spark/data/value/SparkExprValueTest.java  |  28 --
 .../SparkSqlFunctionImplementationTest.java   |  78 -----
 .../SparkSqlFunctionTableScanBuilderTest.java |  46 ---
 ...SparkSqlFunctionTableScanOperatorTest.java | 292 ------------------
 .../SparkSqlTableFunctionResolverTest.java    | 140 ---------
 .../sql/spark/response/SparkResponseTest.java | 117 -------
 .../sql/spark/storage/SparkScanTest.java      |  40 ---
 .../spark/storage/SparkStorageEngineTest.java |  46 ---
 .../storage/SparkStorageFactoryTest.java      | 182 -----------
 .../sql/spark/storage/SparkTableTest.java     |  77 -----
 .../opensearch/sql/spark/utils/TestUtils.java |  17 -
 spark/src/test/resources/all_data_type.json   |  22 --
 spark/src/test/resources/issue2210.json       |  17 -
 .../org.mockito.plugins.MockMaker             |   1 -
 .../test/resources/select_query_response.json |  12 -
 spark/src/test/resources/spark_data_type.json |  13 -
 53 files changed, 226 insertions(+), 2408 deletions(-)
 rename {spark => async-query-core}/src/main/java/org/opensearch/sql/spark/data/type/SparkDataType.java (100%)
 rename {spark => async-query-core}/src/main/java/org/opensearch/sql/spark/data/value/SparkExprValue.java (100%)
 rename {spark => async-query-core}/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java (100%)
 rename {spark => async-query-core}/src/main/java/org/opensearch/sql/spark/functions/response/SparkSqlFunctionResponseHandle.java (100%)
 create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java
 create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java
 create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandleTest.java
 create mode 100644 async-query-core/src/test/resources/spark_execution_result_test.json
 rename {spark => async-query}/src/main/resources/query_execution_request_mapping.yml (100%)
 rename {spark => async-query}/src/main/resources/query_execution_request_settings.yml (100%)
 delete mode 100644 docs/user/ppl/admin/connectors/spark_connector.rst
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/helper/FlintHelper.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java
 delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/client/EmrClientImplTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java
 delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java
 delete mode 100644 spark/src/test/resources/all_data_type.json
 delete mode 100644 spark/src/test/resources/issue2210.json
 delete mode 100644 spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
 delete mode 100644 spark/src/test/resources/select_query_response.json
 delete mode 100644 spark/src/test/resources/spark_data_type.json
diff --git a/DEVELOPER_GUIDE.rst b/DEVELOPER_GUIDE.rst
index ec00c587a6..c9ca9ae4d1 100644
--- a/DEVELOPER_GUIDE.rst
+++ b/DEVELOPER_GUIDE.rst
@@ -146,7 +146,6 @@ The plugin codebase is in standard layout of Gradle project::
 ├── plugin
 ├── protocol
 ├── ppl
-├── spark
 ├── sql
 ├── sql-cli
 ├── sql-jdbc
@@ -161,7 +160,6 @@ Here are sub-folders (Gradle modules) for plugin source code:
 - ``core``: core query engine.
 - ``opensearch``: OpenSearch storage engine.
 - ``prometheus``: Prometheus storage engine.
-- ``spark`` : Spark storage engine
 - ``protocol``: request/response protocol formatter.
 - ``common``: common util code.
 - ``integ-test``: integration and comparison test.
diff --git a/async-query-core/build.gradle b/async-query-core/build.gradle
index 37bf6748c9..02a63afae4 100644
--- a/async-query-core/build.gradle
+++ b/async-query-core/build.gradle
@@ -46,10 +46,10 @@ dependencies {
     antlr "org.antlr:antlr4:4.7.1"
 
     implementation project(':core')
-    implementation project(':spark') // TODO: dependency to spark should be eliminated
     implementation project(':datasources') // TODO: dependency to datasources should be eliminated
     implementation 'org.json:json:20231013'
     implementation 'com.google.code.gson:gson:2.8.9'
+    api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: "${aws_java_sdk_version}"
 
     testImplementation(platform("org.junit:junit-bom:5.9.3"))
diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/type/SparkDataType.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/type/SparkDataType.java
similarity index 100%
rename from spark/src/main/java/org/opensearch/sql/spark/data/type/SparkDataType.java
rename to async-query-core/src/main/java/org/opensearch/sql/spark/data/type/SparkDataType.java
diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/value/SparkExprValue.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/value/SparkExprValue.java
similarity index 100%
rename from spark/src/main/java/org/opensearch/sql/spark/data/value/SparkExprValue.java
rename to async-query-core/src/main/java/org/opensearch/sql/spark/data/value/SparkExprValue.java
diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java b/async-query-core/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java
similarity index 100%
rename from spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java
rename to async-query-core/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java
diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/response/SparkSqlFunctionResponseHandle.java b/async-query-core/src/main/java/org/opensearch/sql/spark/functions/response/SparkSqlFunctionResponseHandle.java
similarity index 100%
rename from spark/src/main/java/org/opensearch/sql/spark/functions/response/SparkSqlFunctionResponseHandle.java
rename to async-query-core/src/main/java/org/opensearch/sql/spark/functions/response/SparkSqlFunctionResponseHandle.java
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java
new file mode 100644
index 0000000000..ed94cd9e16
--- /dev/null
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.data.type;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+class SparkDataTypeTest {
+
+  @Test
+  void testTypeName() {
+    String expectedTypeName = "spark_string";
+    SparkDataType sparkDataType = new SparkDataType(expectedTypeName);
+
+    assertEquals(
+        expectedTypeName, sparkDataType.typeName(), "Type name should match the expected value");
+  }
+
+  @Test
+  void testEqualsAndHashCode() {
+    SparkDataType type1 = new SparkDataType("spark_integer");
+    SparkDataType type2 = new SparkDataType("spark_integer");
+    SparkDataType type3 = new SparkDataType("spark_double");
+
+    assertEquals(type1, type2);
+    assertNotEquals(type1, type3);
+    assertEquals(type1.hashCode(), type2.hashCode());
+    assertNotEquals(type1.hashCode(), type3.hashCode());
+  }
+}
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java
new file mode 100644
index 0000000000..3b1ea14d40
--- /dev/null
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.data.value;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+import org.opensearch.sql.spark.data.type.SparkDataType;
+
+class SparkExprValueTest {
+  private final SparkDataType sparkDataType = new SparkDataType("char");
+
+  @Test
+  public void getters() {
+    SparkExprValue sparkExprValue = new SparkExprValue(sparkDataType, "str");
+
+    assertEquals(sparkDataType, sparkExprValue.type());
+    assertEquals("str", sparkExprValue.value());
+  }
+
+  @Test
+  public void unsupportedCompare() {
+    SparkExprValue sparkExprValue = new SparkExprValue(sparkDataType, "str");
+
+    assertThrows(UnsupportedOperationException.class, () -> sparkExprValue.compare(sparkExprValue));
+  }
+
+  @Test
+  public void testEquals() {
+    SparkExprValue sparkExprValue1 = new SparkExprValue(sparkDataType, "str");
+    SparkExprValue sparkExprValue2 = new SparkExprValue(sparkDataType, "str");
+    SparkExprValue sparkExprValue3 = new SparkExprValue(sparkDataType, "other");
+
+    assertTrue(sparkExprValue1.equal(sparkExprValue2));
+    assertFalse(sparkExprValue1.equal(sparkExprValue3));
+  }
+}
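Taken together, the two migrated tests above document the value-wrapper contract that async-query-core now owns: SparkDataType is a name-keyed type with value equality, and SparkExprValue pairs such a type with the raw value while refusing comparison. A compact restatement of that contract as a hypothetical Java fragment (the method body and names are illustrative, not part of the patch)::

    void sketch() {
      SparkDataType charType = new SparkDataType("char"); // type name as reported by Spark
      SparkExprValue value = new SparkExprValue(charType, "str");

      assert value.type().equals(charType); // type() exposes the Spark type wrapper
      assert "str".equals(value.value());   // value() exposes the raw underlying object
      // value.compare(value) would throw UnsupportedOperationException: SparkExprValue
      // defines no ordering, so it must not reach operators that need to sort values.
    }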
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandleTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandleTest.java
new file mode 100644
index 0000000000..b9e128e836
--- /dev/null
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandleTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.functions.response;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import org.opensearch.sql.data.model.ExprBooleanValue;
+import org.opensearch.sql.data.model.ExprByteValue;
+import org.opensearch.sql.data.model.ExprDateValue;
+import org.opensearch.sql.data.model.ExprDoubleValue;
+import org.opensearch.sql.data.model.ExprFloatValue;
+import org.opensearch.sql.data.model.ExprIntegerValue;
+import org.opensearch.sql.data.model.ExprLongValue;
+import org.opensearch.sql.data.model.ExprShortValue;
+import org.opensearch.sql.data.model.ExprStringValue;
+import org.opensearch.sql.data.model.ExprTimestampValue;
+import org.opensearch.sql.data.model.ExprValue;
+import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
+
+class DefaultSparkSqlFunctionResponseHandleTest {
+
+  @Test
+  public void testConstruct() throws Exception {
+    DefaultSparkSqlFunctionResponseHandle handle =
+        new DefaultSparkSqlFunctionResponseHandle(readJson());
+
+    assertTrue(handle.hasNext());
+    ExprValue value = handle.next();
+    Map<String, ExprValue> row = value.tupleValue();
+    assertEquals(ExprBooleanValue.of(true), row.get("col1"));
+    assertEquals(new ExprLongValue(2), row.get("col2"));
+    assertEquals(new ExprIntegerValue(3), row.get("col3"));
+    assertEquals(new ExprShortValue(4), row.get("col4"));
+    assertEquals(new ExprByteValue(5), row.get("col5"));
+    assertEquals(new ExprDoubleValue(6.1), row.get("col6"));
+    assertEquals(new ExprFloatValue(7.1), row.get("col7"));
+    assertEquals(new ExprTimestampValue("2024-01-02 03:04:05.1234"), row.get("col8"));
+    assertEquals(new ExprDateValue("2024-01-03 04:05:06.1234"), row.get("col9"));
+    assertEquals(new ExprStringValue("some string"), row.get("col10"));
+
+    ExecutionEngine.Schema schema = handle.schema();
+    List<Column> columns = schema.getColumns();
+    assertEquals("col1", columns.get(0).getName());
+  }
+
+  private JSONObject readJson() throws Exception {
+    final URL url =
+        DefaultSparkSqlFunctionResponseHandle.class.getResource(
+            "/spark_execution_result_test.json");
+    return new JSONObject(Files.readString(Paths.get(url.toURI())));
+  }
+}
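The test above also doubles as a usage guide for the response handle: build it from the Spark result JSONObject, read the column layout from schema(), and drain rows with hasNext()/next(). A minimal consumer sketch under those assumptions (the wrapper class and raw-JSON input are hypothetical; only the handle API exercised by the test is used)::

    import java.util.List;
    import java.util.Map;
    import org.json.JSONObject;
    import org.opensearch.sql.data.model.ExprValue;
    import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
    import org.opensearch.sql.spark.functions.response.DefaultSparkSqlFunctionResponseHandle;
    import org.opensearch.sql.spark.functions.response.SparkSqlFunctionResponseHandle;

    class ResultDrainExample {
      // `rawJson` is assumed to carry the {"data": {"schema": [...], "result": [...]}}
      // payload shown in spark_execution_result_test.json below.
      static void printRows(String rawJson) {
        SparkSqlFunctionResponseHandle handle =
            new DefaultSparkSqlFunctionResponseHandle(new JSONObject(rawJson));
        List<Column> columns = handle.schema().getColumns();
        while (handle.hasNext()) {
          Map<String, ExprValue> row = handle.next().tupleValue();
          for (Column column : columns) {
            System.out.println(column.getName() + " = " + row.get(column.getName()));
          }
        }
      }
    }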
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/validator/PPLQueryValidatorTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/validator/PPLQueryValidatorTest.java
index 8d02bb3c72..aebf7e06f8 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/validator/PPLQueryValidatorTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/validator/PPLQueryValidatorTest.java
@@ -97,7 +97,7 @@ private String[] addPrefix(String... queries) {
   void testAllowAllByDefault() {
     when(mockedProvider.getValidatorForDatasource(any()))
         .thenReturn(new DefaultGrammarElementValidator());
-    VerifyValidator v = new VerifyValidator(pplQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(pplQueryValidator, DataSourceType.S3GLUE);
     Arrays.stream(PPLQueryValidatorTest.TestElement.values()).forEach(v::ok);
   }
 
@@ -127,7 +127,7 @@ public TestPPLGrammarElementValidator() {
   void testCwlValidator() {
     when(mockedProvider.getValidatorForDatasource(any()))
         .thenReturn(new TestPPLGrammarElementValidator());
-    VerifyValidator v = new VerifyValidator(pplQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(pplQueryValidator, DataSourceType.S3GLUE);
 
     v.ok(TestElement.FIELDS);
     v.ok(TestElement.WHERE);
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java
index 3cfc33a5b1..bb81a7b46d 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java
@@ -213,14 +213,14 @@ private enum TestElement {
   void testAllowAllByDefault() {
     when(mockedProvider.getValidatorForDatasource(any()))
         .thenReturn(new DefaultGrammarElementValidator());
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
     Arrays.stream(TestElement.values()).forEach(v::ok);
   }
 
   @Test
   void testDenyAllValidator() {
     when(mockedProvider.getValidatorForDatasource(any())).thenReturn(element -> false);
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
 
     // The elements which doesn't have validation will be accepted.
     // That's why there are some ok case
 
@@ -587,7 +587,7 @@ void testValidateFlintExtensionQuery() {
   @Test
   void testInvalidIdentifier() {
     when(mockedProvider.getValidatorForDatasource(any())).thenReturn(element -> true);
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
     v.ng("SELECT a.b.c as a-b-c FROM abc");
     v.ok("SELECT a.b.c as `a-b-c` FROM abc");
     v.ok("SELECT a.b.c as a_b_c FROM abc");
@@ -601,7 +601,7 @@ void testInvalidIdentifier() {
   @Test
   void testUnsupportedType() {
     when(mockedProvider.getValidatorForDatasource(any())).thenReturn(element -> true);
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
 
     v.ng("SELECT cast ( a as DateTime ) FROM tbl");
     v.ok("SELECT cast ( a as DATE ) FROM tbl");
@@ -612,7 +612,7 @@ void testUnsupportedType() {
   @Test
   void testUnsupportedTypedLiteral() {
     when(mockedProvider.getValidatorForDatasource(any())).thenReturn(element -> true);
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
 
     v.ng("SELECT DATETIME '2024-10-11'");
     v.ok("SELECT DATE '2024-10-11'");
@@ -622,7 +622,7 @@ void testUnsupportedTypedLiteral() {
   @Test
   void testUnsupportedHiveNativeCommand() {
     when(mockedProvider.getValidatorForDatasource(any())).thenReturn(element -> true);
-    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.SPARK);
+    VerifyValidator v = new VerifyValidator(sqlQueryValidator, DataSourceType.S3GLUE);
 
     v.ng("CREATE ROLE aaa");
     v.ng("SHOW GRANT");
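A note for readers following these validator tests: the mocks stub GrammarElementValidatorProvider.getValidatorForDatasource() with bare lambdas (element -> false, element -> true), so GrammarElementValidator is effectively a one-method predicate over grammar elements, with DefaultGrammarElementValidator as the allow-all default. A hypothetical deny-list validator in that same shape (the helper method and the denied set are illustrative, not from this patch)::

    // Sketch: reject a fixed set of grammar elements, allow everything else,
    // matching the lambda shape the tests pass to the mocked provider.
    static GrammarElementValidator denyListValidator(java.util.Set<?> denied) {
      return element -> !denied.contains(element);
    }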
diff --git a/async-query-core/src/test/resources/spark_execution_result_test.json b/async-query-core/src/test/resources/spark_execution_result_test.json
new file mode 100644
index 0000000000..80d5a49283
--- /dev/null
+++ b/async-query-core/src/test/resources/spark_execution_result_test.json
@@ -0,0 +1,79 @@
+{
+  "data" : {
+    "schema": [
+      {
+        "column_name": "col1",
+        "data_type": "boolean"
+      },
+      {
+        "column_name": "col2",
+        "data_type": "long"
+      },
+      {
+        "column_name": "col3",
+        "data_type": "integer"
+      },
+      {
+        "column_name": "col4",
+        "data_type": "short"
+      },
+      {
+        "column_name": "col5",
+        "data_type": "byte"
+      },
+      {
+        "column_name": "col6",
+        "data_type": "double"
+      },
+      {
+        "column_name": "col7",
+        "data_type": "float"
+      },
+      {
+        "column_name": "col8",
+        "data_type": "timestamp"
+      },
+      {
+        "column_name": "col9",
+        "data_type": "date"
+      },
+      {
+        "column_name": "col10",
+        "data_type": "string"
+      },
+      {
+        "column_name": "col11",
+        "data_type": "other"
+      },
+      {
+        "column_name": "col12",
+        "data_type": "other object"
+      },
+      {
+        "column_name": "col13",
+        "data_type": "other array"
+      },
+      {
+        "column_name": "col14",
+        "data_type": "other"
+      }
+    ],
+    "result": [
+      {
+        "col1": true,
+        "col2": 2,
+        "col3": 3,
+        "col4": 4,
+        "col5": 5,
+        "col6": 6.1,
+        "col7": 7.1,
+        "col8": "2024-01-02 03:04:05.1234",
+        "col9": "2024-01-03 04:05:06.1234",
+        "col10": "some string",
+        "col11": "other value",
+        "col12": { "hello": "world" },
+        "col13": [1, 2, 3]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/spark/src/main/resources/query_execution_request_mapping.yml b/async-query/src/main/resources/query_execution_request_mapping.yml
similarity index 100%
rename from spark/src/main/resources/query_execution_request_mapping.yml
rename to async-query/src/main/resources/query_execution_request_mapping.yml
diff --git a/spark/src/main/resources/query_execution_request_settings.yml b/async-query/src/main/resources/query_execution_request_settings.yml
similarity index 100%
rename from spark/src/main/resources/query_execution_request_settings.yml
rename to async-query/src/main/resources/query_execution_request_settings.yml
diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
index 442497094b..5579818a4c 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
@@ -15,7 +15,6 @@ public class DataSourceType {
   public static final DataSourceType PROMETHEUS = new DataSourceType("PROMETHEUS");
   public static final DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH");
-  public static final DataSourceType SPARK = new DataSourceType("SPARK");
   public static final DataSourceType S3GLUE = new DataSourceType("S3GLUE");
   public static final DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE");
 
@@ -23,7 +22,7 @@ public class DataSourceType {
   private static final Map<String, DataSourceType> knownValues = new HashMap<>();
 
   static {
-    register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE);
+    register(PROMETHEUS, OPENSEARCH, S3GLUE, SECURITY_LAKE);
   }
 
   private final String name;
diff --git a/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceTypeTest.java b/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceTypeTest.java
index de487be2e8..d267d53f12 100644
--- a/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceTypeTest.java
+++ b/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceTypeTest.java
@@ -15,7 +15,6 @@ class DataSourceTypeTest {
   public void fromString_succeed() {
     testFromString("PROMETHEUS", DataSourceType.PROMETHEUS);
     testFromString("OPENSEARCH", DataSourceType.OPENSEARCH);
-    testFromString("SPARK", DataSourceType.SPARK);
     testFromString("S3GLUE", DataSourceType.S3GLUE);
 
     testFromString("prometheus", DataSourceType.PROMETHEUS);
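Because DataSourceType is a registry-backed class rather than an enum, removing SPARK amounts to deleting the constant and dropping it from register(); the knownValues map plus the lowercase case in fromString_succeed imply a case-insensitive lookup. A rough sketch of that pattern under those assumptions (simplified; the normalization and error handling are guesses, not the class's actual source)::

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    // Registry sketch: constants self-register, lookup is by normalized name.
    public class DataSourceType {
      private static final Map<String, DataSourceType> knownValues = new HashMap<>();
      private final String name;

      private DataSourceType(String name) { this.name = name; }

      private static void register(DataSourceType... types) {
        for (DataSourceType type : types) {
          knownValues.put(type.name.toUpperCase(Locale.ROOT), type); // assumed normalization
        }
      }

      public static DataSourceType fromString(String name) {
        DataSourceType result = knownValues.get(name.toUpperCase(Locale.ROOT));
        if (result == null) {
          throw new IllegalArgumentException("Unknown DataSourceType: " + name); // assumed failure mode
        }
        return result;
      }
    }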
diff --git a/docs/user/ppl/admin/connectors/spark_connector.rst b/docs/user/ppl/admin/connectors/spark_connector.rst
deleted file mode 100644
index 59a52998bc..0000000000
--- a/docs/user/ppl/admin/connectors/spark_connector.rst
+++ /dev/null
@@ -1,92 +0,0 @@
-.. highlight:: sh
-
-====================
-Spark Connector
-====================
-
-.. rubric:: Table of contents
-
-.. contents::
-   :local:
-   :depth: 1
-
-
-Introduction
-============
-
-This page covers spark connector properties for dataSource configuration
-and the nuances associated with spark connector.
-
-
-Spark Connector Properties in DataSource Configuration
-========================================================
-Spark Connector Properties.
-
-* ``spark.connector`` [Required].
-    * This parameters provides the spark client information for connection.
-* ``spark.sql.application`` [Optional].
-    * This parameters provides the spark sql application jar. Default value is ``s3://spark-datasource/sql-job.jar``.
-* ``emr.cluster`` [Required].
-    * This parameters provides the emr cluster id information.
-* ``emr.auth.type`` [Required]
-    * This parameters provides the authentication type information.
-    * Spark emr connector currently supports ``awssigv4`` authentication mechanism and following parameters are required.
-        * ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key``
-* ``spark.datasource.flint.*`` [Optional]
-    * This parameters provides the Opensearch domain host information for flint integration.
-    * ``spark.datasource.flint.integration`` [Optional]
-        * Default value for integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.3.0-SNAPSHOT.jar``.
-    * ``spark.datasource.flint.host`` [Optional]
-        * Default value for host is ``localhost``.
-    * ``spark.datasource.flint.port`` [Optional]
-        * Default value for port is ``9200``.
-    * ``spark.datasource.flint.scheme`` [Optional]
-        * Default value for scheme is ``http``.
-    * ``spark.datasource.flint.auth`` [Optional]
-        * Default value for auth is ``false``.
-    * ``spark.datasource.flint.region`` [Optional]
-        * Default value for auth is ``us-west-2``.
-
-Example spark dataSource configuration
-========================================
-
-AWSSigV4 Auth::
-
-    [{
-        "name" : "my_spark",
-        "connector": "spark",
-        "properties" : {
-                "spark.connector": "emr",
-                "emr.cluster" : "{{clusterId}}",
-                "emr.auth.type" : "awssigv4",
-                "emr.auth.region" : "us-east-1",
-                "emr.auth.access_key" : "{{accessKey}}"
-                "emr.auth.secret_key" : "{{secretKey}}"
-                "spark.datasource.flint.host" : "{{opensearchHost}}",
-                "spark.datasource.flint.port" : "{{opensearchPort}}",
-                "spark.datasource.flint.scheme" : "{{opensearchScheme}}",
-                "spark.datasource.flint.auth" : "{{opensearchAuth}}",
-                "spark.datasource.flint.region" : "{{opensearchRegion}}",
-        }
-    }]
-
-
-Spark SQL Support
-==================
-
-`sql` Function
-----------------------------
-Spark connector offers `sql` function. This function can be used to run spark sql query.
-The function takes spark sql query as input. Argument should be either passed by name or position.
-`source=my_spark.sql('select 1')`
-or
-`source=my_spark.sql(query='select 1')`
-Example::
-
-    > source=my_spark.sql('select 1')
-    +---+
-    | 1 |
-    |---+
-    | 1 |
-    +---+
-
diff --git a/plugin/build.gradle b/plugin/build.gradle
index 639ff88ef8..4be19a89e9 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -160,7 +160,6 @@ dependencies {
     api project(':opensearch')
     api project(':prometheus')
     api project(':datasources')
-    api project(':spark')
    api project(':async-query')
 
     testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.14.9'
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index 766edc42c0..21b747e9e2 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -98,7 +98,6 @@
 import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler;
 import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
 import org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser;
-import org.opensearch.sql.spark.storage.SparkStorageFactory;
 import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
 import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
 import org.opensearch.sql.spark.transport.TransportGetAsyncQueryResultAction;
@@ -325,7 +324,6 @@ private DataSourceServiceImpl createDataSourceService() {
             new OpenSearchDataSourceFactory(
                 new OpenSearchNodeClient(this.client), pluginSettings))
         .add(new PrometheusStorageFactory(pluginSettings))
-        .add(new SparkStorageFactory(this.client, pluginSettings))
         .add(new GlueDataSourceFactory(pluginSettings))
         .add(new SecurityLakeDataSourceFactory(pluginSettings))
         .build(),
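The createDataSourceService() hunk is where a connector actually enters or leaves the engine: each connector contributes one DataSourceFactory to the builder chain, and deleting the SparkStorageFactory line is what retires the Spark connector at runtime. A hypothetical factory mirroring the shape of the deleted SparkStorageFactory further down in this patch (the class name and engine wiring are placeholders, not a real connector)::

    import java.util.Map;
    import org.opensearch.sql.datasource.model.DataSource;
    import org.opensearch.sql.datasource.model.DataSourceMetadata;
    import org.opensearch.sql.datasource.model.DataSourceType;
    import org.opensearch.sql.storage.DataSourceFactory;
    import org.opensearch.sql.storage.StorageEngine;

    // Sketch: a factory names its type and turns datasource metadata into a DataSource.
    public class ExampleStorageFactory implements DataSourceFactory {
      @Override
      public DataSourceType getDataSourceType() {
        return DataSourceType.S3GLUE; // placeholder; a real connector registers its own type
      }

      @Override
      public DataSource createDataSource(DataSourceMetadata metadata) {
        return new DataSource(
            metadata.getName(), getDataSourceType(), getStorageEngine(metadata.getProperties()));
      }

      private StorageEngine getStorageEngine(Map<String, String> properties) {
        throw new UnsupportedOperationException("connector-specific wiring goes here");
      }
    }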
diff --git a/settings.gradle b/settings.gradle
index 9cf1715335..ba38e3aa42 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,7 +18,6 @@ include 'sql'
 include 'prometheus'
 include 'benchmarks'
 include 'datasources'
-include 'spark'
 include 'async-query-core'
 include 'async-query'
 
diff --git a/spark/build.gradle b/spark/build.gradle
index deae891166..e69de29bb2 100644
--- a/spark/build.gradle
+++ b/spark/build.gradle
@@ -1,88 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-plugins {
-    id 'java-library'
-    id "io.freefair.lombok"
-    id 'jacoco'
-}
-
-repositories {
-    mavenCentral()
-}
-
-dependencies {
-    api project(':core')
-    implementation project(':datasources')
-
-    implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
-    implementation group: 'org.json', name: 'json', version: '20231013'
-    api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: "${aws_java_sdk_version}"
-    api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: "${aws_java_sdk_version}"
-    implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"
-
-    testImplementation(platform("org.junit:junit-bom:5.9.3"))
-
-    testCompileOnly('org.junit.jupiter:junit-jupiter')
-    testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
-    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
-
-    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
-        exclude group: 'org.hamcrest', module: 'hamcrest-core'
-    }
-    testRuntimeOnly("org.junit.platform:junit-platform-launcher") {
-        because 'allows tests to run
from IDEs that bundle older version of launcher' - } -} - -test { - useJUnitPlatform { - includeEngines("junit-jupiter") - } - testLogging { - events "failed" - exceptionFormat "full" - } -} - -jacocoTestReport { - dependsOn test - executionData test - reports { - html.required = true - xml.required = true - } - afterEvaluate { - classDirectories.setFrom(files(classDirectories.files.collect { - })) - } -} - -jacocoTestCoverageVerification { - dependsOn test - executionData test - violationRules { - rule { - element = 'CLASS' - excludes = [ - 'org.opensearch.sql.spark.data.constants.*', - ] - limit { - counter = 'LINE' - minimum = 1.0 - } - limit { - counter = 'BRANCH' - minimum = 1.0 - } - } - } - afterEvaluate { - classDirectories.setFrom(files(classDirectories.files.collect { - })) - } -} -check.dependsOn jacocoTestCoverageVerification -jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java deleted file mode 100644 index 7b7fa1eadf..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.client; - -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; - -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; -import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure; -import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; -import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; -import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest; -import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig; -import com.amazonaws.services.elasticmapreduce.model.StepConfig; -import com.amazonaws.services.elasticmapreduce.model.StepStatus; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import lombok.SneakyThrows; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; -import org.opensearch.sql.spark.helper.FlintHelper; -import org.opensearch.sql.spark.response.SparkResponse; - -public class EmrClientImpl implements SparkClient { - // EMR-S will download JAR to local maven - public static final String SPARK_SQL_APPLICATION_JAR = - "file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar"; - - private final AmazonElasticMapReduce emr; - private final String emrCluster; - private final FlintHelper flint; - private final String sparkApplicationJar; - private static final Logger logger = LogManager.getLogger(EmrClientImpl.class); - private final SparkResponse sparkResponse; - - /** - * Constructor for EMR Client Implementation. - * - * @param emr EMR helper - * @param flint Opensearch args for flint integration jar - * @param sparkResponse Response object to help with retrieving results from Opensearch index - */ - public EmrClientImpl( - AmazonElasticMapReduce emr, - String emrCluster, - FlintHelper flint, - SparkResponse sparkResponse, - String sparkApplicationJar) { - this.emr = emr; - this.emrCluster = emrCluster; - this.flint = flint; - this.sparkResponse = sparkResponse; - this.sparkApplicationJar = - sparkApplicationJar == null ? 
SPARK_SQL_APPLICATION_JAR : sparkApplicationJar; - } - - @Override - public JSONObject sql(String query) throws IOException { - runEmrApplication(query); - return sparkResponse.getResultFromOpensearchIndex(); - } - - @VisibleForTesting - void runEmrApplication(String query) { - - HadoopJarStepConfig stepConfig = - new HadoopJarStepConfig() - .withJar("command-runner.jar") - .withArgs( - "spark-submit", - "--class", - "org.opensearch.sql.SQLJob", - "--jars", - flint.getFlintIntegrationJar(), - sparkApplicationJar, - query, - DEFAULT_RESULT_INDEX, - flint.getFlintHost(), - flint.getFlintPort(), - flint.getFlintScheme(), - flint.getFlintAuth(), - flint.getFlintRegion()); - - StepConfig emrstep = - new StepConfig() - .withName("Spark Application") - .withActionOnFailure(ActionOnFailure.CONTINUE) - .withHadoopJarStep(stepConfig); - - AddJobFlowStepsRequest request = - new AddJobFlowStepsRequest().withJobFlowId(emrCluster).withSteps(emrstep); - - AddJobFlowStepsResult result = emr.addJobFlowSteps(request); - logger.info("EMR step ID: " + result.getStepIds()); - - String stepId = result.getStepIds().get(0); - DescribeStepRequest stepRequest = - new DescribeStepRequest().withClusterId(emrCluster).withStepId(stepId); - - waitForStepExecution(stepRequest); - sparkResponse.setValue(stepId); - } - - @SneakyThrows - private void waitForStepExecution(DescribeStepRequest stepRequest) { - // Wait for the step to complete - boolean completed = false; - while (!completed) { - // Get the step status - StepStatus statusDetail = emr.describeStep(stepRequest).getStep().getStatus(); - // Check if the step has completed - if (statusDetail.getState().equals("COMPLETED")) { - completed = true; - logger.info("EMR step completed successfully."); - } else if (statusDetail.getState().equals("FAILED") - || statusDetail.getState().equals("CANCELLED")) { - logger.error("EMR step failed or cancelled."); - throw new RuntimeException("Spark SQL application failed."); - } else { - // Sleep for some time before checking the status again - Thread.sleep(2500); - } - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java deleted file mode 100644 index b38f04680b..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.client; - -import java.io.IOException; -import org.json.JSONObject; - -/** Interface class for Spark Client. */ -public interface SparkClient { - /** - * This method executes spark sql query. 
- * - * @param query spark sql query - * @return spark query response - */ - JSONObject sql(String query) throws IOException; -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java b/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java deleted file mode 100644 index 914aa80085..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.implementation; - -import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY; - -import java.util.List; -import java.util.stream.Collectors; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.FunctionExpression; -import org.opensearch.sql.expression.NamedArgumentExpression; -import org.opensearch.sql.expression.env.Environment; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.TableFunctionImplementation; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; -import org.opensearch.sql.storage.Table; - -/** Spark SQL function implementation. */ -public class SparkSqlFunctionImplementation extends FunctionExpression - implements TableFunctionImplementation { - - private final FunctionName functionName; - private final List arguments; - private final SparkClient sparkClient; - - /** - * Constructor for spark sql function. - * - * @param functionName name of the function - * @param arguments a list of expressions - * @param sparkClient spark client - */ - public SparkSqlFunctionImplementation( - FunctionName functionName, List arguments, SparkClient sparkClient) { - super(functionName, arguments); - this.functionName = functionName; - this.arguments = arguments; - this.sparkClient = sparkClient; - } - - @Override - public ExprValue valueOf(Environment valueEnv) { - throw new UnsupportedOperationException( - String.format( - "Spark defined function [%s] is only " - + "supported in SOURCE clause with spark connector catalog", - functionName)); - } - - @Override - public ExprType type() { - return ExprCoreType.STRUCT; - } - - @Override - public String toString() { - List args = - arguments.stream() - .map( - arg -> - String.format( - "%s=%s", - ((NamedArgumentExpression) arg).getArgName(), - ((NamedArgumentExpression) arg).getValue().toString())) - .collect(Collectors.toList()); - return String.format("%s(%s)", functionName, String.join(", ", args)); - } - - @Override - public Table applyArguments() { - return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments)); - } - - /** - * This method builds a spark query request. 
- * - * @param arguments spark sql function arguments - * @return spark query request - */ - private SparkQueryRequest buildQueryFromSqlFunction(List arguments) { - - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - arguments.forEach( - arg -> { - String argName = ((NamedArgumentExpression) arg).getArgName(); - Expression argValue = ((NamedArgumentExpression) arg).getValue(); - ExprValue literalValue = argValue.valueOf(); - if (argName.equals(QUERY)) { - sparkQueryRequest.setSql((String) literalValue.value()); - } else { - throw new ExpressionEvaluationException( - String.format("Invalid Function Argument:%s", argName)); - } - }); - return sparkQueryRequest; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java b/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java deleted file mode 100644 index a4f2a6c0fe..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.resolver; - -import static org.opensearch.sql.data.type.ExprCoreType.STRING; - -import java.util.ArrayList; -import java.util.List; -import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.NamedArgumentExpression; -import org.opensearch.sql.expression.function.FunctionBuilder; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.expression.function.FunctionSignature; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; - -/** Function resolver for sql function of spark connector. 
*/ -@RequiredArgsConstructor -public class SparkSqlTableFunctionResolver implements FunctionResolver { - private final SparkClient sparkClient; - - public static final String SQL = "sql"; - public static final String QUERY = "query"; - - @Override - public Pair resolve(FunctionSignature unresolvedSignature) { - FunctionName functionName = FunctionName.of(SQL); - FunctionSignature functionSignature = new FunctionSignature(functionName, List.of(STRING)); - final List argumentNames = List.of(QUERY); - - FunctionBuilder functionBuilder = - (functionProperties, arguments) -> { - Boolean argumentsPassedByName = - arguments.stream() - .noneMatch( - arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - Boolean argumentsPassedByPosition = - arguments.stream() - .allMatch( - arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - if (!(argumentsPassedByName || argumentsPassedByPosition)) { - throw new SemanticCheckException( - "Arguments should be either passed by name or position"); - } - - if (arguments.size() != argumentNames.size()) { - throw new SemanticCheckException( - String.format( - "Missing arguments:[%s]", - String.join( - ",", argumentNames.subList(arguments.size(), argumentNames.size())))); - } - - if (argumentsPassedByPosition) { - List namedArguments = new ArrayList<>(); - for (int i = 0; i < arguments.size(); i++) { - namedArguments.add( - new NamedArgumentExpression( - argumentNames.get(i), - ((NamedArgumentExpression) arguments.get(i)).getValue())); - } - return new SparkSqlFunctionImplementation(functionName, namedArguments, sparkClient); - } - return new SparkSqlFunctionImplementation(functionName, arguments, sparkClient); - }; - return Pair.of(functionSignature, functionBuilder); - } - - @Override - public FunctionName getFunctionName() { - return FunctionName.of(SQL); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java deleted file mode 100644 index aea8f72f36..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.scan; - -import lombok.AllArgsConstructor; -import org.opensearch.sql.planner.logical.LogicalProject; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.read.TableScanBuilder; - -/** TableScanBuilder for sql function of spark connector. 
*/ -@AllArgsConstructor -public class SparkSqlFunctionTableScanBuilder extends TableScanBuilder { - - private final SparkClient sparkClient; - - private final SparkQueryRequest sparkQueryRequest; - - @Override - public TableScanOperator build() { - return new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - } - - @Override - public boolean pushDownProject(LogicalProject project) { - return true; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java b/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java deleted file mode 100644 index a2e44affd5..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanOperator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions.scan; - -import java.io.IOException; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Locale; -import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.response.DefaultSparkSqlFunctionResponseHandle; -import org.opensearch.sql.spark.functions.response.SparkSqlFunctionResponseHandle; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -/** This a table scan operator to handle sql table function. */ -@RequiredArgsConstructor -public class SparkSqlFunctionTableScanOperator extends TableScanOperator { - private final SparkClient sparkClient; - private final SparkQueryRequest request; - private SparkSqlFunctionResponseHandle sparkResponseHandle; - private static final Logger LOG = LogManager.getLogger(); - - @Override - public void open() { - super.open(); - this.sparkResponseHandle = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - JSONObject responseObject = sparkClient.sql(request.getSql()); - return new DefaultSparkSqlFunctionResponseHandle(responseObject); - } catch (IOException e) { - LOG.error(e.getMessage()); - throw new RuntimeException( - String.format("Error fetching data from spark server: %s", e.getMessage())); - } - }); - } - - @Override - public boolean hasNext() { - return this.sparkResponseHandle.hasNext(); - } - - @Override - public ExprValue next() { - return this.sparkResponseHandle.next(); - } - - @Override - public String explain() { - return String.format(Locale.ROOT, "sql(%s)", request.getSql()); - } - - @Override - public ExecutionEngine.Schema schema() { - return this.sparkResponseHandle.schema(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/helper/FlintHelper.java b/spark/src/main/java/org/opensearch/sql/spark/helper/FlintHelper.java deleted file mode 100644 index 206ff4aed4..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/helper/FlintHelper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.helper; - -import lombok.Getter; - -public class FlintHelper { - // TODO should be replaced with mvn jar. 
- public static final String FLINT_INTEGRATION_JAR = - "s3://spark-datasource/flint-spark-integration-assembly-0.3.0-SNAPSHOT.jar"; - public static final String FLINT_DEFAULT_HOST = "localhost"; - public static final String FLINT_DEFAULT_PORT = "9200"; - public static final String FLINT_DEFAULT_SCHEME = "http"; - public static final String FLINT_DEFAULT_AUTH = "noauth"; - public static final String FLINT_DEFAULT_REGION = "us-west-2"; - - @Getter private final String flintIntegrationJar; - @Getter private final String flintHost; - @Getter private final String flintPort; - @Getter private final String flintScheme; - @Getter private final String flintAuth; - @Getter private final String flintRegion; - - /** - * Arguments required to write data to opensearch index using flint integration. - * - * @param flintHost Opensearch host for flint - * @param flintPort Opensearch port for flint integration - * @param flintScheme Opensearch scheme for flint integration - * @param flintAuth Opensearch auth for flint integration - * @param flintRegion Opensearch region for flint integration - */ - public FlintHelper( - String flintIntegrationJar, - String flintHost, - String flintPort, - String flintScheme, - String flintAuth, - String flintRegion) { - this.flintIntegrationJar = - flintIntegrationJar == null ? FLINT_INTEGRATION_JAR : flintIntegrationJar; - this.flintHost = flintHost != null ? flintHost : FLINT_DEFAULT_HOST; - this.flintPort = flintPort != null ? flintPort : FLINT_DEFAULT_PORT; - this.flintScheme = flintScheme != null ? flintScheme : FLINT_DEFAULT_SCHEME; - this.flintAuth = flintAuth != null ? flintAuth : FLINT_DEFAULT_AUTH; - this.flintRegion = flintRegion != null ? flintRegion : FLINT_DEFAULT_REGION; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java deleted file mode 100644 index 94c9795161..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.request; - -import lombok.Data; - -/** Spark query request. */ -@Data -public class SparkQueryRequest { - - /** SQL. 
*/ - private String sql; -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java b/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java deleted file mode 100644 index e225804043..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.response; - -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; - -import com.google.common.annotations.VisibleForTesting; -import lombok.Data; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; -import org.opensearch.ResourceNotFoundException; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; -import org.opensearch.common.action.ActionFuture; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; - -@Data -public class SparkResponse { - private final Client client; - private String value; - private final String field; - private static final Logger LOG = LogManager.getLogger(); - - /** - * Response for spark sql query. - * - * @param client Opensearch client - * @param value Identifier field value - * @param field Identifier field name - */ - public SparkResponse(Client client, String value, String field) { - this.client = client; - this.value = value; - this.field = field; - } - - public JSONObject getResultFromOpensearchIndex() { - return searchInSparkIndex(QueryBuilders.termQuery(field, value)); - } - - private JSONObject searchInSparkIndex(QueryBuilder query) { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DEFAULT_RESULT_INDEX); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchRequest.source(searchSourceBuilder); - ActionFuture searchResponseActionFuture; - try { - searchResponseActionFuture = client.search(searchRequest); - } catch (Exception e) { - throw new RuntimeException(e); - } - SearchResponse searchResponse = searchResponseActionFuture.actionGet(); - if (searchResponse.status().getStatus() != 200) { - throw new RuntimeException( - "Fetching result from " - + DEFAULT_RESULT_INDEX - + " index failed with status : " - + searchResponse.status()); - } else { - JSONObject data = new JSONObject(); - for (SearchHit searchHit : searchResponse.getHits().getHits()) { - data.put("data", searchHit.getSourceAsMap()); - deleteInSparkIndex(searchHit.getId()); - } - return data; - } - } - - @VisibleForTesting - void deleteInSparkIndex(String id) { - DeleteRequest deleteRequest = new DeleteRequest(DEFAULT_RESULT_INDEX); - deleteRequest.id(id); - ActionFuture deleteResponseActionFuture; - try { - deleteResponseActionFuture = client.delete(deleteRequest); - } catch (Exception e) { - throw new RuntimeException(e); - } - DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet(); - if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) { - LOG.debug("Spark result successfully deleted ", id); - } else if 
(deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) { - throw new ResourceNotFoundException("Spark result with id " + id + " doesn't exist"); - } else { - throw new RuntimeException( - "Deleting spark result information failed with : " - + deleteResponse.getResult().getLowercase()); - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java deleted file mode 100644 index 395e1685a6..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkScan.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -/** Spark scan operator. */ -@EqualsAndHashCode(onlyExplicitlyIncluded = true, callSuper = false) -@ToString(onlyExplicitlyIncluded = true) -public class SparkScan extends TableScanOperator { - - private final SparkClient sparkClient; - - @EqualsAndHashCode.Include @Getter @Setter @ToString.Include private SparkQueryRequest request; - - /** - * Constructor. - * - * @param sparkClient sparkClient. - */ - public SparkScan(SparkClient sparkClient) { - this.sparkClient = sparkClient; - this.request = new SparkQueryRequest(); - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public ExprValue next() { - return null; - } - - @Override - public String explain() { - return getRequest().toString(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java deleted file mode 100644 index 84c9c05e79..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import java.util.Collection; -import java.util.Collections; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; - -/** Spark storage engine implementation. 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java deleted file mode 100644 index 84c9c05e79..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import java.util.Collection; -import java.util.Collections; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; - -/** Spark storage engine implementation. */ -@RequiredArgsConstructor -public class SparkStorageEngine implements StorageEngine { - private final SparkClient sparkClient; - - @Override - public Collection<FunctionResolver> getFunctions() { - return Collections.singletonList(new SparkSqlTableFunctionResolver(sparkClient)); - } - - @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { - throw new RuntimeException("Unable to get table from storage engine."); - } -}
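The storage engine deleted above exposed no browsable tables; its single entry point was the sql table function resolver, and getTable() always threw. A minimal sketch (sparkClient is illustrative):

    SparkStorageEngine engine = new SparkStorageEngine(sparkClient);
    // Exactly one resolver is registered: SparkSqlTableFunctionResolver for sql(query).
    FunctionResolver resolver = engine.getFunctions().iterator().next();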
diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java deleted file mode 100644 index 4495eb0fac..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageFactory.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; -import java.security.AccessController; -import java.security.InvalidParameterException; -import java.security.PrivilegedAction; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.opensearch.client.Client; -import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.AuthenticationType; -import org.opensearch.sql.spark.client.EmrClientImpl; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.helper.FlintHelper; -import org.opensearch.sql.spark.response.SparkResponse; -import org.opensearch.sql.storage.DataSourceFactory; -import org.opensearch.sql.storage.StorageEngine; - -/** Storage factory implementation for spark connector. */ -@RequiredArgsConstructor -public class SparkStorageFactory implements DataSourceFactory { - private final Client client; - private final Settings settings; - - public static final String EMR = "emr"; - public static final String STEP_ID_FIELD = "stepId.keyword"; - // Spark datasource configuration properties - public static final String CONNECTOR_TYPE = "spark.connector"; - public static final String SPARK_SQL_APPLICATION = "spark.sql.application"; - - // EMR configuration properties - public static final String EMR_CLUSTER = "emr.cluster"; - public static final String EMR_AUTH_TYPE = "emr.auth.type"; - public static final String EMR_REGION = "emr.auth.region"; - public static final String EMR_ACCESS_KEY = "emr.auth.access_key"; - public static final String EMR_SECRET_KEY = "emr.auth.secret_key"; - - // Flint integration jar configuration properties - public static final String FLINT_INTEGRATION = "spark.datasource.flint.integration"; - public static final String FLINT_HOST = "spark.datasource.flint.host"; - public static final String FLINT_PORT = "spark.datasource.flint.port"; - public static final String FLINT_SCHEME = "spark.datasource.flint.scheme"; - public static final String FLINT_AUTH = "spark.datasource.flint.auth"; - public static final String FLINT_REGION = "spark.datasource.flint.region"; - - @Override - public DataSourceType getDataSourceType() { - return DataSourceType.SPARK; - } - - @Override - public DataSource createDataSource(DataSourceMetadata metadata) { - return new DataSource( - metadata.getName(), DataSourceType.SPARK, getStorageEngine(metadata.getProperties())); - } - - /** - * This function gets spark storage engine. - * - * @param requiredConfig spark config options - * @return spark storage engine object - */ - StorageEngine getStorageEngine(Map<String, String> requiredConfig) { - SparkClient sparkClient; - if (requiredConfig.get(CONNECTOR_TYPE).equals(EMR)) { - sparkClient = - AccessController.doPrivileged( - (PrivilegedAction<SparkClient>) - () -> { - validateEMRConfigProperties(requiredConfig); - return new EmrClientImpl( - getEMRClient( - requiredConfig.get(EMR_ACCESS_KEY), - requiredConfig.get(EMR_SECRET_KEY), - requiredConfig.get(EMR_REGION)), - requiredConfig.get(EMR_CLUSTER), - new FlintHelper( - requiredConfig.get(FLINT_INTEGRATION), - requiredConfig.get(FLINT_HOST), - requiredConfig.get(FLINT_PORT), - requiredConfig.get(FLINT_SCHEME), - requiredConfig.get(FLINT_AUTH), - requiredConfig.get(FLINT_REGION)), - new SparkResponse(client, null, STEP_ID_FIELD), - requiredConfig.get(SPARK_SQL_APPLICATION)); - }); - } else { - throw new InvalidParameterException("Spark connector type is invalid."); - } - return new SparkStorageEngine(sparkClient); - } - - private void validateEMRConfigProperties(Map<String, String> dataSourceMetadataConfig) - throws IllegalArgumentException { - if (dataSourceMetadataConfig.get(EMR_CLUSTER) == null - || dataSourceMetadataConfig.get(EMR_AUTH_TYPE) == null) { - throw new IllegalArgumentException("EMR config properties are missing."); - } else if (dataSourceMetadataConfig - .get(EMR_AUTH_TYPE) - .equals(AuthenticationType.AWSSIGV4AUTH.getName()) - && (dataSourceMetadataConfig.get(EMR_ACCESS_KEY) == null - || dataSourceMetadataConfig.get(EMR_SECRET_KEY) == null)) { - throw new IllegalArgumentException("EMR auth keys are missing."); - } else if (!dataSourceMetadataConfig - .get(EMR_AUTH_TYPE) - .equals(AuthenticationType.AWSSIGV4AUTH.getName())) { - throw new IllegalArgumentException("Invalid auth type."); - } - } - - private AmazonElasticMapReduce getEMRClient( - String emrAccessKey, String emrSecretKey, String emrRegion) { - return AmazonElasticMapReduceClientBuilder.standard() - .withCredentials( - new AWSStaticCredentialsProvider(new BasicAWSCredentials(emrAccessKey, emrSecretKey))) - .withRegion(emrRegion) - .build(); - } -}
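The factory deleted above only accepted the EMR connector type and validated awssigv4 credentials before assembling the client chain (EmrClientImpl wrapping FlintHelper and SparkResponse). A minimal configuration sketch mirroring its constants; all values are placeholders:

    Map<String, String> properties = Map.of(
        "spark.connector", "emr",
        "emr.cluster", "j-123456789",
        "emr.auth.type", "awssigv4",
        "emr.auth.access_key", "access_key",
        "emr.auth.secret_key", "secret_key",
        "emr.auth.region", "us-east-1");
    DataSourceMetadata metadata =
        new DataSourceMetadata.Builder()
            .setName("spark")
            .setConnector(DataSourceType.SPARK)
            .setProperties(properties)
            .build();
    DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata);

Omitting emr.cluster or the auth keys fails fast with IllegalArgumentException, and any connector type other than "emr" raises InvalidParameterException, as the factory tests below exercise.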
diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java deleted file mode 100644 index 731c3df672..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkTable.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import java.util.HashMap; -import java.util.Map; -import lombok.Getter; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.planner.DefaultImplementor; -import org.opensearch.sql.planner.logical.LogicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.read.TableScanBuilder; - -/** Spark table implementation. This can be constructed from SparkQueryRequest. */ -public class SparkTable implements Table { - - private final SparkClient sparkClient; - - @Getter private final SparkQueryRequest sparkQueryRequest; - - /** Constructor for entire Sql Request. */ - public SparkTable(SparkClient sparkService, SparkQueryRequest sparkQueryRequest) { - this.sparkClient = sparkService; - this.sparkQueryRequest = sparkQueryRequest; - } - - @Override - public boolean exists() { - throw new UnsupportedOperationException( - "Exists operation is not supported in spark datasource"); - } - - @Override - public void create(Map<String, ExprType> schema) { - throw new UnsupportedOperationException( - "Create operation is not supported in spark datasource"); - } - - @Override - public Map<String, ExprType> getFieldTypes() { - return new HashMap<>(); - } - - @Override - public PhysicalPlan implement(LogicalPlan plan) { - SparkScan metricScan = new SparkScan(sparkClient); - metricScan.setRequest(sparkQueryRequest); - return plan.accept(new DefaultImplementor<SparkScan>(), metricScan); - } - - @Override - public TableScanBuilder createScanBuilder() { - return new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrClientImplTest.java deleted file mode 100644 index 93dc0d6bc8..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrClientImplTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.client; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; -import static org.opensearch.sql.spark.utils.TestUtils.getJson; - -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; -import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; -import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult; -import com.amazonaws.services.elasticmapreduce.model.Step;
-import com.amazonaws.services.elasticmapreduce.model.StepStatus; -import lombok.SneakyThrows; -import org.json.JSONObject; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.spark.helper.FlintHelper; -import org.opensearch.sql.spark.response.SparkResponse; - -@ExtendWith(MockitoExtension.class) -public class EmrClientImplTest { - - @Mock private AmazonElasticMapReduce emr; - @Mock private FlintHelper flint; - @Mock private SparkResponse sparkResponse; - - @Test - @SneakyThrows - void testRunEmrApplication() { - AddJobFlowStepsResult addStepsResult = new AddJobFlowStepsResult().withStepIds(EMR_CLUSTER_ID); - when(emr.addJobFlowSteps(any())).thenReturn(addStepsResult); - - StepStatus stepStatus = new StepStatus(); - stepStatus.setState("COMPLETED"); - Step step = new Step(); - step.setStatus(stepStatus); - DescribeStepResult describeStepResult = new DescribeStepResult(); - describeStepResult.setStep(step); - when(emr.describeStep(any())).thenReturn(describeStepResult); - - EmrClientImpl emrClientImpl = - new EmrClientImpl(emr, EMR_CLUSTER_ID, flint, sparkResponse, null); - emrClientImpl.runEmrApplication(QUERY); - } - - @Test - @SneakyThrows - void testRunEmrApplicationFailed() { - AddJobFlowStepsResult addStepsResult = new AddJobFlowStepsResult().withStepIds(EMR_CLUSTER_ID); - when(emr.addJobFlowSteps(any())).thenReturn(addStepsResult); - - StepStatus stepStatus = new StepStatus(); - stepStatus.setState("FAILED"); - Step step = new Step(); - step.setStatus(stepStatus); - DescribeStepResult describeStepResult = new DescribeStepResult(); - describeStepResult.setStep(step); - when(emr.describeStep(any())).thenReturn(describeStepResult); - - EmrClientImpl emrClientImpl = - new EmrClientImpl(emr, EMR_CLUSTER_ID, flint, sparkResponse, null); - RuntimeException exception = - Assertions.assertThrows( - RuntimeException.class, () -> emrClientImpl.runEmrApplication(QUERY)); - Assertions.assertEquals("Spark SQL application failed.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testRunEmrApplicationCancelled() { - AddJobFlowStepsResult addStepsResult = new AddJobFlowStepsResult().withStepIds(EMR_CLUSTER_ID); - when(emr.addJobFlowSteps(any())).thenReturn(addStepsResult); - - StepStatus stepStatus = new StepStatus(); - stepStatus.setState("CANCELLED"); - Step step = new Step(); - step.setStatus(stepStatus); - DescribeStepResult describeStepResult = new DescribeStepResult(); - describeStepResult.setStep(step); - when(emr.describeStep(any())).thenReturn(describeStepResult); - - EmrClientImpl emrClientImpl = - new EmrClientImpl(emr, EMR_CLUSTER_ID, flint, sparkResponse, null); - RuntimeException exception = - Assertions.assertThrows( - RuntimeException.class, () -> emrClientImpl.runEmrApplication(QUERY)); - Assertions.assertEquals("Spark SQL application failed.", exception.getMessage()); - } - - @Test - @SneakyThrows - void testRunEmrApplicationRunnning() { - AddJobFlowStepsResult addStepsResult = new AddJobFlowStepsResult().withStepIds(EMR_CLUSTER_ID); - when(emr.addJobFlowSteps(any())).thenReturn(addStepsResult); - - StepStatus runningStatus = new StepStatus(); - runningStatus.setState("RUNNING"); - Step runningStep = new Step(); - runningStep.setStatus(runningStatus); - DescribeStepResult runningDescribeStepResult = new DescribeStepResult(); - runningDescribeStepResult.setStep(runningStep); - - StepStatus 
completedStatus = new StepStatus(); - completedStatus.setState("COMPLETED"); - Step completedStep = new Step(); - completedStep.setStatus(completedStatus); - DescribeStepResult completedDescribeStepResult = new DescribeStepResult(); - completedDescribeStepResult.setStep(completedStep); - - when(emr.describeStep(any())) - .thenReturn(runningDescribeStepResult) - .thenReturn(completedDescribeStepResult); - - EmrClientImpl emrClientImpl = - new EmrClientImpl(emr, EMR_CLUSTER_ID, flint, sparkResponse, null); - emrClientImpl.runEmrApplication(QUERY); - } - - @Test - @SneakyThrows - void testSql() { - AddJobFlowStepsResult addStepsResult = new AddJobFlowStepsResult().withStepIds(EMR_CLUSTER_ID); - when(emr.addJobFlowSteps(any())).thenReturn(addStepsResult); - - StepStatus runningStatus = new StepStatus(); - runningStatus.setState("RUNNING"); - Step runningStep = new Step(); - runningStep.setStatus(runningStatus); - DescribeStepResult runningDescribeStepResult = new DescribeStepResult(); - runningDescribeStepResult.setStep(runningStep); - - StepStatus completedStatus = new StepStatus(); - completedStatus.setState("COMPLETED"); - Step completedStep = new Step(); - completedStep.setStatus(completedStatus); - DescribeStepResult completedDescribeStepResult = new DescribeStepResult(); - completedDescribeStepResult.setStep(completedStep); - - when(emr.describeStep(any())) - .thenReturn(runningDescribeStepResult) - .thenReturn(completedDescribeStepResult); - when(sparkResponse.getResultFromOpensearchIndex()) - .thenReturn(new JSONObject(getJson("select_query_response.json"))); - - EmrClientImpl emrClientImpl = - new EmrClientImpl(emr, EMR_CLUSTER_ID, flint, sparkResponse, null); - emrClientImpl.sql(QUERY); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java b/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java deleted file mode 100644 index 09a3163d98..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.constants; - -public class TestConstants { - public static final String QUERY = "select 1"; - public static final String TEST_DATASOURCE_NAME = "test_datasource_name"; - public static final String EMR_CLUSTER_ID = "j-123456789"; - public static final String EMR_JOB_ID = "job-123xxx"; - public static final String EMRS_APPLICATION_ID = "app-xxxxx"; - public static final String EMRS_EXECUTION_ROLE = "execution_role"; - public static final String EMRS_DATASOURCE_ROLE = "datasource_role"; - public static final String EMRS_JOB_NAME = "job_name"; - public static final String SPARK_SUBMIT_PARAMETERS = "--conf org.flint.sql.SQLJob"; - public static final String TEST_CLUSTER_NAME = "TEST_CLUSTER"; - public static final String MOCK_SESSION_ID = "s-0123456"; - public static final String MOCK_STATEMENT_ID = "st-0123456"; - public static final String ENTRY_POINT_START_JAR = - "file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar"; - public static final String DEFAULT_RESULT_INDEX = "query_execution_result_ds1"; - public static final String US_EAST_REGION = "us-east-1"; - public static final String US_WEST_REGION = "us-west-1"; -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java b/spark/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java deleted file mode 
100644 index ff6cee2a5e..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/data/type/SparkDataTypeTest.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.data.type; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.jupiter.api.Test; - -class SparkDataTypeTest { - @Test - public void testTypeName() { - SparkDataType sparkDataType = new SparkDataType("TYPE_NAME"); - - assertEquals("TYPE_NAME", sparkDataType.typeName()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java b/spark/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java deleted file mode 100644 index e58f240f5c..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/data/value/SparkExprValueTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.data.value; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; -import org.opensearch.sql.spark.data.type.SparkDataType; - -class SparkExprValueTest { - @Test - public void type() { - assertEquals( - new SparkDataType("char"), new SparkExprValue(new SparkDataType("char"), "str").type()); - } - - @Test - public void unsupportedCompare() { - SparkDataType type = new SparkDataType("char"); - - assertThrows( - UnsupportedOperationException.class, - () -> new SparkExprValue(type, "str").compare(new SparkExprValue(type, "str"))); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java deleted file mode 100644 index 120747e0d3..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionImplementationTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import java.util.List; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlFunctionImplementationTest { - @Mock private SparkClient client; - - @Test - void testValueOfAndTypeToString() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new 
SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - UnsupportedOperationException exception = - assertThrows( - UnsupportedOperationException.class, () -> sparkSqlFunctionImplementation.valueOf()); - assertEquals( - "Spark defined function [sql] is only " - + "supported in SOURCE clause with spark connector catalog", - exception.getMessage()); - assertEquals("sql(query=\"select 1\")", sparkSqlFunctionImplementation.toString()); - assertEquals(ExprCoreType.STRUCT, sparkSqlFunctionImplementation.type()); - } - - @Test - void testApplyArguments() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - SparkTable sparkTable = (SparkTable) sparkSqlFunctionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testApplyArgumentsException() { - FunctionName functionName = new FunctionName("sql"); - List namedArgumentExpressionList = - List.of( - DSL.namedArgument("query", DSL.literal(QUERY)), - DSL.namedArgument("tmp", DSL.literal(12345))); - SparkSqlFunctionImplementation sparkSqlFunctionImplementation = - new SparkSqlFunctionImplementation(functionName, namedArgumentExpressionList, client); - ExpressionEvaluationException exception = - assertThrows( - ExpressionEvaluationException.class, - () -> sparkSqlFunctionImplementation.applyArguments()); - assertEquals("Invalid Function Argument:tmp", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java deleted file mode 100644 index 212056eb15..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanBuilderTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.opensearch.sql.planner.logical.LogicalProject; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.storage.TableScanOperator; - -public class SparkSqlFunctionTableScanBuilderTest { - @Mock private SparkClient sparkClient; - - @Mock private LogicalProject logicalProject; - - @Test - void testBuild() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder = - new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - TableScanOperator sqlFunctionTableScanOperator = sparkSqlFunctionTableScanBuilder.build(); - Assertions.assertTrue( - sqlFunctionTableScanOperator instanceof SparkSqlFunctionTableScanOperator); - } - - @Test - void 
testPushProject() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanBuilder sparkSqlFunctionTableScanBuilder = - new SparkSqlFunctionTableScanBuilder(sparkClient, sparkQueryRequest); - Assertions.assertTrue(sparkSqlFunctionTableScanBuilder.pushDownProject(logicalProject)); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java deleted file mode 100644 index d44e3d271a..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlFunctionTableScanOperatorTest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.data.model.ExprValueUtils.nullValue; -import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; -import static org.opensearch.sql.spark.utils.TestUtils.getJson; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import lombok.SneakyThrows; -import org.json.JSONArray; -import org.json.JSONObject; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.data.model.ExprBooleanValue; -import org.opensearch.sql.data.model.ExprByteValue; -import org.opensearch.sql.data.model.ExprDateValue; -import org.opensearch.sql.data.model.ExprDoubleValue; -import org.opensearch.sql.data.model.ExprFloatValue; -import org.opensearch.sql.data.model.ExprIntegerValue; -import org.opensearch.sql.data.model.ExprLongValue; -import org.opensearch.sql.data.model.ExprNullValue; -import org.opensearch.sql.data.model.ExprShortValue; -import org.opensearch.sql.data.model.ExprStringValue; -import org.opensearch.sql.data.model.ExprTimestampValue; -import org.opensearch.sql.data.model.ExprTupleValue; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.data.type.SparkDataType; -import org.opensearch.sql.spark.data.value.SparkExprValue; -import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator; -import org.opensearch.sql.spark.request.SparkQueryRequest; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlFunctionTableScanOperatorTest { - - @Mock private SparkClient sparkClient; - - @Test - @SneakyThrows - void testEmptyQueryWithException() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenThrow(new IOException("Error Message")); - RuntimeException runtimeException = - assertThrows(RuntimeException.class, 
sparkSqlFunctionTableScanOperator::open); - assertEquals( - "Error fetching data from spark server: Error Message", runtimeException.getMessage()); - } - - @Test - @SneakyThrows - void testClose() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - sparkSqlFunctionTableScanOperator.close(); - } - - @Test - @SneakyThrows - void testExplain() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - Assertions.assertEquals("sql(select 1)", sparkSqlFunctionTableScanOperator.explain()); - } - - @Test - @SneakyThrows - void testQueryResponseIterator() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("select_query_response.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - ExprTupleValue firstRow = - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("1", new ExprIntegerValue(1)); - } - }); - assertEquals(firstRow, sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - void testQueryResponseAllTypes() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("all_data_type.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - ExprTupleValue firstRow = - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("boolean", ExprBooleanValue.of(true)); - put("long", new ExprLongValue(922337203)); - put("integer", new ExprIntegerValue(2147483647)); - put("short", new ExprShortValue(32767)); - put("byte", new ExprByteValue(127)); - put("double", new ExprDoubleValue(9223372036854.775807)); - put("float", new ExprFloatValue(21474.83647)); - put("timestamp", new ExprDateValue("2023-07-01 10:31:30")); - put("date", new ExprTimestampValue("2023-07-01 10:31:30")); - put("string", new ExprStringValue("ABC")); - put("char", new SparkExprValue(new SparkDataType("char"), "A")); - } - }); - assertEquals(firstRow, sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - void testQueryResponseSparkDataType() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("spark_data_type.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put( - "struct_column", - new SparkExprValue( - new 
SparkDataType("struct"), - new JSONObject("{\"struct_value\":\"value\"}}").toMap())); - put( - "array_column", - new SparkExprValue( - new SparkDataType("array"), new JSONArray("[1,2]").toList())); - } - }), - sparkSqlFunctionTableScanOperator.next()); - } - - @Test - @SneakyThrows - void testQuerySchema() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("select_query_response.json"))); - sparkSqlFunctionTableScanOperator.open(); - ArrayList columns = new ArrayList<>(); - columns.add(new ExecutionEngine.Schema.Column("1", "1", ExprCoreType.INTEGER)); - ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); - assertEquals(expectedSchema, sparkSqlFunctionTableScanOperator.schema()); - } - - /** https://github.com/opensearch-project/sql/issues/2210. */ - @Test - @SneakyThrows - void issue2210() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())).thenReturn(new JSONObject(getJson("issue2210.json"))); - sparkSqlFunctionTableScanOperator.open(); - assertTrue(sparkSqlFunctionTableScanOperator.hasNext()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("day")); - put("data_type", stringValue("int")); - put("comment", nullValue()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("# Partition Information")); - put("data_type", stringValue("")); - put("comment", stringValue("")); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("# col_name")); - put("data_type", stringValue("data_type")); - put("comment", stringValue("comment")); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("col_name", stringValue("day")); - put("data_type", stringValue("int")); - put("comment", nullValue()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - Assertions.assertFalse(sparkSqlFunctionTableScanOperator.hasNext()); - } - - @Test - @SneakyThrows - public void issue2367MissingFields() { - SparkQueryRequest sparkQueryRequest = new SparkQueryRequest(); - sparkQueryRequest.setSql(QUERY); - - SparkSqlFunctionTableScanOperator sparkSqlFunctionTableScanOperator = - new SparkSqlFunctionTableScanOperator(sparkClient, sparkQueryRequest); - - when(sparkClient.sql(any())) - .thenReturn( - new JSONObject( - "{\n" - + " \"data\": {\n" - + " \"result\": [\n" - + " \"{}\",\n" - + " \"{'srcPort':20641}\"\n" - + " ],\n" - + " \"schema\": [\n" - + " \"{'column_name':'srcPort','data_type':'long'}\"\n" - + " ]\n" - + " }\n" - + "}")); - sparkSqlFunctionTableScanOperator.open(); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("srcPort", ExprNullValue.of()); - } - }), - sparkSqlFunctionTableScanOperator.next()); - assertEquals( - new ExprTupleValue( - new LinkedHashMap<>() { - { - put("srcPort", new ExprLongValue(20641L)); - } - }), - 
sparkSqlFunctionTableScanOperator.next()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java b/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java deleted file mode 100644 index a828ac76c4..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/functions/SparkSqlTableFunctionResolverTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.functions; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.sql.data.type.ExprCoreType.STRING; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import java.util.List; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.function.FunctionBuilder; -import org.opensearch.sql.expression.function.FunctionName; -import org.opensearch.sql.expression.function.FunctionProperties; -import org.opensearch.sql.expression.function.FunctionSignature; -import org.opensearch.sql.expression.function.TableFunctionImplementation; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; -import org.opensearch.sql.spark.request.SparkQueryRequest; -import org.opensearch.sql.spark.storage.SparkTable; - -@ExtendWith(MockitoExtension.class) -public class SparkSqlTableFunctionResolverTest { - @Mock private SparkClient client; - - @Mock private FunctionProperties functionProperties; - - @Test - void testResolve() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(DSL.namedArgument("query", DSL.literal(QUERY))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation = - (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); - SparkTable sparkTable = (SparkTable) functionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testArgumentsPassedByPosition() { - 
SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(DSL.namedArgument(null, DSL.literal(QUERY))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - FunctionBuilder functionBuilder = resolution.getValue(); - TableFunctionImplementation functionImplementation = - (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); - assertTrue(functionImplementation instanceof SparkSqlFunctionImplementation); - SparkTable sparkTable = (SparkTable) functionImplementation.applyArguments(); - assertNotNull(sparkTable.getSparkQueryRequest()); - SparkQueryRequest sparkQueryRequest = sparkTable.getSparkQueryRequest(); - assertEquals(QUERY, sparkQueryRequest.getSql()); - } - - @Test - void testMixedArgumentTypes() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = - List.of( - DSL.namedArgument("query", DSL.literal(QUERY)), - DSL.namedArgument(null, DSL.literal(12345))); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = - assertThrows( - SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Arguments should be either passed by name or position", exception.getMessage()); - } - - @Test - void testWrongArgumentsSizeWhenPassedByName() { - SparkSqlTableFunctionResolver sqlTableFunctionResolver = - new SparkSqlTableFunctionResolver(client); - FunctionName functionName = FunctionName.of("sql"); - List expressions = List.of(); - FunctionSignature functionSignature = - new FunctionSignature( - functionName, expressions.stream().map(Expression::type).collect(Collectors.toList())); - Pair resolution = - sqlTableFunctionResolver.resolve(functionSignature); - - assertEquals(functionName, resolution.getKey().getFunctionName()); - assertEquals(functionName, sqlTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING), resolution.getKey().getParamTypeList()); - SemanticCheckException exception = - assertThrows( - SemanticCheckException.class, - () -> resolution.getValue().apply(functionProperties, expressions)); - - assertEquals("Missing arguments:[query]", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java deleted file mode 100644 index bad26a2792..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 
OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.response; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; -import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; - -import java.util.Map; -import org.apache.lucene.search.TotalHits; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.ResourceNotFoundException; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; -import org.opensearch.common.action.ActionFuture; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.search.SearchHit; -import org.opensearch.search.SearchHits; - -@ExtendWith(MockitoExtension.class) -public class SparkResponseTest { - @Mock private Client client; - @Mock private SearchResponse searchResponse; - @Mock private DeleteResponse deleteResponse; - @Mock private SearchHit searchHit; - @Mock private ActionFuture searchResponseActionFuture; - @Mock private ActionFuture deleteResponseActionFuture; - - @Test - public void testGetResultFromOpensearchIndex() { - when(client.search(any())).thenReturn(searchResponseActionFuture); - when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); - when(searchResponse.status()).thenReturn(RestStatus.OK); - when(searchResponse.getHits()) - .thenReturn( - new SearchHits( - new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); - Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_CLUSTER_ID)); - - when(client.delete(any())).thenReturn(deleteResponseActionFuture); - when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); - when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); - - SparkResponse sparkResponse = new SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - assertFalse(sparkResponse.getResultFromOpensearchIndex().isEmpty()); - } - - @Test - public void testInvalidSearchResponse() { - when(client.search(any())).thenReturn(searchResponseActionFuture); - when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); - when(searchResponse.status()).thenReturn(RestStatus.NO_CONTENT); - - SparkResponse sparkResponse = new SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - RuntimeException exception = - assertThrows(RuntimeException.class, () -> sparkResponse.getResultFromOpensearchIndex()); - Assertions.assertEquals( - "Fetching result from " - + DEFAULT_RESULT_INDEX - + " index failed with status : " - + RestStatus.NO_CONTENT, - exception.getMessage()); - } - - @Test - public void testSearchFailure() { - when(client.search(any())).thenThrow(RuntimeException.class); - SparkResponse sparkResponse = new SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - assertThrows(RuntimeException.class, () -> sparkResponse.getResultFromOpensearchIndex()); - } - - @Test - public void testDeleteFailure() { - when(client.delete(any())).thenThrow(RuntimeException.class); - SparkResponse sparkResponse = new 
SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - assertThrows(RuntimeException.class, () -> sparkResponse.deleteInSparkIndex("id")); - } - - @Test - public void testNotFoundDeleteResponse() { - when(client.delete(any())).thenReturn(deleteResponseActionFuture); - when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); - when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); - - SparkResponse sparkResponse = new SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - RuntimeException exception = - assertThrows( - ResourceNotFoundException.class, () -> sparkResponse.deleteInSparkIndex("123")); - Assertions.assertEquals("Spark result with id 123 doesn't exist", exception.getMessage()); - } - - @Test - public void testInvalidDeleteResponse() { - when(client.delete(any())).thenReturn(deleteResponseActionFuture); - when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); - when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); - - SparkResponse sparkResponse = new SparkResponse(client, EMR_CLUSTER_ID, "stepId"); - RuntimeException exception = - assertThrows(RuntimeException.class, () -> sparkResponse.deleteInSparkIndex("123")); - Assertions.assertEquals( - "Deleting spark result information failed with : noop", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java deleted file mode 100644 index 971db3c33c..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkScanTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.opensearch.sql.spark.constants.TestConstants.QUERY; - -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.spark.client.SparkClient; - -@ExtendWith(MockitoExtension.class) -public class SparkScanTest { - @Mock private SparkClient sparkClient; - - @Test - @SneakyThrows - void testQueryResponseIteratorForQueryRangeFunction() { - SparkScan sparkScan = new SparkScan(sparkClient); - sparkScan.getRequest().setSql(QUERY); - Assertions.assertFalse(sparkScan.hasNext()); - assertNull(sparkScan.next()); - } - - @Test - @SneakyThrows - void testExplain() { - SparkScan sparkScan = new SparkScan(sparkClient); - sparkScan.getRequest().setSql(QUERY); - assertEquals("SparkQueryRequest(sql=select 1)", sparkScan.explain()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java deleted file mode 100644 index 5e7ec76cdb..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static 
org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Collection; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.DataSourceSchemaName; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.spark.client.SparkClient; -import org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver; - -@ExtendWith(MockitoExtension.class) -public class SparkStorageEngineTest { - @Mock private SparkClient client; - - @Test - public void getFunctions() { - SparkStorageEngine engine = new SparkStorageEngine(client); - Collection functionResolverCollection = engine.getFunctions(); - assertNotNull(functionResolverCollection); - assertEquals(1, functionResolverCollection.size()); - assertTrue( - functionResolverCollection.iterator().next() instanceof SparkSqlTableFunctionResolver); - } - - @Test - public void getTable() { - SparkStorageEngine engine = new SparkStorageEngine(client); - RuntimeException exception = - assertThrows( - RuntimeException.class, - () -> engine.getTable(new DataSourceSchemaName("spark", "default"), "")); - assertEquals("Unable to get table from storage engine.", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java deleted file mode 100644 index ebe3c8f3a9..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.storage; - -import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; - -import java.security.InvalidParameterException; -import java.util.HashMap; -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.client.Client; -import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.storage.StorageEngine; - -@ExtendWith(MockitoExtension.class) -public class SparkStorageFactoryTest { - @Mock private Settings settings; - - @Mock private Client client; - - @Test - void testGetConnectorType() { - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - Assertions.assertEquals(DataSourceType.SPARK, sparkStorageFactory.getDataSourceType()); - } - - @Test - @SneakyThrows - void testGetStorageEngine() { - HashMap properties = new HashMap<>(); - properties.put("spark.connector", "emr"); - properties.put("emr.cluster", EMR_CLUSTER_ID); - properties.put("emr.auth.type", "awssigv4"); - properties.put("emr.auth.access_key", "access_key"); - properties.put("emr.auth.secret_key", "secret_key"); - properties.put("emr.auth.region", "region"); - SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings); - StorageEngine storageEngine = sparkStorageFactory.getStorageEngine(properties); - Assertions.assertTrue(storageEngine instanceof SparkStorageEngine); - } - - @Test - 
-  @SneakyThrows
-  void testInvalidConnectorType() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "random");
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    InvalidParameterException exception =
-        Assertions.assertThrows(
-            InvalidParameterException.class,
-            () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("Spark connector type is invalid.", exception.getMessage());
-  }
-
-  @Test
-  @SneakyThrows
-  void testMissingAuth() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    IllegalArgumentException exception =
-        Assertions.assertThrows(
-            IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("EMR config properties are missing.", exception.getMessage());
-  }
-
-  @Test
-  @SneakyThrows
-  void testUnsupportedEmrAuth() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    properties.put("emr.auth.type", "basic");
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    IllegalArgumentException exception =
-        Assertions.assertThrows(
-            IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("Invalid auth type.", exception.getMessage());
-  }
-
-  @Test
-  @SneakyThrows
-  void testMissingCluster() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.auth.type", "awssigv4");
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    IllegalArgumentException exception =
-        Assertions.assertThrows(
-            IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("EMR config properties are missing.", exception.getMessage());
-  }
-
-  @Test
-  @SneakyThrows
-  void testMissingAuthKeys() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    properties.put("emr.auth.type", "awssigv4");
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    IllegalArgumentException exception =
-        Assertions.assertThrows(
-            IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("EMR auth keys are missing.", exception.getMessage());
-  }
-
-  @Test
-  @SneakyThrows
-  void testMissingAuthSecretKey() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    properties.put("emr.auth.type", "awssigv4");
-    properties.put("emr.auth.access_key", "test");
-    SparkStorageFactory sparkStorageFactory = new SparkStorageFactory(client, settings);
-    IllegalArgumentException exception =
-        Assertions.assertThrows(
-            IllegalArgumentException.class, () -> sparkStorageFactory.getStorageEngine(properties));
-    Assertions.assertEquals("EMR auth keys are missing.", exception.getMessage());
-  }
-
-  @Test
-  void testCreateDataSourceSuccess() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    properties.put("emr.auth.type", "awssigv4");
-    properties.put("emr.auth.access_key", "access_key");
-    properties.put("emr.auth.secret_key", "secret_key");
-    properties.put("emr.auth.region", "region");
-    properties.put("spark.datasource.flint.host", "localhost");
-    properties.put("spark.datasource.flint.port", "9200");
-    properties.put("spark.datasource.flint.scheme", "http");
-    properties.put("spark.datasource.flint.auth", "false");
-    properties.put("spark.datasource.flint.region", "us-west-2");
-
-    DataSourceMetadata metadata =
-        new DataSourceMetadata.Builder()
-            .setName("spark")
-            .setConnector(DataSourceType.SPARK)
-            .setProperties(properties)
-            .build();
-
-    DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata);
-    Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine);
-  }
-
-  @Test
-  void testSetSparkJars() {
-    HashMap<String, String> properties = new HashMap<>();
-    properties.put("spark.connector", "emr");
-    properties.put("spark.sql.application", "s3://spark/spark-sql-job.jar");
-    properties.put("emr.cluster", EMR_CLUSTER_ID);
-    properties.put("emr.auth.type", "awssigv4");
-    properties.put("emr.auth.access_key", "access_key");
-    properties.put("emr.auth.secret_key", "secret_key");
-    properties.put("emr.auth.region", "region");
-    properties.put("spark.datasource.flint.integration", "s3://spark/flint-spark-integration.jar");
-
-    DataSourceMetadata metadata =
-        new DataSourceMetadata.Builder()
-            .setName("spark")
-            .setConnector(DataSourceType.SPARK)
-            .setProperties(properties)
-            .build();
-
-    DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata);
-    Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine);
-  }
-}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java
deleted file mode 100644
index a70d4ba69e..0000000000
--- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkTableTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.sql.spark.storage;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.opensearch.sql.spark.constants.TestConstants.QUERY;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opensearch.sql.data.type.ExprType;
-import org.opensearch.sql.planner.physical.PhysicalPlan;
-import org.opensearch.sql.spark.client.SparkClient;
-import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanBuilder;
-import org.opensearch.sql.spark.functions.scan.SparkSqlFunctionTableScanOperator;
-import org.opensearch.sql.spark.request.SparkQueryRequest;
-import org.opensearch.sql.storage.read.TableScanBuilder;
-
-@ExtendWith(MockitoExtension.class)
-public class SparkTableTest {
-  @Mock private SparkClient client;
-
-  @Test
-  void testUnsupportedOperation() {
-    SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
-    SparkTable sparkTable = new SparkTable(client, sparkQueryRequest);
-
-    assertThrows(UnsupportedOperationException.class, sparkTable::exists);
-    assertThrows(
-        UnsupportedOperationException.class, () -> sparkTable.create(Collections.emptyMap()));
-  }
-
-  @Test
-  void testCreateScanBuilderWithSqlTableFunction() {
-    SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
-    sparkQueryRequest.setSql(QUERY);
-    SparkTable sparkTable = new SparkTable(client, sparkQueryRequest);
-    TableScanBuilder tableScanBuilder = sparkTable.createScanBuilder();
-    Assertions.assertNotNull(tableScanBuilder);
-    Assertions.assertTrue(tableScanBuilder instanceof SparkSqlFunctionTableScanBuilder);
-  }
-
-  @Test
-  @SneakyThrows
-  void testGetFieldTypesFromSparkQueryRequest() {
-    SparkTable sparkTable = new SparkTable(client, new SparkQueryRequest());
-    Map<String, ExprType> expectedFieldTypes = new HashMap<>();
-    Map<String, ExprType> fieldTypes = sparkTable.getFieldTypes();
-
-    assertEquals(expectedFieldTypes, fieldTypes);
-    verifyNoMoreInteractions(client);
-    assertNotNull(sparkTable.getSparkQueryRequest());
-  }
-
-  @Test
-  void testImplementWithSqlFunction() {
-    SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
-    sparkQueryRequest.setSql(QUERY);
-    SparkTable sparkMetricTable = new SparkTable(client, sparkQueryRequest);
-    PhysicalPlan plan =
-        sparkMetricTable.implement(new SparkSqlFunctionTableScanBuilder(client, sparkQueryRequest));
-    assertTrue(plan instanceof SparkSqlFunctionTableScanOperator);
-  }
-}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java b/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java
deleted file mode 100644
index 4336b13aa9..0000000000
--- a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.sql.spark.utils;
-
-import java.io.IOException;
-import java.util.Objects;
-
-public class TestUtils {
-  public static String getJson(String filename) throws IOException {
-    ClassLoader classLoader = TestUtils.class.getClassLoader();
-    return new String(
-        Objects.requireNonNull(classLoader.getResourceAsStream(filename)).readAllBytes());
-  }
-}
diff --git a/spark/src/test/resources/all_data_type.json b/spark/src/test/resources/all_data_type.json
deleted file mode 100644
index a046912319..0000000000
--- a/spark/src/test/resources/all_data_type.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "data": {
-    "result": [
-      "{'boolean':true,'long':922337203,'integer':2147483647,'short':32767,'byte':127,'double':9223372036854.775807,'float':21474.83647,'timestamp':'2023-07-01 10:31:30','date':'2023-07-01 10:31:30','string':'ABC','char':'A'}"
-    ],
-    "schema": [
-      "{'column_name':'boolean','data_type':'boolean'}",
-      "{'column_name':'long','data_type':'long'}",
-      "{'column_name':'integer','data_type':'integer'}",
-      "{'column_name':'short','data_type':'short'}",
-      "{'column_name':'byte','data_type':'byte'}",
-      "{'column_name':'double','data_type':'double'}",
-      "{'column_name':'float','data_type':'float'}",
-      "{'column_name':'timestamp','data_type':'timestamp'}",
-      "{'column_name':'date','data_type':'date'}",
-      "{'column_name':'string','data_type':'string'}",
-      "{'column_name':'char','data_type':'char'}"
-    ],
-    "stepId": "s-123456789",
-    "applicationId": "application-abc"
-  }
-}
diff --git a/spark/src/test/resources/issue2210.json b/spark/src/test/resources/issue2210.json
deleted file mode 100644
index dec24efdc2..0000000000
--- a/spark/src/test/resources/issue2210.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "data": {
-    "result": [
-      "{'col_name':'day','data_type':'int'}",
-      "{'col_name':'# Partition Information','data_type':'','comment':''}",
-      "{'col_name':'# col_name','data_type':'data_type','comment':'comment'}",
-      "{'col_name':'day','data_type':'int'}"
-    ],
-    "schema": [
-      "{'column_name':'col_name','data_type':'string'}",
-      "{'column_name':'data_type','data_type':'string'}",
-      "{'column_name':'comment','data_type':'string'}"
-    ],
-    "stepId": "s-123456789",
-    "applicationId": "application-abc"
-  }
-}
diff --git a/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
deleted file mode 100644
index ca6ee9cea8..0000000000
--- a/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
+++ /dev/null
@@ -1 +0,0 @@
-mock-maker-inline
\ No newline at end of file
diff --git a/spark/src/test/resources/select_query_response.json b/spark/src/test/resources/select_query_response.json
deleted file mode 100644
index 24cb06b49e..0000000000
--- a/spark/src/test/resources/select_query_response.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "data": {
-    "result": [
-      "{'1':1}"
-    ],
-    "schema": [
-      "{'column_name':'1','data_type':'integer'}"
-    ],
-    "stepId": "s-123456789",
-    "applicationId": "application-abc"
-  }
-}
diff --git a/spark/src/test/resources/spark_data_type.json b/spark/src/test/resources/spark_data_type.json
deleted file mode 100644
index 79bd047f27..0000000000
--- a/spark/src/test/resources/spark_data_type.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "data": {
-    "result": [
-      "{'struct_column':{'struct_value':'value'},'array_column':[1,2]}"
-    ],
-    "schema": [
-      "{'column_name':'struct_column','data_type':'struct'}",
-      "{'column_name':'array_column','data_type':'array'}"
-    ],
-    "stepId": "s-123456789",
-    "applicationId": "application-abc"
-  }
-}