diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index c710ef929..ab3be13ff 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -65,6 +65,7 @@ jobs:
with:
repository: cdapio/cdap-e2e-tests
path: e2e
+ ref: release/6.11
- name: Cache
uses: actions/cache@v4
diff --git a/amazon-redshift-plugin/pom.xml b/amazon-redshift-plugin/pom.xml
index 17aa5e48b..17a18caa9 100644
--- a/amazon-redshift-plugin/pom.xml
+++ b/amazon-redshift-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Amazon Redshift plugin
diff --git a/aurora-mysql-plugin/pom.xml b/aurora-mysql-plugin/pom.xml
index df38e7267..654bde42b 100644
--- a/aurora-mysql-plugin/pom.xml
+++ b/aurora-mysql-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Aurora DB MySQL plugin
diff --git a/aurora-postgresql-plugin/pom.xml b/aurora-postgresql-plugin/pom.xml
index cb803d1ba..fd508c8fd 100644
--- a/aurora-postgresql-plugin/pom.xml
+++ b/aurora-postgresql-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Aurora DB PostgreSQL plugin
diff --git a/cloudsql-mysql-plugin/pom.xml b/cloudsql-mysql-plugin/pom.xml
index 44dd1fe8c..012acc177 100644
--- a/cloudsql-mysql-plugin/pom.xml
+++ b/cloudsql-mysql-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
CloudSQL MySQL plugin
cloudsql-mysql-plugin
4.0.0
+ CloudSQL MySQL database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
provided
@@ -41,11 +75,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
io.cdap.plugin
mysql-plugin
- 1.12.0-SNAPSHOT
+ ${project.version}
@@ -59,24 +94,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
test
junit
junit
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
org.jetbrains
diff --git a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
index 5ff3357f2..d4fc1de28 100644
--- a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -12,10 +12,12 @@ errorMessageNumberOfSplits=Split-By Field Name must be specified if Number of Sp
errorMessageBoundingQuery=Bounding Query must be specified if Number of Splits is not set to 1. Specify the Bounding Query.
errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'URLDecoder: Illegal hex characters in escape (%) pattern - For input string: "$^"'
errorMessageInvalidTableName=Table 'Invalidtable' does not exist. Ensure table 'Invalidtable' is set correctly and
-errorMessageConnectionName=Connection Name must be in the format :: to connect to a public CloudSQL PostgreSQL instance.
+errorMessageConnectionName=Connection Name must be in the format :: to connect to a public CloudSQL MySQL instance.
validationSuccessMessage=No errors found.
validationErrorMessage=COUNT ERROR found
-errorLogsMessageInvalidTableName=Spark program 'phase-1' failed with error: Errors were encountered during validation. \
- Table
-errorLogsMessageInvalidCredentials =Spark program 'phase-1' failed with error: Errors were encountered during validation.
-errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table' at line 1. Please check the system logs for more details.
+errorLogsMessageInvalidTableName=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \
+ Table 'Table123' does not exist.. Please check the system logs for more details.
+errorLogsMessageInvalidCredentials =Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \
+ Exception while trying to validate schema of database table
+errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : java.io.IOException: You have an error in your SQL syntax; \
+ check the manual that corresponds to your MySQL server version for the right syntax to use near 'table' at line 1. Please check the system logs for more details.
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java
index 0608edb75..770dd9030 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java
@@ -55,7 +55,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlMysqlActionConfig.instanceType,
- cloudsqlMysqlActionConfig.connectionName);
+ cloudsqlMysqlActionConfig.connectionName,
+ CloudSQLUtil.CLOUDSQL_MYSQL);
}
super.configurePipeline(pipelineConfigurer);
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
index 86a8e6f52..6cd1b0031 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
@@ -74,7 +74,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlMysqlSinkConfig.connection.getInstanceType(),
- cloudsqlMysqlSinkConfig.connection.getConnectionName());
+ cloudsqlMysqlSinkConfig.connection.getConnectionName(),
+ CloudSQLUtil.CLOUDSQL_MYSQL);
}
super.configurePipeline(pipelineConfigurer);
@@ -108,6 +109,11 @@ protected String getErrorDetailsProviderClassName() {
return CloudSQLMySQLErrorDetailsProvider.class.getName();
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.CLOUDSQLMYSQL_SUPPORTED_DOC_URL;
+ }
+
@Override
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
String host;
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
index 8273169c0..201360c67 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
@@ -70,7 +70,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlMysqlSourceConfig.connection.getInstanceType(),
- cloudsqlMysqlSourceConfig.connection.getConnectionName());
+ cloudsqlMysqlSourceConfig.connection.getConnectionName(),
+ CloudSQLUtil.CLOUDSQL_MYSQL);
}
super.configurePipeline(pipelineConfigurer);
diff --git a/cloudsql-postgresql-plugin/pom.xml b/cloudsql-postgresql-plugin/pom.xml
index 8faab79ba..d219d5f65 100644
--- a/cloudsql-postgresql-plugin/pom.xml
+++ b/cloudsql-postgresql-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
CloudSQL PostgreSQL plugin
cloudsql-postgresql-plugin
4.0.0
+ CloudSQL PostgreSQL database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
provided
@@ -41,6 +75,7 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
io.cdap.plugin
@@ -63,24 +98,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
test
junit
junit
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
org.jetbrains
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java
index 1a3f8ad7b..5b13759f6 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLAction.java
@@ -55,7 +55,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlPostgresqlActionConfig.instanceType,
- cloudsqlPostgresqlActionConfig.connectionName);
+ cloudsqlPostgresqlActionConfig.connectionName,
+ CloudSQLUtil.CLOUDSQL_POSTGRESQL);
}
super.configurePipeline(pipelineConfigurer);
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
index c8ca0d6dc..060b67f82 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
@@ -81,7 +81,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlPostgresqlSinkConfig.connection.getInstanceType(),
- cloudsqlPostgresqlSinkConfig.connection.getConnectionName());
+ cloudsqlPostgresqlSinkConfig.connection.getConnectionName(),
+ CloudSQLUtil.CLOUDSQL_POSTGRESQL);
}
super.configurePipeline(pipelineConfigurer);
@@ -152,6 +153,11 @@ protected String getErrorDetailsProviderClassName() {
return CloudSQLPostgreSQLErrorDetailsProvider.class.getName();
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL;
+ }
+
/** CloudSQL PostgreSQL sink config. */
public static class CloudSQLPostgreSQLSinkConfig extends AbstractDBSpecificSinkConfig {
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
index e32651a9a..db3f2d708 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
@@ -70,7 +70,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
CloudSQLUtil.checkConnectionName(
failureCollector,
cloudsqlPostgresqlSourceConfig.connection.getInstanceType(),
- cloudsqlPostgresqlSourceConfig.connection.getConnectionName());
+ cloudsqlPostgresqlSourceConfig.connection.getConnectionName(),
+ CloudSQLUtil.CLOUDSQL_POSTGRESQL);
}
super.configurePipeline(pipelineConfigurer);
diff --git a/database-commons/pom.xml b/database-commons/pom.xml
index 683dd2f43..8ee1e295b 100644
--- a/database-commons/pom.xml
+++ b/database-commons/pom.xml
@@ -20,39 +20,76 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Database Commons
database-commons
4.0.0
+ Database Commons
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
+ test
junit
junit
+ ${junit.version}
+ test
com.mockrunner
@@ -63,6 +100,8 @@
org.mockito
mockito-core
+ ${mockito.version}
+ test
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
index 588ed78b8..c5320e25e 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
@@ -45,6 +45,7 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC
public static final String CONNECTION_ARGUMENTS = "connectionArguments";
public static final String JDBC_PLUGIN_NAME = "jdbcPluginName";
public static final String JDBC_PLUGIN_TYPE = "jdbc";
+ public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
@Name(JDBC_PLUGIN_NAME)
@Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " +
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java b/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java
deleted file mode 100644
index cc731d6ac..000000000
--- a/database-commons/src/main/java/io/cdap/plugin/db/DBErrorDetailsProvider.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright © 2024 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package io.cdap.plugin.db;
-
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCodeType;
-import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
-import io.cdap.cdap.api.exception.ProgramFailureException;
-import io.cdap.cdap.etl.api.exception.ErrorContext;
-import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
-
-import java.sql.SQLException;
-import java.util.List;
-
-/**
- * A custom ErrorDetailsProvider for Database plugins.
- */
-public class DBErrorDetailsProvider implements ErrorDetailsProvider {
-
- public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
- List causalChain = Throwables.getCausalChain(e);
- for (Throwable t : causalChain) {
- if (t instanceof ProgramFailureException) {
- // if causal chain already has program failure exception, return null to avoid double wrap.
- return null;
- }
- if (t instanceof SQLException) {
- return getProgramFailureException((SQLException) t, errorContext);
- }
- if (t instanceof IllegalArgumentException) {
- return getProgramFailureException((IllegalArgumentException) t, errorContext);
- }
- if (t instanceof IllegalStateException) {
- return getProgramFailureException((IllegalStateException) t, errorContext);
- }
- }
- return null;
- }
-
- /**
- * Get a ProgramFailureException with the given error
- * information from {@link SQLException}.
- *
- * @param e The SQLException to get the error information from.
- * @return A ProgramFailureException with the given error information.
- */
- private ProgramFailureException getProgramFailureException(SQLException e, ErrorContext errorContext) {
- String errorMessage = e.getMessage();
- String sqlState = e.getSQLState();
- int errorCode = e.getErrorCode();
- String errorMessageWithDetails = String.format(
- "Error occurred in the phase: '%s' with sqlState: '%s', errorCode: '%s', errorMessage: %s",
- errorContext.getPhase(), sqlState, errorCode, errorMessage);
- String externalDocumentationLink = getExternalDocumentationLink();
- if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessageWithDetails.endsWith(".")) {
- errorMessageWithDetails = errorMessageWithDetails + ".";
- }
- errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails,
- externalDocumentationLink);
- }
- return ErrorUtils.getProgramFailureException(Strings.isNullOrEmpty(sqlState) ?
- new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN) : getErrorCategoryFromSqlState(sqlState),
- errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode, sqlState), true,
- ErrorCodeType.SQLSTATE, sqlState, externalDocumentationLink, e);
- }
-
- /**
- * Get a ProgramFailureException with the given error
- * information from {@link IllegalArgumentException}.
- *
- * @param e The IllegalArgumentException to get the error information from.
- * @return A ProgramFailureException with the given error information.
- */
- private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
- String errorMessage = e.getMessage();
- String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
- return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage,
- String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
- }
-
- /**
- * Get a ProgramFailureException with the given error
- * information from {@link IllegalStateException}.
- *
- * @param e The IllegalStateException to get the error information from.
- * @return A ProgramFailureException with the given error information.
- */
- private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
- String errorMessage = e.getMessage();
- String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
- return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage,
- String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
- }
-
- /**
- * Get the external documentation link for the client errors if available.
- *
- * @return The external documentation link as a {@link String}.
- */
- protected String getExternalDocumentationLink() {
- return null;
- }
-
- protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
- return ErrorType.UNKNOWN;
- }
-
- protected ErrorCategory getErrorCategoryFromSqlState(String sqlState) {
- return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
- }
-}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java
index a2be9cbf0..0eaac3148 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBAction.java
@@ -16,12 +16,15 @@
package io.cdap.plugin.db.action;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;
import java.sql.Driver;
+import java.sql.SQLException;
/**
* Action that runs a db command.
@@ -40,7 +43,18 @@ public AbstractDBAction(QueryConfig config, Boolean enableAutoCommit) {
public void run(ActionContext context) throws Exception {
Class extends Driver> driverClass = context.loadPluginClass(JDBC_PLUGIN_ID);
DBRun executeQuery = new DBRun(config, driverClass, enableAutoCommit);
- executeQuery.run();
+ try {
+ executeQuery.run();
+ } catch (Exception e) {
+ if (e instanceof SQLException) {
+ DBErrorDetailsProvider dbe = new DBErrorDetailsProvider();
+ throw dbe.getProgramFailureException((SQLException) e, null);
+ }
+ FailureCollector collector = context.getFailureCollector();
+ collector.addFailure("Failed to execute query with message: " + e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ collector.getOrThrowException();
+ }
}
@Override
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java
index 5c6b08031..8de0e4d70 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java
@@ -20,8 +20,9 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.TransactionIsolationLevel;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -42,6 +43,12 @@ public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnec
@Nullable
protected Integer port;
+ @Name(ConnectionConfig.TRANSACTION_ISOLATION_LEVEL)
+ @Description("The transaction isolation level for the database session.")
+ @Macro
+ @Nullable
+ protected String transactionIsolationLevel;
+
public String getHost() {
return host;
}
@@ -55,4 +62,21 @@ public int getPort() {
public boolean canConnect() {
return super.canConnect() && !containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT);
}
+
+ @Override
+ public Map getAdditionalArguments() {
+ Map additonalArguments = new HashMap<>();
+ if (getTransactionIsolationLevel() != null) {
+ additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
+ }
+ return additonalArguments;
+ }
+
+ public String getTransactionIsolationLevel() {
+ if (transactionIsolationLevel == null) {
+ return null;
+ }
+ return TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
+ }
}
+
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
index 26a95405b..0bb4bf123 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
@@ -25,6 +25,10 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -43,7 +47,6 @@
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.DBConfig;
-import io.cdap.plugin.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.Operation;
import io.cdap.plugin.db.SchemaReader;
@@ -172,7 +175,17 @@ public void validateOperations(FailureCollector collector, T dbSinkConfig, @Null
* @return ErrorDetailsProvider class name
*/
protected String getErrorDetailsProviderClassName() {
- return DBErrorDetailsProvider.class.getName();
+ return null;
+ }
+
+ /**
+ * Returns the external documentation link.
+ * Override this method to provide a custom external documentation link.
+ *
+ * @return external documentation link
+ */
+ protected String getExternalDocumentationLink() {
+ return null;
}
@Override
@@ -215,6 +228,7 @@ public void prepareRun(BatchSinkContext context) {
configAccessor.setInitQueries(dbSinkConfig.getInitQueries());
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
+ configAccessor.getConfiguration().set(ETLDBOutputFormat.STAGE_NAME, context.getStageName());
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
@@ -240,7 +254,9 @@ public void prepareRun(BatchSinkContext context) {
context.getArguments().get(ETLDBOutputFormat.COMMIT_BATCH_SIZE));
}
// set error details provider
- context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
+ if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) {
+ context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
+ }
addOutputContext(context);
}
protected void addOutputContext(BatchSinkContext context) {
@@ -296,8 +312,23 @@ private Schema inferSchema(Class extends Driver> driverClass) {
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
}
} catch (SQLException e) {
- throw new InvalidStageException("Error while reading table metadata", e);
-
+ // wrap exception to ensure SQLException-child instances not exposed to contexts w/o jdbc driver in classpath
+ String errorMessage =
+ String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
+ e.getSQLState(), e.getErrorCode());
+ String errorMessageWithDetails = String.format("Error while reading table metadata." +
+ "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
+ String externalDocumentationLink = getExternalDocumentationLink();
+ if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
+ if (!errorMessage.endsWith(".")) {
+ errorMessage = errorMessage + ".";
+ }
+ errorMessage = String.format("%s For more details, see %s", errorMessageWithDetails, errorMessage);
+ }
+ throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+ errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
+ e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
+ e.getSQLState(), e.getErrorCode()));
}
} catch (IllegalAccessException | InstantiationException | SQLException e) {
throw new InvalidStageException("JDBC Driver unavailable: " + dbSinkConfig.getJdbcPluginName(), e);
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java
index ad2b91ab1..ad196386c 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/ETLDBOutputFormat.java
@@ -25,6 +25,8 @@
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -43,6 +45,7 @@
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import static io.cdap.plugin.db.ConnectionConfigAccessor.OPERATION_NAME;
import static io.cdap.plugin.db.ConnectionConfigAccessor.RELATION_TABLE_KEY;
@@ -56,15 +59,92 @@
public class ETLDBOutputFormat extends DBOutputFormat {
// Batch size before submitting a batch to the SQL engine. If set to 0, no batches will be submitted until commit.
public static final String COMMIT_BATCH_SIZE = "io.cdap.plugin.db.output.commit.batch.size";
+ public static final String STAGE_NAME = "io.cdap.plugin.db.output.stage_name";
public static final int DEFAULT_COMMIT_BATCH_SIZE = 1000;
private static final Character ESCAPE_CHAR = '"';
+ // Format for connection map's key will be "taskAttemptId_stageName"
+ private static final String CONNECTION_MAP_KEY_FORMAT = "%s_%s";
+
+ // CONNECTION_MAP will be used to store connections with "taskAttemptId_stageName" as key and
+ // connection object as value. Making it static to be accessed from multiple task attempts within same executor.
+ private static final Map CONNECTION_MAP = new ConcurrentHashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(ETLDBOutputFormat.class);
private Configuration conf;
private Driver driver;
private JDBCDriverShim driverShim;
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new OutputCommitter() {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ return true;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ conf = context.getConfiguration();
+ String stageName = conf.get(STAGE_NAME);
+ String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName);
+ Connection connection;
+ if ((connection = CONNECTION_MAP.remove(connectionId)) != null) {
+ try {
+ connection.commit();
+ } catch (SQLException e) {
+ try {
+ connection.rollback();
+ } catch (SQLException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ throw new IOException(e);
+ } finally {
+ try {
+ connection.close();
+ LOG.debug("Connection Closed after committing the task with taskAttemptId {}", connectionId);
+ } catch (SQLException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ conf = context.getConfiguration();
+ String stageName = conf.get(STAGE_NAME);
+ String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName);
+ Connection connection;
+ if ((connection = CONNECTION_MAP.remove(connectionId)) != null) {
+ try {
+ connection.rollback();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ connection.close();
+ LOG.debug("Connection Closed after rollback the task with taskAttemptId {}", connectionId);
+ } catch (SQLException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ }
+ }
+ }
+ };
+ }
+
@Override
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException {
conf = context.getConfiguration();
@@ -81,6 +161,11 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOE
try {
Connection connection = getConnection(conf);
+ String stageName = conf.get(STAGE_NAME);
+ // If using multiple sinks, task attemptID can be same in that case, appending stage in the end for uniqueness.
+ String connectionId = getConnectionMapKeyFormat(context.getTaskAttemptID().toString(), stageName);
+ CONNECTION_MAP.put(connectionId, connection);
+ LOG.debug("Connection Added to the map with connectionId : {}", connectionId);
PreparedStatement statement = connection.prepareStatement(constructQueryOnOperation(tableName, fieldNames,
operationName, listKeys));
return new DBRecordWriter(connection, statement) {
@@ -98,23 +183,15 @@ public void close(TaskAttemptContext context) throws IOException {
if (!emptyData) {
getStatement().executeBatch();
}
- getConnection().commit();
} catch (SQLException e) {
- try {
- getConnection().rollback();
- } catch (SQLException ex) {
- LOG.warn(StringUtils.stringifyException(ex));
- }
throw new IOException(e);
} finally {
try {
getStatement().close();
- getConnection().close();
} catch (SQLException ex) {
throw new IOException(ex);
}
}
-
try {
DriverManager.deregisterDriver(driverShim);
} catch (SQLException e) {
@@ -298,4 +375,8 @@ public String constructUpdateQuery(String table, String[] fieldNames, String[] l
return query.toString();
}
}
+
+ private String getConnectionMapKeyFormat(String taskAttemptId, String stageName) {
+ return String.format(CONNECTION_MAP_KEY_FORMAT, taskAttemptId, stageName);
+ }
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
index 559985758..54d1e2ab6 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
@@ -46,7 +46,6 @@
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.DBConfig;
-import io.cdap.plugin.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.TransactionIsolationLevel;
@@ -201,18 +200,20 @@ private Schema loadSchemaFromDB(Class extends Driver> driverClass)
} catch (SQLException e) {
// wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
+ String errorMessage =
+ String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
+ e.getSQLState(), e.getErrorCode());
String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." +
"Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
String externalDocumentationLink = getExternalDocumentationLink();
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessageWithDetails.endsWith(".")) {
- errorMessageWithDetails = errorMessageWithDetails + ".";
+ if (!errorMessage.endsWith(".")) {
+ errorMessage = errorMessage + ".";
}
- errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails,
- externalDocumentationLink);
+ errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
}
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- e.getMessage(), errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
+ errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
e.getSQLState(), e.getErrorCode()));
} finally {
@@ -239,7 +240,7 @@ protected SchemaReader getSchemaReader() {
* @return ErrorDetailsProvider class name
*/
protected String getErrorDetailsProviderClassName() {
- return DBErrorDetailsProvider.class.getName();
+ return null;
}
private DriverCleanup loadPluginClassAndGetDriver(Class extends Driver> driverClass)
@@ -299,7 +300,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
}
// set error details provider
- context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
+ if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) {
+ context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
+ }
context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider(
DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration())));
}
@@ -373,6 +376,12 @@ protected Class extends DBWritable> getDBRecordType() {
return DBRecord.class;
}
+ /**
+ * Returns the external documentation link.
+ * Override this method to provide a custom external documentation link.
+ *
+ * @return external documentation link
+ */
protected String getExternalDocumentationLink() {
return null;
}
@@ -520,7 +529,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
}
@VisibleForTesting
- static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
+ void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
if (configSchema == null) {
collector.addFailure("Schema should not be null or empty.", null)
.withConfigProperty(SCHEMA);
@@ -541,14 +550,20 @@ static void validateSchema(Schema actualSchema, Schema configSchema, FailureColl
Schema expectedFieldSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
- if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
- actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
- collector.addFailure(
- String.format("Schema field '%s' has type '%s but found '%s'.",
- field.getName(), expectedFieldSchema.getDisplayName(),
- actualFieldSchema.getDisplayName()), null)
- .withOutputSchemaField(field.getName());
- }
+ validateField(collector, field, actualFieldSchema, expectedFieldSchema);
+ }
+ }
+
+ protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema,
+ Schema expectedFieldSchema) {
+ if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
+ actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
+ collector.addFailure(
+ String.format("Schema field '%s' is expected to have type '%s but found '%s'.", field.getName(),
+ expectedFieldSchema.getDisplayName(), actualFieldSchema.getDisplayName()),
+ String.format("Change the data type of field %s to %s.", field.getName(),
+ actualFieldSchema.getDisplayName()))
+ .withOutputSchemaField(field.getName());
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java b/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java
index 11595ac06..f704f2ad5 100644
--- a/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java
+++ b/database-commons/src/main/java/io/cdap/plugin/util/CloudSQLUtil.java
@@ -31,6 +31,9 @@ public class CloudSQLUtil {
public static final String INSTANCE_TYPE = "instanceType";
public static final String PUBLIC_INSTANCE = "public";
public static final String PRIVATE_INSTANCE = "private";
+ public static final String CLOUDSQL_POSTGRESQL = "CloudSQL PostgreSQL";
+ public static final String CLOUDSQL_MYSQL = "CloudSQL MySQL";
+
/**
* Utility method to check the Connection Name format of a CloudSQL instance.
@@ -38,9 +41,10 @@ public class CloudSQLUtil {
* @param failureCollector {@link FailureCollector} for the pipeline
* @param instanceType CloudSQL instance type
* @param connectionName Connection Name for the CloudSQL instance
+ * @param databaseType Type of CloudSQL instance- CloudSQL PostgreSQL, CLoudSQL MySQL
*/
public static void checkConnectionName(
- FailureCollector failureCollector, String instanceType, String connectionName) {
+ FailureCollector failureCollector, String instanceType, String connectionName, String databaseType) {
if (PUBLIC_INSTANCE.equalsIgnoreCase(instanceType)) {
Pattern connectionNamePattern =
@@ -50,16 +54,16 @@ public static void checkConnectionName(
if (!matcher.matches()) {
failureCollector
.addFailure(
- "Connection Name must be in the format :: to connect to "
- + "a public CloudSQL PostgreSQL instance.", null)
+ String.format("Connection Name must be in the format :: to connect to "
+ + "a public %s instance.", databaseType), null)
.withConfigProperty(CONNECTION_NAME);
}
} else {
if (!InetAddresses.isInetAddress(connectionName)) {
failureCollector
.addFailure(
- "Enter the internal IP address of the Compute Engine VM cloudsql proxy "
- + "is running on, to connect to a private CloudSQL PostgreSQL instance.", null)
+ String.format("Enter the internal IP address of the Compute Engine VM cloudsql proxy "
+ + "is running on, to connect to a private %s instance.", databaseType), null)
.withConfigProperty(CONNECTION_NAME);
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java
index ffcfdc375..b125a7214 100644
--- a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java
+++ b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java
@@ -61,9 +61,13 @@ public final class DBUtils {
public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();
public static final String MYSQL_SUPPORTED_DOC_URL = "https://dev.mysql.com/doc/mysql-errors/9.0/en/";
+ public static final String MARIADB_SUPPORTED_DOC_URL = "https://mariadb.com/kb/en/mariadb-error-codes/";
+ public static final String MSSQL_SUPPORTED_DOC_URL =
+ "https://docs.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors";
public static final String CLOUDSQLMYSQL_SUPPORTED_DOC_URL = "https://cloud.google.com/sql/docs/mysql/error-messages";
public static final String POSTGRES_SUPPORTED_DOC_URL =
"https://www.postgresql.org/docs/current/errcodes-appendix.html";
+ public static final String ORACLE_SUPPORTED_DOC_URL = "https://docs.oracle.com/en/error-help/db/ora-index.html";
public static final String CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL =
"https://cloud.google.com/sql/docs/postgres/error-messages";
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java
index 0f5a3ca4a..cbe1361d0 100644
--- a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java
+++ b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.db;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ProgramFailureException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -162,49 +163,49 @@ public void testGetSchemaThrowsExceptionOnNumericWithZeroPrecision() throws SQLE
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnArray() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.ARRAY);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnDatalink() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.DATALINK);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnDistinct() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.DISTINCT);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnJavaObject() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.JAVA_OBJECT);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnOther() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.OTHER);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnRef() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.REF);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnSQLXML() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.SQLXML);
reader.getSchema(metadata, 1);
}
- @Test(expected = SQLException.class)
+ @Test(expected = ProgramFailureException.class)
public void testGetSchemaThrowsExceptionOnStruct() throws SQLException {
when(metadata.getColumnType(eq(1))).thenReturn(Types.STRUCT);
reader.getSchema(metadata, 1);
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
index 3dc7a2d1c..a8be38b46 100644
--- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
@@ -43,11 +43,17 @@ public class AbstractDBSourceTest {
Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN)))
);
+ private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() {
+ @Override
+ public String getConnectionString() {
+ return "";
+ }
+ };
@Test
public void testValidateSourceSchemaCorrectSchema() {
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
}
@@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() {
);
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
assertPropertyValidationFailed(collector, "boolean_column");
}
@@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() {
);
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
assertPropertyValidationFailed(collector, "boolean_column");
}
diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml
index 39c3fcd52..920ed89c7 100644
--- a/db2-plugin/pom.xml
+++ b/db2-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
IBM DB2 plugin
diff --git a/generic-database-plugin/pom.xml b/generic-database-plugin/pom.xml
index b823356d8..08f979397 100644
--- a/generic-database-plugin/pom.xml
+++ b/generic-database-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Generic database plugin
diff --git a/generic-db-argument-setter/pom.xml b/generic-db-argument-setter/pom.xml
index d8a78cd4d..70c4414dc 100644
--- a/generic-db-argument-setter/pom.xml
+++ b/generic-db-argument-setter/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Generic database argument setter plugin
diff --git a/mariadb-plugin/docs/Mariadb-batchsink.md b/mariadb-plugin/docs/Mariadb-batchsink.md
index 11176c0db..e4541fe67 100644
--- a/mariadb-plugin/docs/Mariadb-batchsink.md
+++ b/mariadb-plugin/docs/Mariadb-batchsink.md
@@ -60,41 +60,39 @@ connections.
Data Types Mapping
----------
- +--------------------------------+-----------------------+------------------------------------+
- | MariaDB Data Type | CDAP Schema Data Type | Comment |
- +--------------------------------+-----------------------+------------------------------------+
- | TINYINT | int | |
- | BOOLEAN, BOOL | boolean | |
- | SMALLINT | int | |
- | MEDIUMINT | int | |
- | INT, INTEGER | int | |
- | BIGINT | long | |
- | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
- | FLOAT | float | |
- | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
- | BIT | boolean | |
- | CHAR | string | |
- | VARCHAR | string | |
- | BINARY | bytes | |
- | CHAR BYTE | bytes | |
- | VARBINARY | bytes | |
- | TINYBLOB | bytes | |
- | BLOB | bytes | |
- | MEDIUMBLOB | bytes | |
- | LONGBLOB | bytes | |
- | TINYTEXT | string | |
- | TEXT | string | |
- | MEDIUMTEXT | string | |
- | LONGTEXT | string | |
- | JSON | string | In MariaDB it is alias to LONGTEXT |
- | ENUM | string | Mapping to String by default |
- | SET | string | |
- | DATE | date | |
- | TIME | time_micros | |
- | DATETIME | timestamp_micros | |
- | TIMESTAMP | timestamp_micros | |
- | YEAR | date | |
- +--------------------------------+-----------------------+------------------------------------+
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ |--------------------------------|-----------------------|---------------------------------------------------------|
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | int | Users can manually set output schema to map it to Date. |
Example
-------
diff --git a/mariadb-plugin/docs/Mariadb-batchsource.md b/mariadb-plugin/docs/Mariadb-batchsource.md
index 2b1fe3944..713af2ee8 100644
--- a/mariadb-plugin/docs/Mariadb-batchsource.md
+++ b/mariadb-plugin/docs/Mariadb-batchsource.md
@@ -78,43 +78,39 @@ with the tradeoff of higher memory usage.
Data Types Mapping
----------
-
- +--------------------------------+-----------------------+------------------------------------+
- | MariaDB Data Type | CDAP Schema Data Type | Comment |
- +--------------------------------+-----------------------+------------------------------------+
- | TINYINT | int | |
- | BOOLEAN, BOOL | boolean | |
- | SMALLINT | int | |
- | MEDIUMINT | int | |
- | INT, INTEGER | int | |
- | BIGINT | long | |
- | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
- | FLOAT | float | |
- | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
- | BIT | boolean | |
- | CHAR | string | |
- | VARCHAR | string | |
- | BINARY | bytes | |
- | CHAR BYTE | bytes | |
- | VARBINARY | bytes | |
- | TINYBLOB | bytes | |
- | BLOB | bytes | |
- | MEDIUMBLOB | bytes | |
- | LONGBLOB | bytes | |
- | TINYTEXT | string | |
- | TEXT | string | |
- | MEDIUMTEXT | string | |
- | LONGTEXT | string | |
- | JSON | string | In MariaDB it is alias to LONGTEXT |
- | ENUM | string | Mapping to String by default |
- | SET | string | |
- | DATE | date | |
- | TIME | time_micros | |
- | DATETIME | timestamp_micros | |
- | TIMESTAMP | timestamp_micros | |
- | YEAR | date | |
- +--------------------------------+-----------------------+------------------------------------+
-
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ |--------------------------------|-----------------------|---------------------------------------------------------|
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | int | Users can manually set output schema to map it to Date. |
Example
------
diff --git a/mariadb-plugin/pom.xml b/mariadb-plugin/pom.xml
index 7ece99f31..682cc153f 100644
--- a/mariadb-plugin/pom.xml
+++ b/mariadb-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Maria DB plugin
@@ -83,6 +83,11 @@
RELEASE
compile
+
+ io.cdap.plugin
+ mysql-plugin
+ ${project.version}
+
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java
new file mode 100644
index 000000000..94498c787
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.db.ColumnType;
+import io.cdap.plugin.mysql.MysqlDBRecord;
+import java.util.List;
+
+/**
+ * Writable class for MariaDB Source/Sink.
+ */
+public class MariadbDBRecord extends MysqlDBRecord {
+
+ /**
+ * Used in map-reduce. Do not remove.
+ */
+ @SuppressWarnings("unused")
+ public MariadbDBRecord() {
+ // Required by Hadoop DBRecordReader to create an instance
+ }
+
+ public MariadbDBRecord(StructuredRecord record, List columnTypes) {
+ super(record, columnTypes);
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java
new file mode 100644
index 000000000..38405225d
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbErrorDetailsProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+
+import io.cdap.plugin.mysql.MysqlErrorDetailsProvider;
+import io.cdap.plugin.util.DBUtils;
+
+/**
+ * A custom ErrorDetailsProvider for MariaDb plugins.
+ */
+public class MariadbErrorDetailsProvider extends MysqlErrorDetailsProvider {
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MARIADB_SUPPORTED_DOC_URL;
+ }
+
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java
new file mode 100644
index 000000000..71ccb0d06
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbFieldsValidator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.plugin.mysql.MysqlFieldsValidator;
+
+/**
+ * Field validator for maraidb
+ */
+public class MariadbFieldsValidator extends MysqlFieldsValidator {
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java
new file mode 100644
index 000000000..37ac12a93
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+
+import io.cdap.plugin.mysql.MysqlSchemaReader;
+import java.util.Map;
+
+/**
+ * Schema reader for mapping Maria DB type
+ */
+public class MariadbSchemaReader extends MysqlSchemaReader {
+
+ public MariadbSchemaReader (String sessionID) {
+ super(sessionID);
+ }
+
+ public MariadbSchemaReader (String sessionID, Map connectionArguments) {
+ super(sessionID, connectionArguments);
+ }
+
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
index ab20f3c5d..52a73344a 100644
--- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
@@ -19,9 +19,15 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
import io.cdap.plugin.db.sink.AbstractDBSink;
+import io.cdap.plugin.db.sink.FieldsValidator;
+import io.cdap.plugin.mysql.MysqlFieldsValidator;
+import io.cdap.plugin.util.DBUtils;
import java.util.Map;
import javax.annotation.Nullable;
@@ -45,6 +51,32 @@ public MariadbSink(MariadbSinkConfig mariadbSinkConfig) {
this.mariadbSinkConfig = mariadbSinkConfig;
}
+ @Override
+ protected DBRecord getDBRecord(StructuredRecord output) {
+ return new MariadbDBRecord(output, columnTypes);
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new MariadbSchemaReader(null);
+ }
+
+
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return MariadbErrorDetailsProvider.class.getName();
+ }
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MARIADB_SUPPORTED_DOC_URL;
+ }
+
+ @Override
+ protected FieldsValidator getFieldsValidator() {
+ return new MariadbFieldsValidator();
+ }
+
/**
* MariaDB Sink Config.
*/
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
index d5ffcb290..28204100c 100644
--- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
@@ -19,10 +19,19 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.DBSpecificSourceConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
+import io.cdap.plugin.util.DBUtils;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -53,10 +62,46 @@ protected String createConnectionString() {
mariadbSourceConfig.host, mariadbSourceConfig.port, mariadbSourceConfig.database);
}
+ @Override
+ protected Class extends DBWritable> getDBRecordType() {
+ return MariadbDBRecord.class;
+ }
+
+ @Override
+ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+ String fqn = DBUtils.constructFQN("mariadb",
+ mariadbSourceConfig.host,
+ mariadbSourceConfig.port,
+ mariadbSourceConfig.database,
+ mariadbSourceConfig.getReferenceName());
+ Asset asset = Asset.builder(mariadbSourceConfig.getReferenceName()).setFqn(fqn).build();
+ return new LineageRecorder(context, asset);
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new MariadbSchemaReader(null, mariadbSourceConfig.getConnectionArguments());
+ }
+
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return MariadbErrorDetailsProvider.class.getName();
+ }
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MARIADB_SUPPORTED_DOC_URL;
+ }
+
/**
* MaraiDB source mariadbSourceConfig.
*/
public static class MariadbSourceConfig extends DBSpecificSourceConfig {
+ private static final String JDBC_PROPERTY_CONNECT_TIMEOUT = "connectTimeout";
+ private static final String JDBC_PROPERTY_SOCKET_TIMEOUT = "socketTimeout";
+ private static final String JDBC_REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";
+
+ private static final String MARIADB_TINYINT1_IS_BIT = "tinyInt1isBit";
@Name(MariadbConstants.AUTO_RECONNECT)
@Description("Should the driver try to re-establish stale and/or dead connections")
@@ -116,5 +161,43 @@ public Map getDBSpecificArguments() {
public List getInitQueries() {
return MariadbUtil.composeDbInitQueries(useAnsiQuotes);
}
+
+ @Override
+ public Map getConnectionArguments() {
+ Map arguments = new HashMap<>(super.getConnectionArguments());
+ // the unit below is millisecond
+ arguments.putIfAbsent(JDBC_PROPERTY_CONNECT_TIMEOUT, "20000");
+ arguments.putIfAbsent(JDBC_PROPERTY_SOCKET_TIMEOUT, "20000");
+ arguments.putIfAbsent(JDBC_REWRITE_BATCHED_STATEMENTS, "true");
+ // MariaDB property to ensure that TINYINT(1) type data is not converted to MariaDB Bit/Boolean type in the
+ // ResultSet.
+ arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false");
+ return arguments;
+ }
+
+ @Override
+ protected void validateField(FailureCollector collector,
+ Schema.Field field,
+ Schema actualFieldSchema,
+ Schema expectedFieldSchema) {
+ // Backward compatibility changes to support MySQL YEAR to Date type conversion
+ if (Schema.LogicalType.DATE.equals(expectedFieldSchema.getLogicalType())
+ && Schema.Type.INT.equals(actualFieldSchema.getType())) {
+ return;
+ }
+
+ // Backward compatibility change to support MySQL MEDIUMINT UNSIGNED to Long type conversion
+ if (Schema.Type.LONG.equals(expectedFieldSchema.getType())
+ && Schema.Type.INT.equals(actualFieldSchema.getType())) {
+ return;
+ }
+
+ // Backward compatibility change to support MySQL TINYINT(1) to Bool type conversion
+ if (Schema.Type.BOOLEAN.equals(expectedFieldSchema.getType())
+ && Schema.Type.INT.equals(actualFieldSchema.getType())) {
+ return;
+ }
+ super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
+ }
}
}
diff --git a/memsql-plugin/pom.xml b/memsql-plugin/pom.xml
index c9dbaf035..f3ae24c38 100644
--- a/memsql-plugin/pom.xml
+++ b/memsql-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Memsql plugin
diff --git a/mssql-plugin/docs/SQL Server-connector.md b/mssql-plugin/docs/SQL Server-connector.md
index cb72161f5..6f0038715 100644
--- a/mssql-plugin/docs/SQL Server-connector.md
+++ b/mssql-plugin/docs/SQL Server-connector.md
@@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication.
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the database connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16)
+
**Authentication Type:** Indicates which authentication method will be used for the connection. Use 'SQL Login'. to
connect to a SQL Server using username and password properties. Use 'Active Directory Password' to connect to an Azure
SQL Database/Data Warehouse using an Azure AD principal name and password.
diff --git a/mssql-plugin/docs/SqlServer-batchsink.md b/mssql-plugin/docs/SqlServer-batchsink.md
index 5d10b4bb6..b4ca1cbc5 100644
--- a/mssql-plugin/docs/SqlServer-batchsink.md
+++ b/mssql-plugin/docs/SqlServer-batchsink.md
@@ -46,6 +46,14 @@ an Azure SQL Database/Data Warehouse using an Azure AD principal name and passwo
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the database connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16)
+
**Instance Name:** SQL Server instance name to connect to. When it is not specified, a
connection is made to the default instance. For the case where both the instanceName and port are specified,
see the notes for port. If you specify a Virtual Network Name in the Server connection property, you cannot
diff --git a/mssql-plugin/docs/SqlServer-batchsource.md b/mssql-plugin/docs/SqlServer-batchsource.md
index c8e30f77e..5c917621c 100644
--- a/mssql-plugin/docs/SqlServer-batchsource.md
+++ b/mssql-plugin/docs/SqlServer-batchsource.md
@@ -56,6 +56,14 @@ an Azure SQL Database/Data Warehouse using an Azure AD principal name and passwo
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the database connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in SQL Server, refer to the [SQL Server documentation](https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16)
+
**Instance Name:** SQL Server instance name to connect to. When it is not specified, a
connection is made to the default instance. For the case where both the instanceName and port are specified,
see the notes for port. If you specify a Virtual Network Name in the Server connection property, you cannot
diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml
index f5fc81a93..376a6bc3f 100644
--- a/mssql-plugin/pom.xml
+++ b/mssql-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Microsoft SQL Server plugin
mssql-plugin
4.0.0
+ Microsoft SQL Server plugin database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
io.cdap.plugin
@@ -40,10 +74,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
@@ -57,18 +93,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
+ test
junit
junit
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
com.microsoft.sqlserver
@@ -76,11 +120,6 @@
8.2.1.jre8
test
-
- io.cdap.cdap
- cdap-api
- provided
-
org.jetbrains
annotations
diff --git a/mssql-plugin/src/e2e-test/resources/errorMessage.properties b/mssql-plugin/src/e2e-test/resources/errorMessage.properties
index 00721f148..c752d6ec1 100644
--- a/mssql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/mssql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -13,11 +13,11 @@ errorMessagenumofSplit=Split-By Field Name must be specified if Number of Splits
errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table
errorMessageInvalidSinkTableName=Table 'Table123@' does not exist.
errormessageBlankHost=Exception while trying to validate schema of database table
-errorMessageInvalidTableName=Spark program 'phase-1' failed with error: Errors were encountered during validation. \
- Table 'Table123@' does not exist.. Please check the system logs for more details.
+errorMessageInvalidTableName=Spark program 'phase-1' failed with error: Stage 'SQL Server2' encountered : io.cdap.cdap.etl.api.validation.ValidationException: \
+ Errors were encountered during validation. Table 'Table123@' does not exist.. Please check the system logs for more details.
errorMessageInvalidCredentials=Spark program 'phase-1' failed with error: Unable to create config for batchsink SqlServer \
'connection' is invalid: Failed to assign value
-errorMessageInvalidsourcetable=Spark program 'phase-1' failed with error: Incorrect syntax near the keyword 'table'.. \
- Please check the system logs for more details.
-errorMessageInvalidCredentialSource=Spark program 'phase-1' failed with error: Plugin with id SQL \
- Server:source.jdbc.sqlserver does not exist in program phase-1 of application
+errorMessageInvalidsourcetable=Spark program 'phase-1' failed with error: Stage 'SQL Server' encountered : io.cdap.cdap.api.exception.ProgramFailureException: \
+ Error occurred while trying to get schema from database.Error message: 'Incorrect syntax near the keyword 'table'.'.
+errorMessageInvalidCredentialSource=Spark program 'phase-1' failed with error: Stage 'SQL Server' encountered : java.lang.IllegalArgumentException: \
+ Plugin with id SQL Server:source.jdbc.sqlserver does not exist in program phase-1 of application
diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java
new file mode 100644
index 000000000..90d1ce7b7
--- /dev/null
+++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerErrorDetailsProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mssql;
+
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
+import io.cdap.plugin.util.DBUtils;
+
+/**
+ * A custom ErrorDetailsProvider for SQL Server plugins.
+ */
+public class SqlServerErrorDetailsProvider extends DBErrorDetailsProvider {
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MSSQL_SUPPORTED_DOC_URL;
+ }
+}
diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java
index 0fa8991c5..dc442d200 100644
--- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java
+++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSink.java
@@ -88,6 +88,16 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
return new LineageRecorder(context, asset);
}
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return SqlServerErrorDetailsProvider.class.getName();
+ }
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MSSQL_SUPPORTED_DOC_URL;
+ }
+
/**
* MSSQL action configuration.
*/
@@ -167,6 +177,11 @@ public Map getDBSpecificArguments() {
packetSize, queryTimeout);
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
public String getConnectionString() {
return String.format(SqlServerConstants.SQL_SERVER_CONNECTION_STRING_FORMAT,
diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java
index 9603b24db..004532064 100644
--- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java
+++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java
@@ -75,6 +75,11 @@ protected Class extends DBWritable> getDBRecordType() {
return SqlServerSourceDBRecord.class;
}
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return SqlServerErrorDetailsProvider.class.getName();
+ }
+
@Override
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
String fqn = DBUtils.constructFQN("mssql",
@@ -85,6 +90,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
return new LineageRecorder(context, asset);
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MSSQL_SUPPORTED_DOC_URL;
+ }
+
/**
* MSSQL source config.
*/
@@ -188,6 +198,11 @@ public List getInitQueries() {
return Collections.emptyList();
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
diff --git a/mssql-plugin/widgets/SQL Server-connector.json b/mssql-plugin/widgets/SQL Server-connector.json
index 171076295..c326cd81d 100644
--- a/mssql-plugin/widgets/SQL Server-connector.json
+++ b/mssql-plugin/widgets/SQL Server-connector.json
@@ -64,6 +64,20 @@
"widget-type": "password",
"label": "Password",
"name": "password"
+ },
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
}
]
},
diff --git a/mssql-plugin/widgets/SqlServer-batchsink.json b/mssql-plugin/widgets/SqlServer-batchsink.json
index 260c66259..fb20cad9d 100644
--- a/mssql-plugin/widgets/SqlServer-batchsink.json
+++ b/mssql-plugin/widgets/SqlServer-batchsink.json
@@ -84,6 +84,20 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -280,6 +294,10 @@
{
"type": "property",
"name": "connectionArguments"
+ },
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
}
]
},
diff --git a/mssql-plugin/widgets/SqlServer-batchsource.json b/mssql-plugin/widgets/SqlServer-batchsource.json
index dad5f4708..b3494e485 100644
--- a/mssql-plugin/widgets/SqlServer-batchsource.json
+++ b/mssql-plugin/widgets/SqlServer-batchsource.json
@@ -84,6 +84,20 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -316,6 +330,10 @@
{
"type": "property",
"name": "connectionArguments"
+ },
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
}
]
},
diff --git a/mysql-plugin/docs/MySQL-connector.md b/mysql-plugin/docs/MySQL-connector.md
index fb5c1fbb8..f586084c1 100644
--- a/mysql-plugin/docs/MySQL-connector.md
+++ b/mysql-plugin/docs/MySQL-connector.md
@@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication.
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the databse connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html)
+
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations.
This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies
diff --git a/mysql-plugin/docs/Mysql-batchsink.md b/mysql-plugin/docs/Mysql-batchsink.md
index b28a28618..46a763f9d 100644
--- a/mysql-plugin/docs/Mysql-batchsink.md
+++ b/mysql-plugin/docs/Mysql-batchsink.md
@@ -39,6 +39,14 @@ You also can use the macro function ${conn(connection-name)}.
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the databse connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html)
+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/mysql-plugin/docs/Mysql-batchsource.md b/mysql-plugin/docs/Mysql-batchsource.md
index 010e08216..552bb5504 100644
--- a/mysql-plugin/docs/Mysql-batchsource.md
+++ b/mysql-plugin/docs/Mysql-batchsource.md
@@ -49,6 +49,14 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the database connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
+
+For more details on the Transaction Isolation Levels supported in MySQL, refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/innodb-transaction-isolation-levels.html)
+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml
index bb5caf4c0..d2ea17f41 100644
--- a/mysql-plugin/pom.xml
+++ b/mysql-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Mysql plugin
mysql-plugin
4.0.0
+ Mysql database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
io.cdap.plugin
@@ -40,10 +74,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
@@ -57,23 +93,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
+ test
junit
junit
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
mysql
diff --git a/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature b/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature
index 1ad2f8cc1..0ea426da0 100644
--- a/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature
+++ b/mysql-plugin/src/e2e-test/features/mysqlsource/RunTime.feature
@@ -142,7 +142,7 @@ Feature: MySQL Source - Run time scenarios
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
- Then Wait till pipeline preview is in running state
+ Then Wait till pipeline preview is in running state and check if any error occurs
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "failed"
diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java
index 251f0fc74..ca9a2b928 100644
--- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java
+++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlErrorDetailsProvider.java
@@ -17,7 +17,7 @@
package io.cdap.plugin.mysql;
import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.plugin.db.DBErrorDetailsProvider;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;
/**
@@ -31,7 +31,7 @@ protected String getExternalDocumentationLink() {
}
@Override
- protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
+ protected ErrorType getErrorTypeFromErrorCodeAndSqlState(int errorCode, String sqlState) {
// https://dev.mysql.com/doc/refman/9.0/en/error-message-elements.html#error-code-ranges
if (errorCode >= 1000 && errorCode <= 5999) {
return ErrorType.USER;
diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
index 42488b31e..0a9257a0a 100644
--- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
+++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
@@ -114,6 +114,11 @@ protected String getErrorDetailsProviderClassName() {
return MysqlErrorDetailsProvider.class.getName();
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.MYSQL_SUPPORTED_DOC_URL;
+ }
+
/**
* MySQL action configuration.
*/
@@ -189,6 +194,11 @@ public Map getDBSpecificArguments() {
trustCertificateKeyStorePassword, false);
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
public MysqlConnectorConfig getConnection() {
return connection;
diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java
index 971b76809..38642468c 100644
--- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java
+++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java
@@ -197,6 +197,11 @@ public MysqlConnectorConfig getConnection() {
return connection;
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
diff --git a/mysql-plugin/widgets/MySQL-connector.json b/mysql-plugin/widgets/MySQL-connector.json
index 9064d1bf6..f60f5526f 100644
--- a/mysql-plugin/widgets/MySQL-connector.json
+++ b/mysql-plugin/widgets/MySQL-connector.json
@@ -30,6 +30,20 @@
"widget-attributes": {
"default": "3306"
}
+ },
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
}
]
},
diff --git a/mysql-plugin/widgets/Mysql-batchsink.json b/mysql-plugin/widgets/Mysql-batchsink.json
index c525ead40..58596aae2 100644
--- a/mysql-plugin/widgets/Mysql-batchsink.json
+++ b/mysql-plugin/widgets/Mysql-batchsink.json
@@ -65,6 +65,20 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -225,6 +239,10 @@
"type": "property",
"name": "password"
},
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
+ },
{
"type": "property",
"name": "host"
diff --git a/mysql-plugin/widgets/Mysql-batchsource.json b/mysql-plugin/widgets/Mysql-batchsource.json
index 9175bd5ed..506e837f7 100644
--- a/mysql-plugin/widgets/Mysql-batchsource.json
+++ b/mysql-plugin/widgets/Mysql-batchsource.json
@@ -65,6 +65,20 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_UNCOMMITTED",
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -277,6 +291,10 @@
"type": "property",
"name": "password"
},
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
+ },
{
"type": "property",
"name": "host"
diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml
index f7b559439..f141c7371 100644
--- a/netezza-plugin/pom.xml
+++ b/netezza-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Netezza plugin
diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml
index b21152fcd..30e4b5911 100644
--- a/oracle-plugin/pom.xml
+++ b/oracle-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
Oracle plugin
oracle-plugin
4.0.0
+ Oracle database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
io.cdap.plugin
@@ -40,10 +74,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
@@ -57,18 +93,25 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
+ test
junit
junit
+ ${junit.version}
+ test
org.hsqldb
hsqldb
+ ${hsql.version}
test
@@ -80,11 +123,8 @@
org.mockito
mockito-core
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${mockito.version}
+ test
org.glassfish
diff --git a/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature b/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature
index 2d1ca9ad1..d6ad85cd4 100644
--- a/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature
+++ b/oracle-plugin/src/e2e-test/features/source/OracleRunTime.feature
@@ -296,7 +296,7 @@ Feature: Oracle - Verify data transfer from Oracle source to BigQuery sink
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
- Then Wait till pipeline preview is in running state
+ Then Wait till pipeline preview is in running state and check if any error occurs
Then Verify the preview run status of pipeline in the logs is "failed"
@ORACLE_SOURCE_TEST @BQ_SINK_TEST
@@ -338,7 +338,9 @@ Feature: Oracle - Verify data transfer from Oracle source to BigQuery sink
And Save and Deploy Pipeline
And Run the Pipeline in Runtime
And Wait till pipeline is in running state
+ And Open and capture logs
And Verify the pipeline status is "Failed"
+ And Close the pipeline logs
Then Open Pipeline logs and verify Log entries having below listed Level and Message:
| Level | Message |
| ERROR | errorLogsMessageInvalidBoundingQuery |
diff --git a/oracle-plugin/src/e2e-test/resources/errorMessage.properties b/oracle-plugin/src/e2e-test/resources/errorMessage.properties
index b981faf81..e44f4c00a 100644
--- a/oracle-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/oracle-plugin/src/e2e-test/resources/errorMessage.properties
@@ -15,5 +15,7 @@ errorMessageBlankUsername=Username is required when password is given.
errorMessageInvalidTableName=Exception while trying to validate schema of database table '"table"' for connection
errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table '"TARGETTABLE_
errorMessageInvalidHost=Exception while trying to validate schema of database table '"table"' for connection
-errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: ORA-00936: missing expression . \
- Please check the system logs for more details.
+errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'Oracle' encountered : \
+ java.io.IOException: ORA-00936: missing expression . Please check the system logs for more details.
+blank.database.message=Required property 'database' has no value.
+blank.connection.message=Exception while trying to validate schema of database table
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java
index 3d2f7399a..16371d5c1 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java
@@ -112,7 +112,8 @@ protected DBConnectorPath getDBConnectorPath(String path) {
@Override
protected SchemaReader getSchemaReader(String sessionID) {
- return new OracleSourceSchemaReader(sessionID);
+ return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestamp(),
+ config.getTreatPrecisionlessNumAsDeci());
}
@Override
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java
index 10022364a..cbc1e5ed2 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java
@@ -22,8 +22,6 @@
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
@@ -43,12 +41,14 @@ public OracleConnectorConfig(String host, int port, String user, String password
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
String connectionArguments, String connectionType, String database) {
- this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null);
+ this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null,
+ null);
}
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
String connectionArguments, String connectionType, String database,
- String role, Boolean useSSL) {
+ String role, Boolean useSSL, @Nullable Boolean treatAsOldTimestamp,
+ @Nullable Boolean treatPrecisionlessNumAsDeci) {
this.host = host;
this.port = port;
@@ -60,6 +60,8 @@ public OracleConnectorConfig(String host, int port, String user, String password
this.database = database;
this.role = role;
this.useSSL = useSSL;
+ this.treatAsOldTimestamp = treatAsOldTimestamp;
+ this.treatPrecisionlessNumAsDeci = treatPrecisionlessNumAsDeci;
}
@Override
@@ -81,17 +83,21 @@ public String getConnectionString() {
@Macro
private String database;
- @Name(OracleConstants.TRANSACTION_ISOLATION_LEVEL)
- @Description("The transaction isolation level for the database session.")
- @Macro
- @Nullable
- private String transactionIsolationLevel;
-
@Name(OracleConstants.USE_SSL)
@Description("Turns on SSL encryption. Connection will fail if SSL is not available")
@Nullable
public Boolean useSSL;
+ @Name(OracleConstants.TREAT_AS_OLD_TIMESTAMP)
+ @Description("A hidden field to handle timestamp as CDAP's timestamp micros or string as per old behavior.")
+ @Nullable
+ public Boolean treatAsOldTimestamp;
+
+ @Name(OracleConstants.TREAT_PRECISIONLESSNUM_AS_DECI)
+ @Description("A hidden field to handle precision less number as CDAP's decimal per old behavior.")
+ @Nullable
+ public Boolean treatPrecisionlessNumAsDeci;
+
@Override
protected int getDefaultPort() {
return 1521;
@@ -114,6 +120,14 @@ public Boolean getSSlMode() {
return useSSL != null && useSSL;
}
+ public Boolean getTreatAsOldTimestamp() {
+ return Boolean.TRUE.equals(treatAsOldTimestamp);
+ }
+
+ public Boolean getTreatPrecisionlessNumAsDeci() {
+ return Boolean.TRUE.equals(treatPrecisionlessNumAsDeci);
+ }
+
@Override
public Properties getConnectionArgumentsProperties() {
Properties prop = super.getConnectionArgumentsProperties();
@@ -124,6 +138,7 @@ public Properties getConnectionArgumentsProperties() {
return prop;
}
+ @Override
public String getTransactionIsolationLevel() {
//if null default to the highest isolation level possible
if (transactionIsolationLevel == null) {
@@ -133,16 +148,7 @@ public String getTransactionIsolationLevel() {
//This ensures that the role is mapped to the right serialization level, even w/ incorrect user input
//if role is SYSDBA or SYSOP it will map to read_committed. else serialized
return (!getRole().equals(ROLE_NORMAL)) ? TransactionIsolationLevel.Level.TRANSACTION_READ_COMMITTED.name() :
- TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
- }
-
- @Override
- public Map getAdditionalArguments() {
- Map additonalArguments = new HashMap<>();
- if (getTransactionIsolationLevel() != null) {
- additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
- }
- return additonalArguments;
+ TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
}
@Override
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java
index dc38f80ac..cbd411175 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java
@@ -43,6 +43,8 @@ private OracleConstants() {
public static final String TNS_CONNECTION_TYPE = "tns";
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
public static final String USE_SSL = "useSSL";
+ public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";
+ public static final String TREAT_PRECISIONLESSNUM_AS_DECI = "treatPrecisionlessNumAsDeci";
/**
* Constructs the Oracle connection string based on the provided connection type, host, port, and database.
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java
new file mode 100644
index 000000000..e0c770c9f
--- /dev/null
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleErrorDetailsProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.oracle;
+
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
+import io.cdap.plugin.util.DBUtils;
+
+/**
+ * A custom ErrorDetailsProvider for Oracle plugin.
+ */
+public class OracleErrorDetailsProvider extends DBErrorDetailsProvider {
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.ORACLE_SUPPORTED_DOC_URL;
+ }
+}
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java
index 45afbcb51..511281e9d 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java
@@ -82,6 +82,15 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
return new LineageRecorder(context, asset);
}
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return OracleErrorDetailsProvider.class.getName();
+ }
+
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.ORACLE_SUPPORTED_DOC_URL;
+ }
/**
* Oracle action configuration.
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java
index 6df62e63e..1488a084b 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java
@@ -63,7 +63,12 @@ protected String createConnectionString() {
@Override
protected SchemaReader getSchemaReader() {
- return new OracleSourceSchemaReader();
+ // PLUGIN-1893 : Based on field/properties from Oracle source and Oracle connection we will pass the flag to control
+ // handle schema to make it backward compatible.
+ boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestamp();
+ boolean treatPrecisionlessNumAsDeci = oracleSourceConfig.getConnection().getTreatPrecisionlessNumAsDeci();
+
+ return new OracleSourceSchemaReader(null, treatAsOldTimestamp, treatPrecisionlessNumAsDeci);
}
@Override
@@ -71,6 +76,16 @@ protected Class extends DBWritable> getDBRecordType() {
return OracleSourceDBRecord.class;
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.ORACLE_SUPPORTED_DOC_URL;
+ }
+
+ @Override
+ protected String getErrorDetailsProviderClassName() {
+ return OracleErrorDetailsProvider.class.getName();
+ }
+
@Override
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
String fqn = DBUtils.constructFQN("oracle",
@@ -117,9 +132,11 @@ public OracleSourceConfig(String host, int port, String user, String password, S
String connectionArguments, String connectionType, String database, String role,
int defaultBatchValue, int defaultRowPrefetch,
String importQuery, Integer numSplits, int fetchSize,
- String boundingQuery, String splitBy, Boolean useSSL) {
+ String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp,
+ Boolean treatPrecisionlessNumAsDeci) {
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
- connectionType, database, role, useSSL);
+ connectionType, database, role, useSSL, treatAsOldTimestamp,
+ treatPrecisionlessNumAsDeci);
this.defaultBatchValue = defaultBatchValue;
this.defaultRowPrefetch = defaultRowPrefetch;
this.fetchSize = fetchSize;
diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java
index 7d35f9bc7..dd17d2e84 100644
--- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java
+++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java
@@ -26,6 +26,7 @@
import java.sql.SQLException;
import java.sql.Types;
import java.util.Set;
+import javax.annotation.Nullable;
/**
* Oracle Source schema reader.
@@ -65,14 +66,17 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
);
private final String sessionID;
+ private final Boolean isTimestampOldBehavior;
+ private final Boolean isPrecisionlessNumAsDecimal;
public OracleSourceSchemaReader() {
- this(null);
+ this(null, false, false);
}
-
- public OracleSourceSchemaReader(String sessionID) {
- super();
+ public OracleSourceSchemaReader(@Nullable String sessionID, boolean isTimestampOldBehavior,
+ boolean isPrecisionlessNumAsDecimal) {
this.sessionID = sessionID;
+ this.isTimestampOldBehavior = isTimestampOldBehavior;
+ this.isPrecisionlessNumAsDecimal = isPrecisionlessNumAsDecimal;
}
@Override
@@ -81,10 +85,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
switch (sqlType) {
case TIMESTAMP_TZ:
- return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
- case Types.TIMESTAMP:
+ return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
case TIMESTAMP_LTZ:
- return Schema.of(Schema.LogicalType.DATETIME);
+ return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
+ : Schema.of(Schema.LogicalType.DATETIME);
+ case Types.TIMESTAMP:
+ return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME);
case BINARY_FLOAT:
return Schema.of(Schema.Type.FLOAT);
case BINARY_DOUBLE:
@@ -107,12 +113,24 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
// For a Number type without specified precision and scale, precision will be 0 and scale will be -127
if (precision == 0) {
// reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
- LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
- + "converting into STRING type to avoid any precision loss.",
- metadata.getColumnName(index),
- metadata.getColumnTypeName(index),
- metadata.getColumnName(index)));
- return Schema.of(Schema.Type.STRING);
+ if (isPrecisionlessNumAsDecimal) {
+ precision = 38;
+ scale = 0;
+ LOG.warn(String.format("%s type with undefined precision and scale is detected, "
+ + "there may be a precision loss while running the pipeline. "
+ + "Please define an output precision and scale for field '%s' to avoid "
+ + "precision loss.",
+ metadata.getColumnTypeName(index),
+ metadata.getColumnName(index)));
+ return Schema.decimalOf(precision, scale);
+ } else {
+ LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+ + "converting into STRING type to avoid any precision loss.",
+ metadata.getColumnName(index),
+ metadata.getColumnTypeName(index),
+ metadata.getColumnName(index)));
+ return Schema.of(Schema.Type.STRING);
+ }
}
return Schema.decimalOf(precision, scale);
}
diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json
index 5eca20cc4..404262fb2 100644
--- a/oracle-plugin/widgets/Oracle-batchsource.json
+++ b/oracle-plugin/widgets/Oracle-batchsource.json
@@ -120,6 +120,44 @@
]
}
},
+ {
+ "widget-type": "hidden",
+ "label": "Treat as old timestamp",
+ "name": "treatAsOldTimestamp",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "false",
+ "options": [
+ {
+ "id": "true",
+ "label": "true"
+ },
+ {
+ "id": "false",
+ "label": "false"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Treat precision less number as Decimal(old behavior)",
+ "name": "treatPrecisionlessNumAsDeci",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "false",
+ "options": [
+ {
+ "id": "true",
+ "label": "true"
+ },
+ {
+ "id": "false",
+ "label": "false"
+ }
+ ]
+ }
+ },
{
"name": "connectionType",
"label": "Connection Type",
@@ -326,6 +364,14 @@
{
"type": "property",
"name": "transactionIsolationLevel"
+ },
+ {
+ "type": "property",
+ "name": "getTreatAsOldTimestampConn"
+ },
+ {
+ "type": "property",
+ "name": "treatPrecisionlessNumAsDeci"
}
]
},
diff --git a/oracle-plugin/widgets/Oracle-connector.json b/oracle-plugin/widgets/Oracle-connector.json
index 628027caf..013f3b240 100644
--- a/oracle-plugin/widgets/Oracle-connector.json
+++ b/oracle-plugin/widgets/Oracle-connector.json
@@ -129,6 +129,44 @@
}
]
}
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Treat as old timestamp",
+ "name": "treatAsOldTimestamp",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "false",
+ "options": [
+ {
+ "id": "true",
+ "label": "true"
+ },
+ {
+ "id": "false",
+ "label": "false"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Treat precision less number as Decimal(old behavior)",
+ "name": "treatPrecisionlessNumAsDeci",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "false",
+ "options": [
+ {
+ "id": "true",
+ "label": "true"
+ },
+ {
+ "id": "false",
+ "label": "false"
+ }
+ ]
+ }
}
]
},
diff --git a/pom.xml b/pom.xml
index fa4724a47..3a50542e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
io.cdap.plugin
database-plugins-parent
- 1.12.0-SNAPSHOT
+ 1.12.3
pom
Database Plugins
Collection of database plugins
@@ -56,13 +56,29 @@
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
+
7
true
UTF-8
- 6.11.0-SNAPSHOT
- 2.11.1
+ 6.11.0
+ 2.13.1
13.0.1
3.3.6
2.2.4
@@ -78,23 +94,12 @@
-
- sonatype
- https://oss.sonatype.org/content/groups/public
-
sonatype-snapshots
- https://oss.sonatype.org/content/repositories/snapshots
+ https://central.sonatype.com/repository/maven-snapshots
-
-
- sonatype
- https://oss.sonatype.org/content/groups/public/
-
-
-
@@ -349,16 +354,6 @@
-
-
- sonatype.release
- https://oss.sonatype.org/service/local/staging/deploy/maven2
-
-
- sonatype.snapshots
- https://oss.sonatype.org/content/repositories/snapshots
-
-
${testSourceLocation}
@@ -532,14 +527,14 @@
- org.sonatype.plugins
- nexus-staging-maven-plugin
- 1.6.2
+ org.sonatype.central
+ central-publishing-maven-plugin
+ 0.8.0
true
- https://oss.sonatype.org
- sonatype.release
- 655dc88dc770c3
+ sonatype.release
+ false
+ true
@@ -612,6 +607,7 @@
src/e2e-test/java
TestRunner.java
+ 31.1-jre
@@ -724,7 +720,7 @@
io.cdap.tests.e2e
cdap-e2e-framework
- 0.4.0-SNAPSHOT
+ 0.4.0
test
@@ -737,7 +733,7 @@
ch.qos.logback
logback-classic
- 1.2.8
+ 1.3.15
runtime
diff --git a/postgresql-plugin/docs/PostgreSQL-connector.md b/postgresql-plugin/docs/PostgreSQL-connector.md
index 739c678e3..fe442cbf1 100644
--- a/postgresql-plugin/docs/PostgreSQL-connector.md
+++ b/postgresql-plugin/docs/PostgreSQL-connector.md
@@ -22,6 +22,14 @@ authentication. Optional for databases that do not require authentication.
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the databse connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to`TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option.
+
+For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO)
+
**Database:** The name of the database to connect to.
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
diff --git a/postgresql-plugin/docs/Postgres-batchsink.md b/postgresql-plugin/docs/Postgres-batchsink.md
index b8a996463..82065e0fd 100644
--- a/postgresql-plugin/docs/Postgres-batchsink.md
+++ b/postgresql-plugin/docs/Postgres-batchsink.md
@@ -39,6 +39,14 @@ You also can use the macro function ${conn(connection-name)}.
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the databse connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to`TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option.
+
+For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO)
+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/postgresql-plugin/docs/Postgres-batchsource.md b/postgresql-plugin/docs/Postgres-batchsource.md
index af359022d..559723526 100644
--- a/postgresql-plugin/docs/Postgres-batchsource.md
+++ b/postgresql-plugin/docs/Postgres-batchsource.md
@@ -49,6 +49,14 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s
**Password:** Password to use to connect to the specified database.
+**Transaction Isolation Level** The transaction isolation level of the databse connection
+- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
+- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
+- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
+- Note: PostgreSQL does not implement `TRANSACTION_READ_UNCOMMITTED` as a distinct isolation level. Instead, this mode behaves identically to`TRANSACTION_READ_COMMITTED`, which is why it is not exposed as a separate option.
+
+For more details on the Transaction Isolation Levels supported in PostgreSQL, refer to the [PostgreSQL documentation](https://www.postgresql.org/docs/current/transaction-iso.html#TRANSACTION-ISO)
+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
diff --git a/postgresql-plugin/pom.xml b/postgresql-plugin/pom.xml
index 8f086e482..2eeb641bf 100644
--- a/postgresql-plugin/pom.xml
+++ b/postgresql-plugin/pom.xml
@@ -20,17 +20,51 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
PostgreSQL plugin
postgresql-plugin
4.0.0
+ PostgreSQL database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
io.cdap.plugin
@@ -40,10 +74,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
@@ -63,15 +99,14 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${cdap.version}
+ test
org.jetbrains
@@ -82,11 +117,13 @@
org.mockito
mockito-core
+ ${mockito.version}
test
junit
junit
+ ${junit.version}
test
diff --git a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
index 6e1929245..f793e3be7 100644
--- a/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/postgresql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -18,5 +18,5 @@ errorMessageInvalidSourceHost=SQL error while getting query schema: The connecti
errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table '"targettable_
errorMessageInvalidHost=Exception while trying to validate schema of database table '"table"' for connection
-errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: The column index is out of range: 1, \
- number of columns: 0.. Please check the system logs for more details.
+errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'PostgreSQL' encountered : \
+ java.io.IOException: The column index is out of range: 1, number of columns: 0.. Please check the system logs for more details.
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java
index 3202a3e28..a7de4e5dc 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresErrorDetailsProvider.java
@@ -19,7 +19,7 @@
import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.plugin.db.DBErrorDetailsProvider;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;
import java.util.HashMap;
@@ -77,7 +77,7 @@ protected String getExternalDocumentationLink() {
}
@Override
- protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
+ protected ErrorType getErrorTypeFromErrorCodeAndSqlState(int errorCode, String sqlState) {
if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 &&
ERROR_CODE_TO_ERROR_TYPE.containsKey(sqlState.substring(0, 2))) {
return ERROR_CODE_TO_ERROR_TYPE.get(sqlState.substring(0, 2));
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java
index 3becf5f27..73430c1e2 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java
@@ -121,6 +121,11 @@ protected String getErrorDetailsProviderClassName() {
return PostgresErrorDetailsProvider.class.getName();
}
+ @Override
+ protected String getExternalDocumentationLink() {
+ return DBUtils.POSTGRES_SUPPORTED_DOC_URL;
+ }
+
/**
* PostgreSQL action configuration.
*/
@@ -170,6 +175,11 @@ public Map getDBSpecificArguments() {
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
protected PostgresConnectorConfig getConnection() {
return connection;
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
index 8e3c091f9..b230f3d1e 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java
@@ -143,6 +143,11 @@ protected PostgresConnectorConfig getConnection() {
return connection;
}
+ @Override
+ public String getTransactionIsolationLevel() {
+ return connection.getTransactionIsolationLevel();
+ }
+
@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
diff --git a/postgresql-plugin/widgets/PostgreSQL-connector.json b/postgresql-plugin/widgets/PostgreSQL-connector.json
index 091afc972..9a7a02e14 100644
--- a/postgresql-plugin/widgets/PostgreSQL-connector.json
+++ b/postgresql-plugin/widgets/PostgreSQL-connector.json
@@ -31,6 +31,19 @@
"default": "5432"
}
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "textbox",
"label": "Database",
diff --git a/postgresql-plugin/widgets/Postgres-batchsink.json b/postgresql-plugin/widgets/Postgres-batchsink.json
index 6aa2dad8a..14e6f8154 100644
--- a/postgresql-plugin/widgets/Postgres-batchsink.json
+++ b/postgresql-plugin/widgets/Postgres-batchsink.json
@@ -65,6 +65,19 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -186,6 +199,10 @@
"type": "property",
"name": "port"
},
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
+ },
{
"type": "property",
"name": "database"
diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json
index 0e4ba28c1..60de4725f 100644
--- a/postgresql-plugin/widgets/Postgres-batchsource.json
+++ b/postgresql-plugin/widgets/Postgres-batchsource.json
@@ -65,6 +65,19 @@
"label": "Password",
"name": "password"
},
+ {
+ "widget-type": "select",
+ "label": "Transaction Isolation Level",
+ "name": "transactionIsolationLevel",
+ "widget-attributes": {
+ "values": [
+ "TRANSACTION_READ_COMMITTED",
+ "TRANSACTION_REPEATABLE_READ",
+ "TRANSACTION_SERIALIZABLE"
+ ],
+ "default": "TRANSACTION_SERIALIZABLE"
+ }
+ },
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
@@ -206,6 +219,10 @@
"type": "property",
"name": "port"
},
+ {
+ "type": "property",
+ "name": "transactionIsolationLevel"
+ },
{
"type": "property",
"name": "database"
diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml
index 26ce2e396..c40736e07 100644
--- a/saphana-plugin/pom.xml
+++ b/saphana-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
SAP HANA plugin
diff --git a/teradata-plugin/pom.xml b/teradata-plugin/pom.xml
index c351a9d96..0201805c5 100644
--- a/teradata-plugin/pom.xml
+++ b/teradata-plugin/pom.xml
@@ -21,7 +21,7 @@
database-plugins-parent
io.cdap.plugin
- 1.12.0-SNAPSHOT
+ 1.12.3
teradata-plugin