diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index c710ef929..ab3be13ff 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -65,6 +65,7 @@ jobs: with: repository: cdapio/cdap-e2e-tests path: e2e + ref: release/6.11 - name: Cache uses: actions/cache@v4 diff --git a/amazon-redshift-plugin/pom.xml b/amazon-redshift-plugin/pom.xml index 17aa5e48b..17a18caa9 100644 --- a/amazon-redshift-plugin/pom.xml +++ b/amazon-redshift-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Amazon Redshift plugin diff --git a/aurora-mysql-plugin/pom.xml b/aurora-mysql-plugin/pom.xml index df38e7267..654bde42b 100644 --- a/aurora-mysql-plugin/pom.xml +++ b/aurora-mysql-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Aurora DB MySQL plugin diff --git a/aurora-postgresql-plugin/pom.xml b/aurora-postgresql-plugin/pom.xml index cb803d1ba..fd508c8fd 100644 --- a/aurora-postgresql-plugin/pom.xml +++ b/aurora-postgresql-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Aurora DB PostgreSQL plugin diff --git a/cloudsql-mysql-plugin/pom.xml b/cloudsql-mysql-plugin/pom.xml index 44dd1fe8c..012acc177 100644 --- a/cloudsql-mysql-plugin/pom.xml +++ b/cloudsql-mysql-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 CloudSQL MySQL plugin cloudsql-mysql-plugin 4.0.0 + CloudSQL MySQL database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + 
${cdap.version} + provided + + + io.cdap.cdap + cdap-api + ${cdap.version} provided @@ -41,11 +75,12 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} io.cdap.plugin mysql-plugin - 1.12.0-SNAPSHOT + ${project.version} @@ -59,24 +94,26 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} test junit junit - - - io.cdap.cdap - cdap-api - provided + ${junit.version} + test org.mockito mockito-core + ${mockito.version} + test org.jetbrains diff --git a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties index 5ff3357f2..d4fc1de28 100644 --- a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties +++ b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties @@ -12,10 +12,12 @@ errorMessageNumberOfSplits=Split-By Field Name must be specified if Number of Sp errorMessageBoundingQuery=Bounding Query must be specified if Number of Splits is not set to 1. Specify the Bounding Query. errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'URLDecoder: Illegal hex characters in escape (%) pattern - For input string: "$^"' errorMessageInvalidTableName=Table 'Invalidtable' does not exist. Ensure table 'Invalidtable' is set correctly and -errorMessageConnectionName=Connection Name must be in the format :: to connect to a public CloudSQL PostgreSQL instance. +errorMessageConnectionName=Connection Name must be in the format :: to connect to a public CloudSQL MySQL instance. validationSuccessMessage=No errors found. validationErrorMessage=COUNT ERROR found -errorLogsMessageInvalidTableName=Spark program 'phase-1' failed with error: Errors were encountered during validation. \ - Table -errorLogsMessageInvalidCredentials =Spark program 'phase-1' failed with error: Errors were encountered during validation. 
-errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table' at line 1. Please check the system logs for more details. +errorLogsMessageInvalidTableName=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \ + Table 'Table123' does not exist.. Please check the system logs for more details. +errorLogsMessageInvalidCredentials =Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \ + Exception while trying to validate schema of database table +errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : java.io.IOException: You have an error in your SQL syntax; \ + check the manual that corresponds to your MySQL server version for the right syntax to use near 'table' at line 1. Please check the system logs for more details. 
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java index 0608edb75..770dd9030 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java @@ -55,7 +55,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlMysqlActionConfig.instanceType, - cloudsqlMysqlActionConfig.connectionName); + cloudsqlMysqlActionConfig.connectionName, + CloudSQLUtil.CLOUDSQL_MYSQL); } super.configurePipeline(pipelineConfigurer); diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java index 86a8e6f52..6cd1b0031 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java @@ -74,7 +74,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlMysqlSinkConfig.connection.getInstanceType(), - cloudsqlMysqlSinkConfig.connection.getConnectionName()); + cloudsqlMysqlSinkConfig.connection.getConnectionName(), + CloudSQLUtil.CLOUDSQL_MYSQL); } super.configurePipeline(pipelineConfigurer); @@ -108,6 +109,11 @@ protected String getErrorDetailsProviderClassName() { return CloudSQLMySQLErrorDetailsProvider.class.getName(); } + @Override + protected String getExternalDocumentationLink() { + return DBUtils.CLOUDSQLMYSQL_SUPPORTED_DOC_URL; + } + @Override protected LineageRecorder getLineageRecorder(BatchSinkContext context) { String host; diff --git 
a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java index 8273169c0..201360c67 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java @@ -70,7 +70,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlMysqlSourceConfig.connection.getInstanceType(), - cloudsqlMysqlSourceConfig.connection.getConnectionName()); + cloudsqlMysqlSourceConfig.connection.getConnectionName(), + CloudSQLUtil.CLOUDSQL_MYSQL); } super.configurePipeline(pipelineConfigurer); diff --git a/cloudsql-postgresql-plugin/pom.xml b/cloudsql-postgresql-plugin/pom.xml index 8faab79ba..d219d5f65 100644 --- a/cloudsql-postgresql-plugin/pom.xml +++ b/cloudsql-postgresql-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 CloudSQL PostgreSQL plugin cloudsql-postgresql-plugin 4.0.0 + CloudSQL PostgreSQL database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + provided + + + io.cdap.cdap + cdap-api + ${cdap.version} provided @@ -41,6 +75,7 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} io.cdap.plugin @@ -63,24 +98,26 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} test junit junit - - - 
io.cdap.cdap - cdap-api - provided + ${junit.version} + test org.mockito mockito-core + ${mockito.version} + test org.jetbrains diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java index 1a3f8ad7b..5b13759f6 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java @@ -55,7 +55,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlPostgresqlActionConfig.instanceType, - cloudsqlPostgresqlActionConfig.connectionName); + cloudsqlPostgresqlActionConfig.connectionName, + CloudSQLUtil.CLOUDSQL_POSTGRESQL); } super.configurePipeline(pipelineConfigurer); diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java index c8ca0d6dc..060b67f82 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java @@ -81,7 +81,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlPostgresqlSinkConfig.connection.getInstanceType(), - cloudsqlPostgresqlSinkConfig.connection.getConnectionName()); + cloudsqlPostgresqlSinkConfig.connection.getConnectionName(), + CloudSQLUtil.CLOUDSQL_POSTGRESQL); } super.configurePipeline(pipelineConfigurer); @@ -152,6 +153,11 @@ protected String getErrorDetailsProviderClassName() { return CloudSQLPostgreSQLErrorDetailsProvider.class.getName(); } + 
@Override + protected String getExternalDocumentationLink() { + return DBUtils.CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL; + } + /** CloudSQL PostgreSQL sink config. */ public static class CloudSQLPostgreSQLSinkConfig extends AbstractDBSpecificSinkConfig { diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java index e32651a9a..db3f2d708 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java @@ -70,7 +70,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { CloudSQLUtil.checkConnectionName( failureCollector, cloudsqlPostgresqlSourceConfig.connection.getInstanceType(), - cloudsqlPostgresqlSourceConfig.connection.getConnectionName()); + cloudsqlPostgresqlSourceConfig.connection.getConnectionName(), + CloudSQLUtil.CLOUDSQL_POSTGRESQL); } super.configurePipeline(pipelineConfigurer); diff --git a/database-commons/pom.xml b/database-commons/pom.xml index 683dd2f43..8ee1e295b 100644 --- a/database-commons/pom.xml +++ b/database-commons/pom.xml @@ -20,39 +20,76 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Database Commons database-commons 4.0.0 + Database Commons + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + provided io.cdap.plugin hydrator-common + ${cdap.plugin.version} 
com.google.guava guava + ${guava.version} io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} + test junit junit + ${junit.version} + test com.mockrunner @@ -63,6 +100,8 @@ org.mockito mockito-core + ${mockito.version} + test diff --git a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java index 588ed78b8..c5320e25e 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java @@ -45,6 +45,7 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC public static final String CONNECTION_ARGUMENTS = "connectionArguments"; public static final String JDBC_PLUGIN_NAME = "jdbcPluginName"; public static final String JDBC_PLUGIN_TYPE = "jdbc"; + public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; @Name(JDBC_PLUGIN_NAME) @Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " + diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java b/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java deleted file mode 100644 index cc731d6ac..000000000 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright © 2024 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.plugin.db; - -import com.google.common.base.Strings; -import com.google.common.base.Throwables; -import io.cdap.cdap.api.exception.ErrorCategory; -import io.cdap.cdap.api.exception.ErrorCodeType; -import io.cdap.cdap.api.exception.ErrorType; -import io.cdap.cdap.api.exception.ErrorUtils; -import io.cdap.cdap.api.exception.ProgramFailureException; -import io.cdap.cdap.etl.api.exception.ErrorContext; -import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; - -import java.sql.SQLException; -import java.util.List; - -/** - * A custom ErrorDetailsProvider for Database plugins. - */ -public class DBErrorDetailsProvider implements ErrorDetailsProvider { - - public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { - List causalChain = Throwables.getCausalChain(e); - for (Throwable t : causalChain) { - if (t instanceof ProgramFailureException) { - // if causal chain already has program failure exception, return null to avoid double wrap. - return null; - } - if (t instanceof SQLException) { - return getProgramFailureException((SQLException) t, errorContext); - } - if (t instanceof IllegalArgumentException) { - return getProgramFailureException((IllegalArgumentException) t, errorContext); - } - if (t instanceof IllegalStateException) { - return getProgramFailureException((IllegalStateException) t, errorContext); - } - } - return null; - } - - /** - * Get a ProgramFailureException with the given error - * information from {@link SQLException}. - * - * @param e The SQLException to get the error information from. - * @return A ProgramFailureException with the given error information. 
- */ - private ProgramFailureException getProgramFailureException(SQLException e, ErrorContext errorContext) { - String errorMessage = e.getMessage(); - String sqlState = e.getSQLState(); - int errorCode = e.getErrorCode(); - String errorMessageWithDetails = String.format( - "Error occurred in the phase: '%s' with sqlState: '%s', errorCode: '%s', errorMessage: %s", - errorContext.getPhase(), sqlState, errorCode, errorMessage); - String externalDocumentationLink = getExternalDocumentationLink(); - if (!Strings.isNullOrEmpty(externalDocumentationLink)) { - if (!errorMessageWithDetails.endsWith(".")) { - errorMessageWithDetails = errorMessageWithDetails + "."; - } - errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails, - externalDocumentationLink); - } - return ErrorUtils.getProgramFailureException(Strings.isNullOrEmpty(sqlState) ? - new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN) : getErrorCategoryFromSqlState(sqlState), - errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode, sqlState), true, - ErrorCodeType.SQLSTATE, sqlState, externalDocumentationLink, e); - } - - /** - * Get a ProgramFailureException with the given error - * information from {@link IllegalArgumentException}. - * - * @param e The IllegalArgumentException to get the error information from. - * @return A ProgramFailureException with the given error information. - */ - private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { - String errorMessage = e.getMessage(); - String errorMessageFormat = "Error occurred in the phase: '%s'. 
Error message: %s"; - return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, - String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); - } - - /** - * Get a ProgramFailureException with the given error - * information from {@link IllegalStateException}. - * - * @param e The IllegalStateException to get the error information from. - * @return A ProgramFailureException with the given error information. - */ - private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { - String errorMessage = e.getMessage(); - String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; - return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, - String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); - } - - /** - * Get the external documentation link for the client errors if available. - * - * @return The external documentation link as a {@link String}. 
- */ - protected String getExternalDocumentationLink() { - return null; - } - - protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) { - return ErrorType.UNKNOWN; - } - - protected ErrorCategory getErrorCategoryFromSqlState(String sqlState) { - return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN); - } -} diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java index a2be9cbf0..0eaac3148 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java @@ -16,12 +16,15 @@ package io.cdap.plugin.db.action; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; +import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.util.DBUtils; import java.sql.Driver; +import java.sql.SQLException; /** * Action that runs a db command. 
@@ -40,7 +43,18 @@ public AbstractDBAction(QueryConfig config, Boolean enableAutoCommit) { public void run(ActionContext context) throws Exception { Class driverClass = context.loadPluginClass(JDBC_PLUGIN_ID); DBRun executeQuery = new DBRun(config, driverClass, enableAutoCommit); - executeQuery.run(); + try { + executeQuery.run(); + } catch (Exception e) { + if (e instanceof SQLException) { + DBErrorDetailsProvider dbe = new DBErrorDetailsProvider(); + throw dbe.getProgramFailureException((SQLException) e, null); + } + FailureCollector collector = context.getFailureCollector(); + collector.addFailure("Failed to execute query with message: " + e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } } @Override diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java index 5c6b08031..8de0e4d70 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java @@ -20,8 +20,9 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.TransactionIsolationLevel; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -42,6 +43,12 @@ public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnec @Nullable protected Integer port; + @Name(ConnectionConfig.TRANSACTION_ISOLATION_LEVEL) + @Description("The transaction isolation level for the database session.") + @Macro + @Nullable + protected String transactionIsolationLevel; + public String getHost() { return host; } @@ -55,4 +62,21 @@ public int getPort() { public boolean canConnect() { return super.canConnect() 
&& !containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT); } + + @Override + public Map getAdditionalArguments() { + Map additonalArguments = new HashMap<>(); + if (getTransactionIsolationLevel() != null) { + additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel()); + } + return additonalArguments; + } + + public String getTransactionIsolationLevel() { + if (transactionIsolationLevel == null) { + return null; + } + return TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name(); + } } + diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java index 26a95405b..0bb4bf123 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java @@ -25,6 +25,10 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; @@ -43,7 +47,6 @@ import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.ConnectionConfigAccessor; import io.cdap.plugin.db.DBConfig; -import io.cdap.plugin.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.Operation; import io.cdap.plugin.db.SchemaReader; @@ -172,7 +175,17 @@ public void validateOperations(FailureCollector collector, T dbSinkConfig, @Null * @return ErrorDetailsProvider class name */ protected String getErrorDetailsProviderClassName() { - return DBErrorDetailsProvider.class.getName(); + return null; + } + + /** + * 
Returns the external documentation link. + * Override this method to provide a custom external documentation link. + * + * @return external documentation link + */ + protected String getExternalDocumentationLink() { + return null; } @Override @@ -215,6 +228,7 @@ public void prepareRun(BatchSinkContext context) { configAccessor.setInitQueries(dbSinkConfig.getInitQueries()); configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName()); configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString); + configAccessor.getConfiguration().set(ETLDBOutputFormat.STAGE_NAME, context.getStageName()); String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName); @@ -240,7 +254,9 @@ public void prepareRun(BatchSinkContext context) { context.getArguments().get(ETLDBOutputFormat.COMMIT_BATCH_SIZE)); } // set error details provider - context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) { + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + } addOutputContext(context); } protected void addOutputContext(BatchSinkContext context) { @@ -296,8 +312,23 @@ private Schema inferSchema(Class driverClass) { inferredFields.addAll(getSchemaReader().getSchemaFields(rs)); } } catch (SQLException e) { - throw new InvalidStageException("Error while reading table metadata", e); - + // wrap exception to ensure SQLException-child instances not exposed to contexts w/o jdbc driver in classpath + String errorMessage = + String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(), + e.getSQLState(), e.getErrorCode()); + 
String errorMessageWithDetails = String.format("Error while reading table metadata." + + "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState()); + String externalDocumentationLink = getExternalDocumentationLink(); + if (!Strings.isNullOrEmpty(externalDocumentationLink)) { + if (!errorMessage.endsWith(".")) { + errorMessage = errorMessage + "."; + } + errorMessage = String.format("%s For more details, see %s", errorMessageWithDetails, errorMessage); + } + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE, + e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(), + e.getSQLState(), e.getErrorCode())); } } catch (IllegalAccessException | InstantiationException | SQLException e) { throw new InvalidStageException("JDBC Driver unavailable: " + dbSinkConfig.getJdbcPluginName(), e); diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java index ad2b91ab1..ad196386c 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java @@ -25,6 +25,8 @@ import io.cdap.plugin.db.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; @@ -43,6 +45,7 @@ import java.sql.Statement; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import static 
io.cdap.plugin.db.ConnectionConfigAccessor.OPERATION_NAME; import static io.cdap.plugin.db.ConnectionConfigAccessor.RELATION_TABLE_KEY; @@ -56,15 +59,92 @@ public class ETLDBOutputFormat extends DBOutputFormat { // Batch size before submitting a batch to the SQL engine. If set to 0, no batches will be submitted until commit. public static final String COMMIT_BATCH_SIZE = "io.cdap.plugin.db.output.commit.batch.size"; + public static final String STAGE_NAME = "io.cdap.plugin.db.output.stage_name"; public static final int DEFAULT_COMMIT_BATCH_SIZE = 1000; private static final Character ESCAPE_CHAR = '"'; + // Format for connection map's key will be "taskAttemptId_stageName" + private static final String CONNECTION_MAP_KEY_FORMAT = "%s_%s"; + + // CONNECTION_MAP will be used to store connections with "taskAttemptId_stageName" as key and + // connection object as value. Making it static to be accessed from multiple task attempts within same executor. + private static final Map CONNECTION_MAP = new ConcurrentHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(ETLDBOutputFormat.class); private Configuration conf; private Driver driver; private JDBCDriverShim driverShim; + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new OutputCommitter() { + @Override + public void setupJob(JobContext jobContext) throws IOException { + // do nothing + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + // do nothing + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + return true; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + conf = context.getConfiguration(); + String stageName = conf.get(STAGE_NAME); + String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName); + Connection connection; + if 
((connection = CONNECTION_MAP.remove(connectionId)) != null) { + try { + connection.commit(); + } catch (SQLException e) { + try { + connection.rollback(); + } catch (SQLException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + throw new IOException(e); + } finally { + try { + connection.close(); + LOG.debug("Connection Closed after committing the task with taskAttemptId {}", connectionId); + } catch (SQLException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + } + } + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + conf = context.getConfiguration(); + String stageName = conf.get(STAGE_NAME); + String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName); + Connection connection; + if ((connection = CONNECTION_MAP.remove(connectionId)) != null) { + try { + connection.rollback(); + } catch (SQLException e) { + throw new IOException(e); + } finally { + try { + connection.close(); + LOG.debug("Connection Closed after rollback the task with taskAttemptId {}", connectionId); + } catch (SQLException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + } + } + } + }; + } + @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { conf = context.getConfiguration(); @@ -81,6 +161,11 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOE try { Connection connection = getConnection(conf); + String stageName = conf.get(STAGE_NAME); + // If using multiple sinks, task attemptID can be same in that case, appending stage in the end for uniqueness. 
+ String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName); + CONNECTION_MAP.put(connectionId, connection); + LOG.debug("Connection Added to the map with connectionId : {}", connectionId); PreparedStatement statement = connection.prepareStatement(constructQueryOnOperation(tableName, fieldNames, operationName, listKeys)); return new DBRecordWriter(connection, statement) { @@ -98,23 +183,15 @@ public void close(TaskAttemptContext context) throws IOException { if (!emptyData) { getStatement().executeBatch(); } - getConnection().commit(); } catch (SQLException e) { - try { - getConnection().rollback(); - } catch (SQLException ex) { - LOG.warn(StringUtils.stringifyException(ex)); - } throw new IOException(e); } finally { try { getStatement().close(); - getConnection().close(); } catch (SQLException ex) { throw new IOException(ex); } } - try { DriverManager.deregisterDriver(driverShim); } catch (SQLException e) { @@ -298,4 +375,8 @@ public String constructUpdateQuery(String table, String[] fieldNames, String[] l return query.toString(); } } + + private String getConnectionMapKeyFormat(String taskAttemptId, String stageName) { + return String.format(CONNECTION_MAP_KEY_FORMAT, taskAttemptId, stageName); + } } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java index 559985758..54d1e2ab6 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java @@ -46,7 +46,6 @@ import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.ConnectionConfigAccessor; import io.cdap.plugin.db.DBConfig; -import io.cdap.plugin.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.TransactionIsolationLevel; @@ -201,18 +200,20 @@ private Schema 
loadSchemaFromDB(Class driverClass) } catch (SQLException e) { // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath + String errorMessage = + String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(), + e.getSQLState(), e.getErrorCode()); String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." + "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState()); String externalDocumentationLink = getExternalDocumentationLink(); if (!Strings.isNullOrEmpty(externalDocumentationLink)) { - if (!errorMessageWithDetails.endsWith(".")) { - errorMessageWithDetails = errorMessageWithDetails + "."; + if (!errorMessage.endsWith(".")) { + errorMessage = errorMessage + "."; } - errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails, - externalDocumentationLink); + errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink); } throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - e.getMessage(), errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE, + errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE, e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode())); } finally { @@ -239,7 +240,7 @@ protected SchemaReader getSchemaReader() { * @return ErrorDetailsProvider class name */ protected String getErrorDetailsProviderClassName() { - return DBErrorDetailsProvider.class.getName(); + return null; } private DriverCleanup loadPluginClassAndGetDriver(Class driverClass) @@ -299,7 +300,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList())); } // set error 
details provider - context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) { + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); + } context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider( DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration()))); } @@ -373,6 +376,12 @@ protected Class getDBRecordType() { return DBRecord.class; } + /** + * Returns the external documentation link. + * Override this method to provide a custom external documentation link. + * + * @return external documentation link + */ protected String getExternalDocumentationLink() { return null; } @@ -520,7 +529,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) { } @VisibleForTesting - static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) { + void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) { if (configSchema == null) { collector.addFailure("Schema should not be null or empty.", null) .withConfigProperty(SCHEMA); @@ -541,14 +550,20 @@ static void validateSchema(Schema actualSchema, Schema configSchema, FailureColl Schema expectedFieldSchema = field.getSchema().isNullable() ? 
field.getSchema().getNonNullable() : field.getSchema(); - if (actualFieldSchema.getType() != expectedFieldSchema.getType() || - actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) { - collector.addFailure( - String.format("Schema field '%s' has type '%s but found '%s'.", - field.getName(), expectedFieldSchema.getDisplayName(), - actualFieldSchema.getDisplayName()), null) - .withOutputSchemaField(field.getName()); - } + validateField(collector, field, actualFieldSchema, expectedFieldSchema); + } + } + + protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema, + Schema expectedFieldSchema) { + if (actualFieldSchema.getType() != expectedFieldSchema.getType() || + actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) { + collector.addFailure( + String.format("Schema field '%s' is expected to have type '%s but found '%s'.", field.getName(), + expectedFieldSchema.getDisplayName(), actualFieldSchema.getDisplayName()), + String.format("Change the data type of field %s to %s.", field.getName(), + actualFieldSchema.getDisplayName())) + .withOutputSchemaField(field.getName()); } } diff --git a/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java b/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java index 11595ac06..f704f2ad5 100644 --- a/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java +++ b/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java @@ -31,6 +31,9 @@ public class CloudSQLUtil { public static final String INSTANCE_TYPE = "instanceType"; public static final String PUBLIC_INSTANCE = "public"; public static final String PRIVATE_INSTANCE = "private"; + public static final String CLOUDSQL_POSTGRESQL = "CloudSQL PostgreSQL"; + public static final String CLOUDSQL_MYSQL = "CloudSQL MySQL"; + /** * Utility method to check the Connection Name format of a CloudSQL instance. 
@@ -38,9 +41,10 @@ public class CloudSQLUtil { * @param failureCollector {@link FailureCollector} for the pipeline * @param instanceType CloudSQL instance type * @param connectionName Connection Name for the CloudSQL instance + * @param databaseType Type of CloudSQL instance: CloudSQL PostgreSQL or CloudSQL MySQL */ public static void checkConnectionName( - FailureCollector failureCollector, String instanceType, String connectionName) { + FailureCollector failureCollector, String instanceType, String connectionName, String databaseType) { if (PUBLIC_INSTANCE.equalsIgnoreCase(instanceType)) { Pattern connectionNamePattern = @@ -50,16 +54,16 @@ public static void checkConnectionName( if (!matcher.matches()) { failureCollector .addFailure( - "Connection Name must be in the format :: to connect to " - + "a public CloudSQL PostgreSQL instance.", null) + String.format("Connection Name must be in the format :: to connect to " + + "a public %s instance.", databaseType), null) .withConfigProperty(CONNECTION_NAME); } } else { if (!InetAddresses.isInetAddress(connectionName)) { failureCollector .addFailure( - "Enter the internal IP address of the Compute Engine VM cloudsql proxy " - + "is running on, to connect to a private CloudSQL PostgreSQL instance.", null) + String.format("Enter the internal IP address of the Compute Engine VM cloudsql proxy " + + "is running on, to connect to a private %s instance.", databaseType), null) .withConfigProperty(CONNECTION_NAME); } } diff --git a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java index ffcfdc375..b125a7214 100644 --- a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java +++ b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java @@ -61,9 +61,13 @@ public final class DBUtils { public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender(); public static final String MYSQL_SUPPORTED_DOC_URL = 
"https://dev.mysql.com/doc/mysql-errors/9.0/en/"; + public static final String MARIADB_SUPPORTED_DOC_URL = "https://mariadb.com/kb/en/mariadb-error-codes/"; + public static final String MSSQL_SUPPORTED_DOC_URL = + "https://docs.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors"; public static final String CLOUDSQLMYSQL_SUPPORTED_DOC_URL = "https://cloud.google.com/sql/docs/mysql/error-messages"; public static final String POSTGRES_SUPPORTED_DOC_URL = "https://www.postgresql.org/docs/current/errcodes-appendix.html"; + public static final String ORACLE_SUPPORTED_DOC_URL = "https://docs.oracle.com/en/error-help/db/ora-index.html"; public static final String CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL = "https://cloud.google.com/sql/docs/postgres/error-messages"; diff --git a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java index 0f5a3ca4a..cbe1361d0 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java @@ -17,6 +17,7 @@ package io.cdap.plugin.db; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ProgramFailureException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -162,49 +163,49 @@ public void testGetSchemaThrowsExceptionOnNumericWithZeroPrecision() throws SQLE reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnArray() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.ARRAY); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnDatalink() throws SQLException { 
when(metadata.getColumnType(eq(1))).thenReturn(Types.DATALINK); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnDistinct() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.DISTINCT); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnJavaObject() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.JAVA_OBJECT); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnOther() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.OTHER); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnRef() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.REF); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnSQLXML() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.SQLXML); reader.getSchema(metadata, 1); } - @Test(expected = SQLException.class) + @Test(expected = ProgramFailureException.class) public void testGetSchemaThrowsExceptionOnStruct() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.STRUCT); reader.getSchema(metadata, 1); diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java index 3dc7a2d1c..a8be38b46 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java +++ 
b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java @@ -43,11 +43,17 @@ public class AbstractDBSourceTest { Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))), Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))) ); + private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() { + @Override + public String getConnectionString() { + return ""; + } + }; @Test public void testValidateSourceSchemaCorrectSchema() { MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector); + TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector); Assert.assertEquals(0, collector.getValidationFailures().size()); } @@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() { ); MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector); + TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector); assertPropertyValidationFailed(collector, "boolean_column"); } @@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() { ); MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); - AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector); + TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector); assertPropertyValidationFailed(collector, "boolean_column"); } diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml index 39c3fcd52..920ed89c7 100644 --- a/db2-plugin/pom.xml +++ b/db2-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 IBM DB2 plugin diff --git a/generic-database-plugin/pom.xml b/generic-database-plugin/pom.xml index b823356d8..08f979397 100644 --- a/generic-database-plugin/pom.xml +++ b/generic-database-plugin/pom.xml @@ -20,7 +20,7 @@ 
database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Generic database plugin diff --git a/generic-db-argument-setter/pom.xml b/generic-db-argument-setter/pom.xml index d8a78cd4d..70c4414dc 100644 --- a/generic-db-argument-setter/pom.xml +++ b/generic-db-argument-setter/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Generic database argument setter plugin diff --git a/mariadb-plugin/docs/Mariadb-batchsink.md b/mariadb-plugin/docs/Mariadb-batchsink.md index 11176c0db..e4541fe67 100644 --- a/mariadb-plugin/docs/Mariadb-batchsink.md +++ b/mariadb-plugin/docs/Mariadb-batchsink.md @@ -60,41 +60,39 @@ connections. Data Types Mapping ---------- - +--------------------------------+-----------------------+------------------------------------+ - | MariaDB Data Type | CDAP Schema Data Type | Comment | - +--------------------------------+-----------------------+------------------------------------+ - | TINYINT | int | | - | BOOLEAN, BOOL | boolean | | - | SMALLINT | int | | - | MEDIUMINT | int | | - | INT, INTEGER | int | | - | BIGINT | long | | - | DECIMAL, DEC, NUMERIC, FIXED | decimal | | - | FLOAT | float | | - | DOUBLE, DOUBLE PRECISION, REAL | decimal | | - | BIT | boolean | | - | CHAR | string | | - | VARCHAR | string | | - | BINARY | bytes | | - | CHAR BYTE | bytes | | - | VARBINARY | bytes | | - | TINYBLOB | bytes | | - | BLOB | bytes | | - | MEDIUMBLOB | bytes | | - | LONGBLOB | bytes | | - | TINYTEXT | string | | - | TEXT | string | | - | MEDIUMTEXT | string | | - | LONGTEXT | string | | - | JSON | string | In MariaDB it is alias to LONGTEXT | - | ENUM | string | Mapping to String by default | - | SET | string | | - | DATE | date | | - | TIME | time_micros | | - | DATETIME | timestamp_micros | | - | TIMESTAMP | timestamp_micros | | - | YEAR | date | | - +--------------------------------+-----------------------+------------------------------------+ + | MariaDB Data Type | CDAP Schema Data Type | Comment | + 
|--------------------------------|-----------------------|---------------------------------------------------------| + | TINYINT | int | | + | BOOLEAN, BOOL | boolean | | + | SMALLINT | int | | + | MEDIUMINT | int | | + | INT, INTEGER | int | | + | BIGINT | long | | + | DECIMAL, DEC, NUMERIC, FIXED | decimal | | + | FLOAT | float | | + | DOUBLE, DOUBLE PRECISION, REAL | decimal | | + | BIT | boolean | | + | CHAR | string | | + | VARCHAR | string | | + | BINARY | bytes | | + | CHAR BYTE | bytes | | + | VARBINARY | bytes | | + | TINYBLOB | bytes | | + | BLOB | bytes | | + | MEDIUMBLOB | bytes | | + | LONGBLOB | bytes | | + | TINYTEXT | string | | + | TEXT | string | | + | MEDIUMTEXT | string | | + | LONGTEXT | string | | + | JSON | string | In MariaDB it is alias to LONGTEXT | + | ENUM | string | Mapping to String by default | + | SET | string | | + | DATE | date | | + | TIME | time_micros | | + | DATETIME | timestamp_micros | | + | TIMESTAMP | timestamp_micros | | + | YEAR | int | Users can manually set output schema to map it to Date. | Example ------- diff --git a/mariadb-plugin/docs/Mariadb-batchsource.md b/mariadb-plugin/docs/Mariadb-batchsource.md index 2b1fe3944..713af2ee8 100644 --- a/mariadb-plugin/docs/Mariadb-batchsource.md +++ b/mariadb-plugin/docs/Mariadb-batchsource.md @@ -78,43 +78,39 @@ with the tradeoff of higher memory usage. 
Data Types Mapping ---------- - - +--------------------------------+-----------------------+------------------------------------+ - | MariaDB Data Type | CDAP Schema Data Type | Comment | - +--------------------------------+-----------------------+------------------------------------+ - | TINYINT | int | | - | BOOLEAN, BOOL | boolean | | - | SMALLINT | int | | - | MEDIUMINT | int | | - | INT, INTEGER | int | | - | BIGINT | long | | - | DECIMAL, DEC, NUMERIC, FIXED | decimal | | - | FLOAT | float | | - | DOUBLE, DOUBLE PRECISION, REAL | decimal | | - | BIT | boolean | | - | CHAR | string | | - | VARCHAR | string | | - | BINARY | bytes | | - | CHAR BYTE | bytes | | - | VARBINARY | bytes | | - | TINYBLOB | bytes | | - | BLOB | bytes | | - | MEDIUMBLOB | bytes | | - | LONGBLOB | bytes | | - | TINYTEXT | string | | - | TEXT | string | | - | MEDIUMTEXT | string | | - | LONGTEXT | string | | - | JSON | string | In MariaDB it is alias to LONGTEXT | - | ENUM | string | Mapping to String by default | - | SET | string | | - | DATE | date | | - | TIME | time_micros | | - | DATETIME | timestamp_micros | | - | TIMESTAMP | timestamp_micros | | - | YEAR | date | | - +--------------------------------+-----------------------+------------------------------------+ - + | MariaDB Data Type | CDAP Schema Data Type | Comment | + |--------------------------------|-----------------------|---------------------------------------------------------| + | TINYINT | int | | + | BOOLEAN, BOOL | boolean | | + | SMALLINT | int | | + | MEDIUMINT | int | | + | INT, INTEGER | int | | + | BIGINT | long | | + | DECIMAL, DEC, NUMERIC, FIXED | decimal | | + | FLOAT | float | | + | DOUBLE, DOUBLE PRECISION, REAL | decimal | | + | BIT | boolean | | + | CHAR | string | | + | VARCHAR | string | | + | BINARY | bytes | | + | CHAR BYTE | bytes | | + | VARBINARY | bytes | | + | TINYBLOB | bytes | | + | BLOB | bytes | | + | MEDIUMBLOB | bytes | | + | LONGBLOB | bytes | | + | TINYTEXT | string | | + | TEXT | string | 
| + | MEDIUMTEXT | string | | + | LONGTEXT | string | | + | JSON | string | In MariaDB it is alias to LONGTEXT | + | ENUM | string | Mapping to String by default | + | SET | string | | + | DATE | date | | + | TIME | time_micros | | + | DATETIME | timestamp_micros | | + | TIMESTAMP | timestamp_micros | | + | YEAR | int | Users can manually set output schema to map it to Date. | Example ------ diff --git a/mariadb-plugin/pom.xml b/mariadb-plugin/pom.xml index 7ece99f31..682cc153f 100644 --- a/mariadb-plugin/pom.xml +++ b/mariadb-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Maria DB plugin @@ -83,6 +83,11 @@ RELEASE compile + + io.cdap.plugin + mysql-plugin + ${project.version} + diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java new file mode 100644 index 000000000..94498c787 --- /dev/null +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mariadb; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.mysql.MysqlDBRecord; +import java.util.List; + +/** + * Writable class for MariaDB Source/Sink. 
+ */ +public class MariadbDBRecord extends MysqlDBRecord { + + /** + * Used in map-reduce. Do not remove. + */ + @SuppressWarnings("unused") + public MariadbDBRecord() { + // Required by Hadoop DBRecordReader to create an instance + } + + public MariadbDBRecord(StructuredRecord record, List columnTypes) { + super(record, columnTypes); + } +} diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java new file mode 100644 index 000000000..38405225d --- /dev/null +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mariadb; + + +import io.cdap.plugin.mysql.MysqlErrorDetailsProvider; +import io.cdap.plugin.util.DBUtils; + +/** + * A custom ErrorDetailsProvider for MariaDb plugins. 
+ */ +public class MariadbErrorDetailsProvider extends MysqlErrorDetailsProvider { + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MARIADB_SUPPORTED_DOC_URL; + } + +} diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java new file mode 100644 index 000000000..71ccb0d06 --- /dev/null +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mariadb; + +import io.cdap.plugin.mysql.MysqlFieldsValidator; + +/** + * Field validator for MariaDB. + */ +public class MariadbFieldsValidator extends MysqlFieldsValidator { +} diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java new file mode 100644 index 000000000..37ac12a93 --- /dev/null +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mariadb; + + +import io.cdap.plugin.mysql.MysqlSchemaReader; +import java.util.Map; + +/** + * Schema reader for mapping Maria DB type + */ +public class MariadbSchemaReader extends MysqlSchemaReader { + + public MariadbSchemaReader (String sessionID) { + super(sessionID); + } + + public MariadbSchemaReader (String sessionID, Map connectionArguments) { + super(sessionID, connectionArguments); + } + +} diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java index ab20f3c5d..52a73344a 100644 --- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java @@ -19,9 +19,15 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.DBSpecificSinkConfig; import io.cdap.plugin.db.sink.AbstractDBSink; +import io.cdap.plugin.db.sink.FieldsValidator; +import io.cdap.plugin.mysql.MysqlFieldsValidator; +import io.cdap.plugin.util.DBUtils; import java.util.Map; import javax.annotation.Nullable; @@ -45,6 +51,32 @@ public MariadbSink(MariadbSinkConfig mariadbSinkConfig) { this.mariadbSinkConfig = mariadbSinkConfig; } + @Override + protected DBRecord 
getDBRecord(StructuredRecord output) { + return new MariadbDBRecord(output, columnTypes); + } + + @Override + protected SchemaReader getSchemaReader() { + return new MariadbSchemaReader(null); + } + + + @Override + protected String getErrorDetailsProviderClassName() { + return MariadbErrorDetailsProvider.class.getName(); + } + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MARIADB_SUPPORTED_DOC_URL; + } + + @Override + protected FieldsValidator getFieldsValidator() { + return new MariadbFieldsValidator(); + } + /** * MariaDB Sink Config. */ diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java index d5ffcb290..28204100c 100644 --- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java +++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java @@ -19,10 +19,19 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.plugin.common.Asset; +import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.DBSpecificSourceConfig; import io.cdap.plugin.db.source.AbstractDBSource; +import io.cdap.plugin.util.DBUtils; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -53,10 +62,46 @@ protected String createConnectionString() { mariadbSourceConfig.host, mariadbSourceConfig.port, mariadbSourceConfig.database); } + @Override + protected Class getDBRecordType() { + return MariadbDBRecord.class; + } + + @Override + protected LineageRecorder 
getLineageRecorder(BatchSourceContext context) { + String fqn = DBUtils.constructFQN("mariadb", + mariadbSourceConfig.host, + mariadbSourceConfig.port, + mariadbSourceConfig.database, + mariadbSourceConfig.getReferenceName()); + Asset asset = Asset.builder(mariadbSourceConfig.getReferenceName()).setFqn(fqn).build(); + return new LineageRecorder(context, asset); + } + + @Override + protected SchemaReader getSchemaReader() { + return new MariadbSchemaReader(null, mariadbSourceConfig.getConnectionArguments()); + } + + @Override + protected String getErrorDetailsProviderClassName() { + return MariadbErrorDetailsProvider.class.getName(); + } + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MARIADB_SUPPORTED_DOC_URL; + } + /** * MaraiDB source mariadbSourceConfig. */ public static class MariadbSourceConfig extends DBSpecificSourceConfig { + private static final String JDBC_PROPERTY_CONNECT_TIMEOUT = "connectTimeout"; + private static final String JDBC_PROPERTY_SOCKET_TIMEOUT = "socketTimeout"; + private static final String JDBC_REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements"; + + private static final String MARIADB_TINYINT1_IS_BIT = "tinyInt1isBit"; @Name(MariadbConstants.AUTO_RECONNECT) @Description("Should the driver try to re-establish stale and/or dead connections") @@ -116,5 +161,43 @@ public Map getDBSpecificArguments() { public List getInitQueries() { return MariadbUtil.composeDbInitQueries(useAnsiQuotes); } + + @Override + public Map getConnectionArguments() { + Map arguments = new HashMap<>(super.getConnectionArguments()); + // the unit below is millisecond + arguments.putIfAbsent(JDBC_PROPERTY_CONNECT_TIMEOUT, "20000"); + arguments.putIfAbsent(JDBC_PROPERTY_SOCKET_TIMEOUT, "20000"); + arguments.putIfAbsent(JDBC_REWRITE_BATCHED_STATEMENTS, "true"); + // MariaDB property to ensure that TINYINT(1) type data is not converted to MariaDB Bit/Boolean type in the + // ResultSet. 
+ arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false"); + return arguments; + } + + @Override + protected void validateField(FailureCollector collector, + Schema.Field field, + Schema actualFieldSchema, + Schema expectedFieldSchema) { + // Backward compatibility changes to support MySQL YEAR to Date type conversion + if (Schema.LogicalType.DATE.equals(expectedFieldSchema.getLogicalType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + + // Backward compatibility change to support MySQL MEDIUMINT UNSIGNED to Long type conversion + if (Schema.Type.LONG.equals(expectedFieldSchema.getType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + + // Backward compatibility change to support MySQL TINYINT(1) to Bool type conversion + if (Schema.Type.BOOLEAN.equals(expectedFieldSchema.getType()) + && Schema.Type.INT.equals(actualFieldSchema.getType())) { + return; + } + super.validateField(collector, field, actualFieldSchema, expectedFieldSchema); + } } } diff --git a/memsql-plugin/pom.xml b/memsql-plugin/pom.xml index c9dbaf035..f3ae24c38 100644 --- a/memsql-plugin/pom.xml +++ b/memsql-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Memsql plugin diff --git a/mssql-plugin/docs/SQL Server-connector.md b/mssql-plugin/docs/SQL Server-connector.md index cb72161f5..6f0038715 100644 --- a/mssql-plugin/docs/SQL Server-connector.md +++ b/mssql-plugin/docs/SQL Server-connector.md @@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication. **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level:** The transaction isolation level of the database connection. +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. 
Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible. + +For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16) + **Authentication Type:** Indicates which authentication method will be used for the connection. Use 'SQL Login'. to connect to a SQL Server using username and password properties. Use 'Active Directory Password' to connect to an Azure SQL Database/Data Warehouse using an Azure AD principal name and password. diff --git a/mssql-plugin/docs/SqlServer-batchsink.md b/mssql-plugin/docs/SqlServer-batchsink.md index 5d10b4bb6..b4ca1cbc5 100644 --- a/mssql-plugin/docs/SqlServer-batchsink.md +++ b/mssql-plugin/docs/SqlServer-batchsink.md @@ -46,6 +46,14 @@ an Azure SQL Database/Data Warehouse using an Azure AD principal name and passwo **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level** The transaction isolation level of the database connection +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible. 
+ +For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16) + **Instance Name:** SQL Server instance name to connect to. When it is not specified, a connection is made to the default instance. For the case where both the instanceName and port are specified, see the notes for port. If you specify a Virtual Network Name in the Server connection property, you cannot diff --git a/mssql-plugin/docs/SqlServer-batchsource.md b/mssql-plugin/docs/SqlServer-batchsource.md index c8e30f77e..5c917621c 100644 --- a/mssql-plugin/docs/SqlServer-batchsource.md +++ b/mssql-plugin/docs/SqlServer-batchsource.md @@ -56,6 +56,14 @@ an Azure SQL Database/Data Warehouse using an Azure AD principal name and passwo **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level** The transaction isolation level of the database connection +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible. + +For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16) + **Instance Name:** SQL Server instance name to connect to. When it is not specified, a connection is made to the default instance. For the case where both the instanceName and port are specified, see the notes for port. 
If you specify a Virtual Network Name in the Server connection property, you cannot diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml index f5fc81a93..376a6bc3f 100644 --- a/mssql-plugin/pom.xml +++ b/mssql-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Microsoft SQL Server plugin mssql-plugin 4.0.0 + Microsoft SQL Server plugin database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided io.cdap.plugin @@ -40,10 +74,12 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} com.google.guava guava + ${guava.version} @@ -57,18 +93,26 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} + test junit junit + ${junit.version} + test org.mockito mockito-core + ${mockito.version} + test com.microsoft.sqlserver @@ -76,11 +120,6 @@ 8.2.1.jre8 test - - io.cdap.cdap - cdap-api - provided - org.jetbrains annotations diff --git a/mssql-plugin/src/e2e-test/resources/errorMessage.properties b/mssql-plugin/src/e2e-test/resources/errorMessage.properties index 00721f148..c752d6ec1 100644 --- a/mssql-plugin/src/e2e-test/resources/errorMessage.properties +++ b/mssql-plugin/src/e2e-test/resources/errorMessage.properties @@ -13,11 +13,11 @@ errorMessagenumofSplit=Split-By Field Name must be specified if Number of Splits errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table errorMessageInvalidSinkTableName=Table 'Table123@' does not exist. 
errormessageBlankHost=Exception while trying to validate schema of database table -errorMessageInvalidTableName=Spark program 'phase-1' failed with error: Errors were encountered during validation. \ - Table 'Table123@' does not exist.. Please check the system logs for more details. +errorMessageInvalidTableName=Spark program 'phase-1' failed with error: Stage 'SQL Server2' encountered : io.cdap.cdap.etl.api.validation.ValidationException: \ + Errors were encountered during validation. Table 'Table123@' does not exist.. Please check the system logs for more details. errorMessageInvalidCredentials=Spark program 'phase-1' failed with error: Unable to create config for batchsink SqlServer \ 'connection' is invalid: Failed to assign value -errorMessageInvalidsourcetable=Spark program 'phase-1' failed with error: Incorrect syntax near the keyword 'table'.. \ - Please check the system logs for more details. -errorMessageInvalidCredentialSource=Spark program 'phase-1' failed with error: Plugin with id SQL \ - Server:source.jdbc.sqlserver does not exist in program phase-1 of application +errorMessageInvalidsourcetable=Spark program 'phase-1' failed with error: Stage 'SQL Server' encountered : io.cdap.cdap.api.exception.ProgramFailureException: \ + Error occurred while trying to get schema from database.Error message: 'Incorrect syntax near the keyword 'table'.'. 
+errorMessageInvalidCredentialSource=Spark program 'phase-1' failed with error: Stage 'SQL Server' encountered : java.lang.IllegalArgumentException: \ + Plugin with id SQL Server:source.jdbc.sqlserver does not exist in program phase-1 of application diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java new file mode 100644 index 000000000..90d1ce7b7 --- /dev/null +++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mssql; + +import io.cdap.plugin.common.db.DBErrorDetailsProvider; +import io.cdap.plugin.util.DBUtils; + +/** + * A custom ErrorDetailsProvider for SQL Server plugins. 
+ */ +public class SqlServerErrorDetailsProvider extends DBErrorDetailsProvider { + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MSSQL_SUPPORTED_DOC_URL; + } +} diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java index 0fa8991c5..dc442d200 100644 --- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java +++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java @@ -88,6 +88,16 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) { return new LineageRecorder(context, asset); } + @Override + protected String getErrorDetailsProviderClassName() { + return SqlServerErrorDetailsProvider.class.getName(); + } + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MSSQL_SUPPORTED_DOC_URL; + } + /** * MSSQL action configuration. */ @@ -167,6 +177,11 @@ public Map getDBSpecificArguments() { packetSize, queryTimeout); } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override public String getConnectionString() { return String.format(SqlServerConstants.SQL_SERVER_CONNECTION_STRING_FORMAT, diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java index 9603b24db..004532064 100644 --- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java +++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java @@ -75,6 +75,11 @@ protected Class getDBRecordType() { return SqlServerSourceDBRecord.class; } + @Override + protected String getErrorDetailsProviderClassName() { + return SqlServerErrorDetailsProvider.class.getName(); + } + @Override protected LineageRecorder getLineageRecorder(BatchSourceContext context) { String fqn = DBUtils.constructFQN("mssql", @@ -85,6 +90,11 @@ protected 
LineageRecorder getLineageRecorder(BatchSourceContext context) { return new LineageRecorder(context, asset); } + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MSSQL_SUPPORTED_DOC_URL; + } + /** * MSSQL source config. */ @@ -188,6 +198,11 @@ public List getInitQueries() { return Collections.emptyList(); } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); diff --git a/mssql-plugin/widgets/SQL Server-connector.json b/mssql-plugin/widgets/SQL Server-connector.json index 171076295..c326cd81d 100644 --- a/mssql-plugin/widgets/SQL Server-connector.json +++ b/mssql-plugin/widgets/SQL Server-connector.json @@ -64,6 +64,20 @@ "widget-type": "password", "label": "Password", "name": "password" + }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } } ] }, diff --git a/mssql-plugin/widgets/SqlServer-batchsink.json b/mssql-plugin/widgets/SqlServer-batchsink.json index 260c66259..fb20cad9d 100644 --- a/mssql-plugin/widgets/SqlServer-batchsink.json +++ b/mssql-plugin/widgets/SqlServer-batchsink.json @@ -84,6 +84,20 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -280,6 +294,10 @@ { 
"type": "property", "name": "connectionArguments" + }, + { + "type": "property", + "name": "transactionIsolationLevel" } ] }, diff --git a/mssql-plugin/widgets/SqlServer-batchsource.json b/mssql-plugin/widgets/SqlServer-batchsource.json index dad5f4708..b3494e485 100644 --- a/mssql-plugin/widgets/SqlServer-batchsource.json +++ b/mssql-plugin/widgets/SqlServer-batchsource.json @@ -84,6 +84,20 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -316,6 +330,10 @@ { "type": "property", "name": "connectionArguments" + }, + { + "type": "property", + "name": "transactionIsolationLevel" } ] }, diff --git a/mysql-plugin/docs/MySQL-connector.md b/mysql-plugin/docs/MySQL-connector.md index fb5c1fbb8..f586084c1 100644 --- a/mysql-plugin/docs/MySQL-connector.md +++ b/mysql-plugin/docs/MySQL-connector.md @@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication. **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level** The transaction isolation level of the database connection +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+ +For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html) + **Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations. This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies diff --git a/mysql-plugin/docs/Mysql-batchsink.md b/mysql-plugin/docs/Mysql-batchsink.md index b28a28618..46a763f9d 100644 --- a/mysql-plugin/docs/Mysql-batchsink.md +++ b/mysql-plugin/docs/Mysql-batchsink.md @@ -39,6 +39,14 @@ You also can use the macro function ${conn(connection-name)}. **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level** The transaction isolation level of the database connection +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible. + +For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html) + **Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/mysql-plugin/docs/Mysql-batchsource.md b/mysql-plugin/docs/Mysql-batchsource.md index 010e08216..552bb5504 100644 --- a/mysql-plugin/docs/Mysql-batchsource.md +++ b/mysql-plugin/docs/Mysql-batchsource.md @@ -49,6 +49,14 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level** The transaction isolation level of the database connection +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible. + +For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html) + **Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. 
diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml index bb5caf4c0..d2ea17f41 100644 --- a/mysql-plugin/pom.xml +++ b/mysql-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Mysql plugin mysql-plugin 4.0.0 + Mysql database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided io.cdap.plugin @@ -40,10 +74,12 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} com.google.guava guava + ${guava.version} @@ -57,23 +93,26 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} + test junit junit - - - io.cdap.cdap - cdap-api - provided + ${junit.version} + test org.mockito mockito-core + ${mockito.version} + test mysql diff --git a/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature b/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature index 1ad2f8cc1..0ea426da0 100644 --- a/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature +++ b/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature @@ -142,7 +142,7 @@ Feature: MySQL Source - Run time scenarios Then Close the Plugin Properties page Then Save the pipeline Then Preview and run the pipeline - Then Wait till pipeline preview is in running state + Then Wait till pipeline preview is in running state and check if any error occurs Then Open and capture pipeline preview logs Then Verify the preview run status of pipeline in the logs is "failed" diff --git 
a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java index 251f0fc74..ca9a2b928 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java @@ -17,7 +17,7 @@ package io.cdap.plugin.mysql; import io.cdap.cdap.api.exception.ErrorType; -import io.cdap.plugin.db.DBErrorDetailsProvider; +import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.util.DBUtils; /** @@ -31,7 +31,7 @@ protected String getExternalDocumentationLink() { } @Override - protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) { + protected ErrorType getErrorTypeFromErrorCodeAndSqlState(int errorCode, String sqlState) { // https://dev.mysql.com/doc/refman/9.0/en/error-message-elements.html#error-code-ranges if (errorCode >= 1000 && errorCode <= 5999) { return ErrorType.USER; diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java index 42488b31e..0a9257a0a 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java @@ -114,6 +114,11 @@ protected String getErrorDetailsProviderClassName() { return MysqlErrorDetailsProvider.class.getName(); } + @Override + protected String getExternalDocumentationLink() { + return DBUtils.MYSQL_SUPPORTED_DOC_URL; + } + /** * MySQL action configuration. 
*/ @@ -189,6 +194,11 @@ public Map getDBSpecificArguments() { trustCertificateKeyStorePassword, false); } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override public MysqlConnectorConfig getConnection() { return connection; diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java index 971b76809..38642468c 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java @@ -197,6 +197,11 @@ public MysqlConnectorConfig getConnection() { return connection; } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); diff --git a/mysql-plugin/widgets/MySQL-connector.json b/mysql-plugin/widgets/MySQL-connector.json index 9064d1bf6..f60f5526f 100644 --- a/mysql-plugin/widgets/MySQL-connector.json +++ b/mysql-plugin/widgets/MySQL-connector.json @@ -30,6 +30,20 @@ "widget-attributes": { "default": "3306" } + }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } } ] }, diff --git a/mysql-plugin/widgets/Mysql-batchsink.json b/mysql-plugin/widgets/Mysql-batchsink.json index c525ead40..58596aae2 100644 --- a/mysql-plugin/widgets/Mysql-batchsink.json +++ b/mysql-plugin/widgets/Mysql-batchsink.json @@ -65,6 +65,20 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + 
"widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -225,6 +239,10 @@ "type": "property", "name": "password" }, + { + "type": "property", + "name": "transactionIsolationLevel" + }, { "type": "property", "name": "host" diff --git a/mysql-plugin/widgets/Mysql-batchsource.json b/mysql-plugin/widgets/Mysql-batchsource.json index 9175bd5ed..506e837f7 100644 --- a/mysql-plugin/widgets/Mysql-batchsource.json +++ b/mysql-plugin/widgets/Mysql-batchsource.json @@ -65,6 +65,20 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_UNCOMMITTED", + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -277,6 +291,10 @@ "type": "property", "name": "password" }, + { + "type": "property", + "name": "transactionIsolationLevel" + }, { "type": "property", "name": "host" diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml index f7b559439..f141c7371 100644 --- a/netezza-plugin/pom.xml +++ b/netezza-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Netezza plugin diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml index b21152fcd..30e4b5911 100644 --- a/oracle-plugin/pom.xml +++ b/oracle-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 Oracle plugin oracle-plugin 4.0.0 + Oracle database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + 
http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided io.cdap.plugin @@ -40,10 +74,12 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} com.google.guava guava + ${guava.version} @@ -57,18 +93,25 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 + ${cdap.version} + test junit junit + ${junit.version} + test org.hsqldb hsqldb + ${hsql.version} test @@ -80,11 +123,8 @@ org.mockito mockito-core - - - io.cdap.cdap - cdap-api - provided + ${mockito.version} + test org.glassfish diff --git a/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature b/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature index 2d1ca9ad1..d6ad85cd4 100644 --- a/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature +++ b/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature @@ -296,7 +296,7 @@ Feature: Oracle - Verify data transfer from Oracle source to BigQuery sink Then Close the Plugin Properties page Then Save the pipeline Then Preview and run the pipeline - Then Wait till pipeline preview is in running state + Then Wait till pipeline preview is in running state and check if any error occurs Then Verify the preview run status of pipeline in the logs is "failed" @ORACLE_SOURCE_TEST @BQ_SINK_TEST @@ -338,7 +338,9 @@ Feature: Oracle - Verify data transfer from Oracle source to BigQuery sink And Save and Deploy Pipeline And Run the Pipeline in Runtime And Wait till pipeline is in running state + And Open and capture logs And Verify the pipeline status is "Failed" + And Close the pipeline logs Then Open Pipeline logs and verify 
Log entries having below listed Level and Message: | Level | Message | | ERROR | errorLogsMessageInvalidBoundingQuery | diff --git a/oracle-plugin/src/e2e-test/resources/errorMessage.properties b/oracle-plugin/src/e2e-test/resources/errorMessage.properties index b981faf81..e44f4c00a 100644 --- a/oracle-plugin/src/e2e-test/resources/errorMessage.properties +++ b/oracle-plugin/src/e2e-test/resources/errorMessage.properties @@ -15,5 +15,7 @@ errorMessageBlankUsername=Username is required when password is given. errorMessageInvalidTableName=Exception while trying to validate schema of database table '"table"' for connection errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table '"TARGETTABLE_ errorMessageInvalidHost=Exception while trying to validate schema of database table '"table"' for connection -errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: ORA-00936: missing expression . \ - Please check the system logs for more details. +errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'Oracle' encountered : \ + java.io.IOException: ORA-00936: missing expression . Please check the system logs for more details. +blank.database.message=Required property 'database' has no value. 
+blank.connection.message=Exception while trying to validate schema of database table diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index 3d2f7399a..16371d5c1 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -112,7 +112,8 @@ protected DBConnectorPath getDBConnectorPath(String path) { @Override protected SchemaReader getSchemaReader(String sessionID) { - return new OracleSourceSchemaReader(sessionID); + return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestamp(), + config.getTreatPrecisionlessNumAsDeci()); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java index 10022364a..cbc1e5ed2 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java @@ -22,8 +22,6 @@ import io.cdap.plugin.db.TransactionIsolationLevel; import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; import javax.annotation.Nullable; @@ -43,12 +41,14 @@ public OracleConnectorConfig(String host, int port, String user, String password public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database) { - this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null); + this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null, + null); } public OracleConnectorConfig(String host, int port, String user, String password, String 
jdbcPluginName, String connectionArguments, String connectionType, String database, - String role, Boolean useSSL) { + String role, Boolean useSSL, @Nullable Boolean treatAsOldTimestamp, + @Nullable Boolean treatPrecisionlessNumAsDeci) { this.host = host; this.port = port; @@ -60,6 +60,8 @@ public OracleConnectorConfig(String host, int port, String user, String password this.database = database; this.role = role; this.useSSL = useSSL; + this.treatAsOldTimestamp = treatAsOldTimestamp; + this.treatPrecisionlessNumAsDeci = treatPrecisionlessNumAsDeci; } @Override @@ -81,17 +83,21 @@ public String getConnectionString() { @Macro private String database; - @Name(OracleConstants.TRANSACTION_ISOLATION_LEVEL) - @Description("The transaction isolation level for the database session.") - @Macro - @Nullable - private String transactionIsolationLevel; - @Name(OracleConstants.USE_SSL) @Description("Turns on SSL encryption. Connection will fail if SSL is not available") @Nullable public Boolean useSSL; + @Name(OracleConstants.TREAT_AS_OLD_TIMESTAMP) + @Description("A hidden field to handle timestamp as CDAP's timestamp micros or string as per old behavior.") + @Nullable + public Boolean treatAsOldTimestamp; + + @Name(OracleConstants.TREAT_PRECISIONLESSNUM_AS_DECI) + @Description("A hidden field to handle precision less number as CDAP's decimal per old behavior.") + @Nullable + public Boolean treatPrecisionlessNumAsDeci; + @Override protected int getDefaultPort() { return 1521; @@ -114,6 +120,14 @@ public Boolean getSSlMode() { return useSSL != null && useSSL; } + public Boolean getTreatAsOldTimestamp() { + return Boolean.TRUE.equals(treatAsOldTimestamp); + } + + public Boolean getTreatPrecisionlessNumAsDeci() { + return Boolean.TRUE.equals(treatPrecisionlessNumAsDeci); + } + @Override public Properties getConnectionArgumentsProperties() { Properties prop = super.getConnectionArgumentsProperties(); @@ -124,6 +138,7 @@ public Properties getConnectionArgumentsProperties() { return 
prop; } + @Override public String getTransactionIsolationLevel() { //if null default to the highest isolation level possible if (transactionIsolationLevel == null) { @@ -133,16 +148,7 @@ public String getTransactionIsolationLevel() { //This ensures that the role is mapped to the right serialization level, even w/ incorrect user input //if role is SYSDBA or SYSOP it will map to read_committed. else serialized return (!getRole().equals(ROLE_NORMAL)) ? TransactionIsolationLevel.Level.TRANSACTION_READ_COMMITTED.name() : - TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name(); - } - - @Override - public Map getAdditionalArguments() { - Map additonalArguments = new HashMap<>(); - if (getTransactionIsolationLevel() != null) { - additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel()); - } - return additonalArguments; + TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name(); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java index dc38f80ac..cbd411175 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java @@ -43,6 +43,8 @@ private OracleConstants() { public static final String TNS_CONNECTION_TYPE = "tns"; public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; public static final String USE_SSL = "useSSL"; + public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp"; + public static final String TREAT_PRECISIONLESSNUM_AS_DECI = "treatPrecisionlessNumAsDeci"; /** * Constructs the Oracle connection string based on the provided connection type, host, port, and database. 
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java new file mode 100644 index 000000000..e0c770c9f --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.plugin.common.db.DBErrorDetailsProvider; +import io.cdap.plugin.util.DBUtils; + +/** + * A custom ErrorDetailsProvider for Oracle plugin. 
+ */ +public class OracleErrorDetailsProvider extends DBErrorDetailsProvider { + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.ORACLE_SUPPORTED_DOC_URL; + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 45afbcb51..511281e9d 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -82,6 +82,15 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) { return new LineageRecorder(context, asset); } + @Override + protected String getErrorDetailsProviderClassName() { + return OracleErrorDetailsProvider.class.getName(); + } + + @Override + protected String getExternalDocumentationLink() { + return DBUtils.ORACLE_SUPPORTED_DOC_URL; + } /** * Oracle action configuration. diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 6df62e63e..1488a084b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -63,7 +63,12 @@ protected String createConnectionString() { @Override protected SchemaReader getSchemaReader() { - return new OracleSourceSchemaReader(); + // PLUGIN-1893 : Based on field/properties from Oracle source and Oracle connection we will pass the flag to control + // handle schema to make it backward compatible. 
+ boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestamp(); + boolean treatPrecisionlessNumAsDeci = oracleSourceConfig.getConnection().getTreatPrecisionlessNumAsDeci(); + + return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci); } @Override @@ -71,6 +76,16 @@ protected Class getDBRecordType() { return OracleSourceDBRecord.class; } + @Override + protected String getExternalDocumentationLink() { + return DBUtils.ORACLE_SUPPORTED_DOC_URL; + } + + @Override + protected String getErrorDetailsProviderClassName() { + return OracleErrorDetailsProvider.class.getName(); + } + @Override protected LineageRecorder getLineageRecorder(BatchSourceContext context) { String fqn = DBUtils.constructFQN("oracle", @@ -117,9 +132,11 @@ public OracleSourceConfig(String host, int port, String user, String password, S String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, - String boundingQuery, String splitBy, Boolean useSSL) { + String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp, + Boolean treatPrecisionlessNumAsDeci) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, - connectionType, database, role, useSSL); + connectionType, database, role, useSSL, treatAsOldTimestamp, + treatPrecisionlessNumAsDeci); this.defaultBatchValue = defaultBatchValue; this.defaultRowPrefetch = defaultRowPrefetch; this.fetchSize = fetchSize; diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 7d35f9bc7..dd17d2e84 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -26,6 
+26,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.Set; +import javax.annotation.Nullable; /** * Oracle Source schema reader. @@ -65,14 +66,17 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { ); private final String sessionID; + private final Boolean isTimestampOldBehavior; + private final Boolean isPrecisionlessNumAsDecimal; public OracleSourceSchemaReader() { - this(null); + this(null, false, false); } - - public OracleSourceSchemaReader(String sessionID) { - super(); + public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampOldBehavior, + boolean isPrecisionlessNumAsDecimal) { this.sessionID = sessionID; + this.isTimestampOldBehavior = isTimestampOldBehavior; + this.isPrecisionlessNumAsDecimal = isPrecisionlessNumAsDecimal; } @Override @@ -81,10 +85,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti switch (sqlType) { case TIMESTAMP_TZ: - return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); - case Types.TIMESTAMP: + return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case TIMESTAMP_LTZ: - return Schema.of(Schema.LogicalType.DATETIME); + return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) + : Schema.of(Schema.LogicalType.DATETIME); + case Types.TIMESTAMP: + return isTimestampOldBehavior ? 
super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME); case BINARY_FLOAT: return Schema.of(Schema.Type.FLOAT); case BINARY_DOUBLE: @@ -107,12 +113,24 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti // For a Number type without specified precision and scale, precision will be 0 and scale will be -127 if (precision == 0) { // reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832 - LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " - + "converting into STRING type to avoid any precision loss.", - metadata.getColumnName(index), - metadata.getColumnTypeName(index), - metadata.getColumnName(index))); - return Schema.of(Schema.Type.STRING); + if (isPrecisionlessNumAsDecimal) { + precision = 38; + scale = 0; + LOG.warn(String.format("%s type with undefined precision and scale is detected, " + + "there may be a precision loss while running the pipeline. " + + "Please define an output precision and scale for field '%s' to avoid " + + "precision loss.", + metadata.getColumnTypeName(index), + metadata.getColumnName(index))); + return Schema.decimalOf(precision, scale); + } else { + LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + metadata.getColumnName(index), + metadata.getColumnTypeName(index), + metadata.getColumnName(index))); + return Schema.of(Schema.Type.STRING); + } } return Schema.decimalOf(precision, scale); } diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 5eca20cc4..404262fb2 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -120,6 +120,44 @@ ] } }, + { + "widget-type": "hidden", + "label": "Treat as old timestamp", + "name": "treatAsOldTimestamp", + "widget-attributes": { + "layout": "inline", + "default": "false", + 
"options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Treat precision less number as Decimal(old behavior)", + "name": "treatPrecisionlessNumAsDeci", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } + }, { "name": "connectionType", "label": "Connection Type", @@ -326,6 +364,14 @@ { "type": "property", "name": "transactionIsolationLevel" + }, + { + "type": "property", + "name": "getTreatAsOldTimestampConn" + }, + { + "type": "property", + "name": "treatPrecisionlessNumAsDeci" } ] }, diff --git a/oracle-plugin/widgets/Oracle-connector.json b/oracle-plugin/widgets/Oracle-connector.json index 628027caf..013f3b240 100644 --- a/oracle-plugin/widgets/Oracle-connector.json +++ b/oracle-plugin/widgets/Oracle-connector.json @@ -129,6 +129,44 @@ } ] } + }, + { + "widget-type": "hidden", + "label": "Treat as old timestamp", + "name": "treatAsOldTimestamp", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Treat precision less number as Decimal(old behavior)", + "name": "treatPrecisionlessNumAsDeci", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } } ] }, diff --git a/pom.xml b/pom.xml index fa4724a47..3a50542e0 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin database-plugins-parent - 1.12.0-SNAPSHOT + 1.12.3 pom Database Plugins Collection of database plugins @@ -56,13 +56,29 @@ + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + 
+ scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + + 7 true UTF-8 - 6.11.0-SNAPSHOT - 2.11.1 + 6.11.0 + 2.13.1 13.0.1 3.3.6 2.2.4 @@ -78,23 +94,12 @@ - - sonatype - https://oss.sonatype.org/content/groups/public - sonatype-snapshots - https://oss.sonatype.org/content/repositories/snapshots + https://central.sonatype.com/repository/maven-snapshots - - - sonatype - https://oss.sonatype.org/content/groups/public/ - - - @@ -349,16 +354,6 @@ - - - sonatype.release - https://oss.sonatype.org/service/local/staging/deploy/maven2 - - - sonatype.snapshots - https://oss.sonatype.org/content/repositories/snapshots - - ${testSourceLocation} @@ -532,14 +527,14 @@ - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.2 + org.sonatype.central + central-publishing-maven-plugin + 0.8.0 true - https://oss.sonatype.org - sonatype.release - 655dc88dc770c3 + sonatype.release + false + true @@ -612,6 +607,7 @@ src/e2e-test/java TestRunner.java + 31.1-jre @@ -724,7 +720,7 @@ io.cdap.tests.e2e cdap-e2e-framework - 0.4.0-SNAPSHOT + 0.4.0 test @@ -737,7 +733,7 @@ ch.qos.logback logback-classic - 1.2.8 + 1.3.15 runtime diff --git a/postgresql-plugin/docs/PostgreSQL-connector.md b/postgresql-plugin/docs/PostgreSQL-connector.md index 739c678e3..fe442cbf1 100644 --- a/postgresql-plugin/docs/PostgreSQL-connector.md +++ b/postgresql-plugin/docs/PostgreSQL-connector.md @@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication. **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level:** The transaction isolation level of the database connection. +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to `TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option. + +For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO). + **Database:** The name of the database to connect to. **Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments diff --git a/postgresql-plugin/docs/Postgres-batchsink.md b/postgresql-plugin/docs/Postgres-batchsink.md index b8a996463..82065e0fd 100644 --- a/postgresql-plugin/docs/Postgres-batchsink.md +++ b/postgresql-plugin/docs/Postgres-batchsink.md @@ -39,6 +39,14 @@ You also can use the macro function ${conn(connection-name)}. **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level:** The transaction isolation level of the database connection. +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to `TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option. + +For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO). + **Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments.
These arguments will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. diff --git a/postgresql-plugin/docs/Postgres-batchsource.md b/postgresql-plugin/docs/Postgres-batchsource.md index af359022d..559723526 100644 --- a/postgresql-plugin/docs/Postgres-batchsource.md +++ b/postgresql-plugin/docs/Postgres-batchsource.md @@ -49,6 +49,14 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s **Password:** Password to use to connect to the specified database. +**Transaction Isolation Level:** The transaction isolation level of the database connection. +- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible. +- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented. +- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible. +- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to `TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option. + +For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO). + **Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/postgresql-plugin/pom.xml b/postgresql-plugin/pom.xml index 8f086e482..2eeb641bf 100644 --- a/postgresql-plugin/pom.xml +++ b/postgresql-plugin/pom.xml @@ -20,17 +20,51 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 PostgreSQL plugin postgresql-plugin 4.0.0 + PostgreSQL database plugins + https://github.com/data-integrations/database-plugins + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + CDAP + cdap-dev@googlegroups.com + CDAP + http://cdap.io + + + + + scm:git:https://github.com/cdapio/hydrator-plugins.git + scm:git:git@github.com:cdapio/hydrator-plugins.git + https://github.com/cdapio/hydrator-plugins.git + HEAD + io.cdap.cdap cdap-etl-api + ${cdap.version} + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided io.cdap.plugin @@ -40,10 +74,12 @@ io.cdap.plugin hydrator-common + ${cdap.plugin.version} com.google.guava guava + ${guava.version} @@ -63,15 +99,14 @@ io.cdap.cdap hydrator-test + ${cdap.version} + test io.cdap.cdap cdap-data-pipeline3_2.12 - - - io.cdap.cdap - cdap-api - provided + ${cdap.version} + test org.jetbrains @@ -82,11 +117,13 @@ org.mockito mockito-core + ${mockito.version} test junit junit + ${junit.version} test diff --git a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties index 6e1929245..f793e3be7 100644 --- a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties +++ b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties @@ -18,5 +18,5 @@ errorMessageInvalidSourceHost=SQL error while getting query schema: The connecti errorMessageInvalidTableName=Table 'table' does not exist. 
Ensure table '"table"' is set correctly and that the errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table '"targettable_ errorMessageInvalidHost=Exception while trying to validate schema of database table '"table"' for connection -errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: The column index is out of range: 1, \ - number of columns: 0.. Please check the system logs for more details. +errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \ + java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details. diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java index 3202a3e28..a7de4e5dc 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java @@ -19,7 +19,7 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.exception.ErrorCategory; import io.cdap.cdap.api.exception.ErrorType; -import io.cdap.plugin.db.DBErrorDetailsProvider; +import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.util.DBUtils; import java.util.HashMap; @@ -77,7 +77,7 @@ protected String getExternalDocumentationLink() { } @Override - protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) { + protected ErrorType getErrorTypeFromErrorCodeAndSqlState(int errorCode, String sqlState) { if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 && ERROR_CODE_TO_ERROR_TYPE.containsKey(sqlState.substring(0, 2))) { return ERROR_CODE_TO_ERROR_TYPE.get(sqlState.substring(0, 2)); diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java 
b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java index 3becf5f27..73430c1e2 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java @@ -121,6 +121,11 @@ protected String getErrorDetailsProviderClassName() { return PostgresErrorDetailsProvider.class.getName(); } + @Override + protected String getExternalDocumentationLink() { + return DBUtils.POSTGRES_SUPPORTED_DOC_URL; + } + /** * PostgreSQL action configuration. */ @@ -170,6 +175,11 @@ public Map getDBSpecificArguments() { return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout)); } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override protected PostgresConnectorConfig getConnection() { return connection; diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index 8e3c091f9..b230f3d1e 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -143,6 +143,11 @@ protected PostgresConnectorConfig getConnection() { return connection; } + @Override + public String getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); diff --git a/postgresql-plugin/widgets/PostgreSQL-connector.json b/postgresql-plugin/widgets/PostgreSQL-connector.json index 091afc972..9a7a02e14 100644 --- a/postgresql-plugin/widgets/PostgreSQL-connector.json +++ b/postgresql-plugin/widgets/PostgreSQL-connector.json @@ -31,6 +31,19 @@ "default": "5432" } }, + { + "widget-type": "select", + "label": "Transaction 
Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "textbox", "label": "Database", diff --git a/postgresql-plugin/widgets/Postgres-batchsink.json b/postgresql-plugin/widgets/Postgres-batchsink.json index 6aa2dad8a..14e6f8154 100644 --- a/postgresql-plugin/widgets/Postgres-batchsink.json +++ b/postgresql-plugin/widgets/Postgres-batchsink.json @@ -65,6 +65,19 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -186,6 +199,10 @@ "type": "property", "name": "port" }, + { + "type": "property", + "name": "transactionIsolationLevel" + }, { "type": "property", "name": "database" diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json index 0e4ba28c1..60de4725f 100644 --- a/postgresql-plugin/widgets/Postgres-batchsource.json +++ b/postgresql-plugin/widgets/Postgres-batchsource.json @@ -65,6 +65,19 @@ "label": "Password", "name": "password" }, + { + "widget-type": "select", + "label": "Transaction Isolation Level", + "name": "transactionIsolationLevel", + "widget-attributes": { + "values": [ + "TRANSACTION_READ_COMMITTED", + "TRANSACTION_REPEATABLE_READ", + "TRANSACTION_SERIALIZABLE" + ], + "default": "TRANSACTION_SERIALIZABLE" + } + }, { "widget-type": "keyvalue", "label": "Connection Arguments", @@ -206,6 +219,10 @@ "type": "property", "name": "port" }, + { + "type": "property", + "name": "transactionIsolationLevel" + }, { "type": "property", "name": 
"database" diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml index 26ce2e396..c40736e07 100644 --- a/saphana-plugin/pom.xml +++ b/saphana-plugin/pom.xml @@ -20,7 +20,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 SAP HANA plugin diff --git a/teradata-plugin/pom.xml b/teradata-plugin/pom.xml index c351a9d96..0201805c5 100644 --- a/teradata-plugin/pom.xml +++ b/teradata-plugin/pom.xml @@ -21,7 +21,7 @@ database-plugins-parent io.cdap.plugin - 1.12.0-SNAPSHOT + 1.12.3 teradata-plugin