This article shows how to use DolphinScheduler together with SeaTunnel to synchronize data between heterogeneous data sources. The combination is a good foundation for building a unified big data warehouse that handles both batch and streaming data. In my experience it is stable and efficient, and it is easy to like once you have tried it.
If you have not yet installed DolphinScheduler and SeaTunnel, refer to their official websites to set up the basic environment first.
One point to note: SeaTunnel data synchronization tasks configured through DolphinScheduler are ultimately dispatched to a Worker group or Worker node of the DolphinScheduler (DS) cluster for execution. You must therefore make sure that SeaTunnel is also installed on the target Worker nodes of your DS cluster.
This matters because a SeaTunnel task instance defined in DolphinScheduler runs SeaTunnel's local startup command on the Worker node to submit and execute the job.
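A quick way to confirm this requirement on a Worker node is to check that the SeaTunnel directory exists and contains an executable launcher script. The following is a minimal sketch, not part of either project: the function name `check_seatunnel_home` is hypothetical, and the default path is simply the one used later in this article.

```shell
# Hypothetical helper: confirm that a SeaTunnel installation directory
# contains at least one executable launcher script under bin/.
check_seatunnel_home() {
  local home="$1" script
  if [ ! -d "$home/bin" ]; then
    echo "missing: $home/bin"
    return 1
  fi
  for script in "$home"/bin/start-seatunnel-*.sh; do
    if [ -x "$script" ]; then
      echo "ok: $script"
      return 0
    fi
  done
  echo "no executable launcher in $home/bin"
  return 1
}

# Example: check the path used in this article (an assumption for your cluster).
check_seatunnel_home "${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}" || true
```

Running the same check on every Worker node (for example over ssh) before defining tasks helps catch a missing installation early.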
Since we need to use SeaTunnel for data integration, we need to configure the installation directory of SeaTunnel in DolphinScheduler's system environment variables.
Locate the dolphinscheduler_env.sh file under the installation directory of your DolphinScheduler master node:
$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh
Set SEATUNNEL_HOME so that it points to your SeaTunnel installation directory:
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}
Save the file, then restart the DolphinScheduler cluster so that the configuration change is synchronized to all api-server, master-server, and worker-server nodes.
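The export line uses the shell's `${VAR:-default}` expansion: a SEATUNNEL_HOME value that is already set in the environment is kept, and the path after `:-` is used only as a fallback. A small sketch of that behavior follows; the function name `resolve_seatunnel_home` and the `/data/seatunnel` path are made up for illustration.

```shell
# Demonstrates ${VAR:-default}: keep an existing value, otherwise fall back.
resolve_seatunnel_home() {
  # The default is the path used in this article; adjust it for your install.
  echo "${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}"
}

unset SEATUNNEL_HOME
resolve_seatunnel_home            # prints the default path

SEATUNNEL_HOME=/data/seatunnel    # hypothetical custom location
resolve_seatunnel_home            # prints /data/seatunnel
```

This is why a node that already exports its own SEATUNNEL_HOME is not overridden by the value in dolphinscheduler_env.sh.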
Why modify DolphinScheduler's source code?
I am using SeaTunnel 2.3.5, and instead of SeaTunnel's default engine I run jobs on the Spark engine, specifically Spark 2.4.5. The command I use to execute tasks is as follows:
$SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
If I were using Spark 3.X, the command would be:
$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
However, the SeaTunnel task plugin in DolphinScheduler 3.1.5 is not compatible with this setup. First, on the front end, the engine option only offers Spark and Flink with no way to pick a specific version, so you cannot choose between Spark 2, Spark 3, Flink 1.13, and Flink 1.15.
Second, on the back end, the plugin still points to the old start-seatunnel-spark.sh and start-seatunnel-flink.sh launcher scripts rather than the version-specific scripts shown above.
Locate the EngineEnum class and modify the code as follows:
public enum EngineEnum {

    // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
    // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
    FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),
    FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),
    SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),
    SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");

    private final String command;

    EngineEnum(String command) {
        this.command = command;
    }

    public String getCommand() {
        return command;
    }
}
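To check by hand which command a task should end up running, the engine-to-launcher mapping can be mirrored in shell. This is only a sketch: `seatunnel_launcher` and `build_spark_command` are hypothetical names, the script names are the ones listed in this article, and the Spark flags are copied from the commands shown earlier.

```shell
# Map an engine name and version to the SeaTunnel launcher script.
seatunnel_launcher() {
  local engine="$1" version="$2"
  case "$engine-$version" in
    spark-2)  echo "start-seatunnel-spark-2-connector-v2.sh" ;;
    spark-3)  echo "start-seatunnel-spark-3-connector-v2.sh" ;;
    flink-13) echo "start-seatunnel-flink-13-connector-v2.sh" ;;
    flink-15) echo "start-seatunnel-flink-15-connector-v2.sh" ;;
    *) echo "unsupported engine: $engine $version" >&2; return 1 ;;
  esac
}

# Assemble the Spark command line in the form used earlier in this article.
build_spark_command() {
  local version="$1" config="$2" launcher
  launcher="$(seatunnel_launcher spark "$version")" || return 1
  printf '%s\n' "\$SEATUNNEL_HOME/bin/$launcher --master local[4] --deploy-mode client --config $config"
}

build_spark_command 2 /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
```

Comparing this output with the command in the task log of a SeaTunnel task instance is one way to confirm that the rebuilt plugin is the one actually loaded.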
After completing the modifications, compile and package the DolphinScheduler source code.
Once the build finishes, locate the dolphinscheduler-task-seatunnel-3.1.5.jar file in the target directory of the dolphinscheduler-task-seatunnel project and upload it to the master node of your DolphinScheduler cluster.
On the master node, rename the existing dolphinscheduler-task-seatunnel-3.1.5.jar file in the following directories under the DS installation directory to dolphinscheduler-task-seatunnel-3.1.5.jar.20240606 (the date suffix makes the backup easier to track):
api-server/libs
master-server/libs
worker-server/libs
alert-server/libs
Copy the newly compiled dolphinscheduler-task-seatunnel-3.1.5.jar file into these directories, and verify that each one contains the updated .jar file (skip any directory that did not originally contain it).
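The rename-and-copy steps above can be scripted so that all four directories are handled the same way. A minimal sketch, assuming the directory layout described above; the function name `replace_plugin_jar` and the example installation path are hypothetical.

```shell
# Hypothetical helper: back up and replace the SeaTunnel task plugin jar in
# every DolphinScheduler service directory that already contains it.
replace_plugin_jar() {
  local ds_home="$1" new_jar="$2" suffix="$3"
  local jar_name dir target
  jar_name="$(basename "$new_jar")"
  for dir in api-server master-server worker-server alert-server; do
    target="$ds_home/$dir/libs/$jar_name"
    if [ ! -f "$target" ]; then
      echo "skip $dir (no existing jar)"
      continue
    fi
    mv "$target" "$target.$suffix"   # keep the old jar, tagged with the date
    cp "$new_jar" "$target"          # drop in the rebuilt jar
    echo "replaced $dir"
  done
}

# Example call (both paths are assumptions):
# replace_plugin_jar /opt/software/dolphinscheduler \
#   ./dolphinscheduler-task-seatunnel-3.1.5.jar "$(date +%Y%m%d)"
```

Directories without the original jar are left untouched, which matches the "skip" rule in the manual procedure.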
Use the distribution script on the master node to synchronize the changes in api-server/libs, master-server/libs, worker-server/libs, and alert-server/libs to the other DS nodes. After the distribution completes, check on each node that it succeeded.
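One way to check the distribution is to compare checksums of the plugin jar across nodes. The sketch below prints one SHA-256 line per service directory; the function name `plugin_jar_checksums` is hypothetical, and it assumes the `sha256sum` utility is available on the nodes.

```shell
# Hypothetical helper: print a SHA-256 checksum for the plugin jar in each
# service directory so the output can be compared between nodes.
plugin_jar_checksums() {
  local ds_home="$1" jar_name="$2" dir jar
  for dir in api-server master-server worker-server alert-server; do
    jar="$ds_home/$dir/libs/$jar_name"
    if [ -f "$jar" ]; then
      echo "$dir $(sha256sum "$jar" | cut -d' ' -f1)"
    fi
  done
}

# Example call (the installation path is an assumption):
# plugin_jar_checksums /opt/software/dolphinscheduler dolphinscheduler-task-seatunnel-3.1.5.jar
```

If the output on a Worker node differs from the output on the master node, the jar on that node was not updated.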
Finally, restart the DS cluster. With the above steps, we have completed the upgrade and adaptation of the SeaTunnel plugin in DolphinScheduler.
We define a SeaTunnel data synchronization task on the workflow definition page of DolphinScheduler to load Oracle database tables into a MySQL database. Let's walk through it.
The official documentation describes the SeaTunnel task configuration script file as follows:
Here, our input source is Oracle, so we directly look for the relevant Oracle configuration definitions in the Source section. The official documentation provides many task examples:
# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    user = "root"
    password = "123456"
    query = "SELECT * FROM TEST_TABLE"
  }
}
transform {}
sink {
  Console {}
}
The next example reads the query result in parallel, split on the configured partition column into the configured number of partitions. Use this form when you want to read the entire table:
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    # Define query logic as required
    query = "SELECT * FROM TEST_TABLE"
    # Set partition column for parallel slicing
    partition_column = "ID"
    # Number of partition slices
    partition_num = 10
    properties {
      database.oracle.jdbc.timezoneAsRegion = "false"
    }
  }
}
sink {
  Console {}
}
Configuring table_path enables automatic splitting, and you can configure split.* to adjust the splitting strategy.
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    table_path = "DA.SCHEMA1.TABLE1"
    query = "select * from SCHEMA1.TABLE1"
    split.size = 10000
  }
}
sink {
  Console {}
}
Specifying the upper and lower bounds of the query allows for more efficient data retrieval within the defined range.
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    # Define query logic as required
    query = "SELECT * FROM TEST_TABLE"
    partition_column = "ID"
    # Read start boundary
    partition_lower_bound = 1
    # Read end boundary
    partition_upper_bound = 500
    partition_num = 10
  }
}
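To get a feel for what the bounds in the configuration above mean, the range from 1 to 500 divided into 10 partitions gives slices of 50 IDs each. The sketch below computes such an even split; it is an illustration only, the function name `partition_ranges` is hypothetical, and SeaTunnel's internal splitting logic may differ in detail.

```shell
# Illustration only: split [lower, upper] into num contiguous ranges of
# near-equal size and print "start end" for each range.
partition_ranges() {
  local lower="$1" upper="$2" num="$3"
  local total=$(( upper - lower + 1 ))
  local base=$(( total / num ))
  local extra=$(( total % num ))
  local start="$lower" i=0 size
  while [ "$i" -lt "$num" ]; do
    # The first "extra" ranges take one additional value each.
    size=$(( base + (i < extra ? 1 : 0) ))
    if [ "$size" -gt 0 ]; then
      echo "$start $(( start + size - 1 ))"
      start=$(( start + size ))
    fi
    i=$(( i + 1 ))
  done
}

partition_ranges 1 500 10   # first line: 1 50, last line: 451 500
```

Each printed range corresponds to one slice that a parallel reader could fetch with a condition on the ID column.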
Configuring table_list enables automatic splitting for several tables in one source, and you can adjust the splitting strategy via split.* options.
env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    "table_list" = [
      {
        "table_path" = "XE.TEST.USER_INFO"
      },
      {
        "table_path" = "XE.TEST.YOURTABLENAME"
      }
    ]
    split.size = 10000
  }
}
sink {
  Console {}
}
This example defines a SeaTunnel synchronization task in which FakeSource automatically generates data and sends it to the JDBC sink. FakeSource generates 16 rows of data (row.num=16), with two fields per row: name (string type) and age (int type). After the job runs, the target table test_table also contains 16 rows.
Before running this job, you need to create the test database and the test_table table in MySQL. If you haven't installed and deployed SeaTunnel, follow the instructions in the installation guide. Then run the job as described in the quick start guide.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
transform {}
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "insert into test_table(name,age) values(?,?)"
  }
}
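The test database and test_table that the job above writes to have to exist before it runs. A minimal sketch of the DDL, wrapped in a shell function so it can be piped into the MySQL client; the function name `print_test_table_ddl` is hypothetical, and the column types (VARCHAR(255) and INT) are assumptions chosen to match the string and int fields.

```shell
# Hypothetical helper: print DDL for the target database and table.
# Column types are assumptions that fit the "string" and "int" fields.
print_test_table_ddl() {
  cat <<'EOF'
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.test_table (
  name VARCHAR(255),
  age  INT
);
EOF
}

print_test_table_ddl
```

To apply it, the output can be piped into the client, for example `print_test_table_ddl | mysql -u root -p`.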
This example does not require writing complex SQL statements. You can configure the output database name and table name to generate insert statements for you automatically.
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    generate_sink_sql = true
    database = test
    table = test_table
  }
}
For scenarios that require precise writes, the JDBC sink can guarantee exactly-once semantics by using an XA data source:
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    max_retries = 0
    user = "root"
    password = "123456"
    query = "insert into test_table(name,age) values(?,?)"
    is_exactly_once = "true"
    xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
  }
}
CDC (change data capture) events are also supported. In this case, you need to configure database, table, and primary_keys.
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    generate_sink_sql = true
    database = test
    table = sink_table
    primary_keys = ["id", "name"]
    field_ide = UPPERCASE
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
Below is the complete configuration file for our Oracle-to-MySQL synchronization task:
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"
    driver = "oracle.jdbc.OracleDriver"
    user = "appuser001"
    password = "appuser001"
    query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"
  }
}
transform {}
sink {
  jdbc {
    url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "appuser001"
    password = "appuser001"
    generate_sink_sql = "true"
    database = "hive"
    table = "met_com_icdoperation_ls"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
(Note: configure the task node according to your environment. If you have not installed Hadoop, choose 'local' as the master and 'client' as the deploy mode. If you have a Hadoop cluster, you can run with 'yarn' and 'cluster' instead.)
Replace the database connection details in the script above with your own, then paste the script into the script input box of the SeaTunnel task node. Save the workflow, bring it online, and then start it.
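If you reuse this job for several environments, the connection details can be filled in from variables instead of being edited by hand. The following is a sketch under stated assumptions: the function name `render_job_config` and all variable names (ORACLE_URL, MYSQL_URL, and so on) are hypothetical, while the option names are the ones used in the configuration above.

```shell
# Hypothetical helper: render the Oracle-to-MySQL job from shell variables.
render_job_config() {
  cat <<EOF
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "${ORACLE_URL}"
    driver = "oracle.jdbc.OracleDriver"
    user = "${ORACLE_USER}"
    password = "${ORACLE_PASSWORD}"
    query = "SELECT * FROM ${SOURCE_TABLE}"
  }
}
transform {}
sink {
  jdbc {
    url = "${MYSQL_URL}"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "${MYSQL_USER}"
    password = "${MYSQL_PASSWORD}"
    generate_sink_sql = "true"
    database = "${MYSQL_DATABASE}"
    table = "${MYSQL_TABLE}"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
EOF
}

# Example (placeholder values): set the variables, then render the script.
# ORACLE_URL="jdbc:oracle:thin:@host:1521:SID" MYSQL_TABLE="my_table" render_job_config
```

The rendered text can be pasted into the task's script box, or written to a file and passed to the launcher with --config for a local test run.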
Verify in the corresponding database.
Original Oracle database table:
Synchronized MySQL database table:
The task ran, and the data was successfully synchronized. OK, the test passed! From here, you can expand and explore further based on this demo. The more hands-on experience you gain, the deeper your understanding of the architecture and principles of DolphinScheduler and SeaTunnel will become. Over time, you can upgrade and extend the functionality of these excellent open-source frameworks by modifying their source code.