
The King Combination: Efficiently Completing Heterogeneous Data Integration with DolphinScheduler 3.1.x and SeaTunnel 2.3.x

by William Guo, 13 min read, January 15th, 2025

Too Long; Didn't Read

This is a good solution for constructing a unified big data warehouse for batch and streaming data. It is stable and efficient—once you use it, you’ll love it.

Overview

This article explains how to use DolphinScheduler together with SeaTunnel to synchronize data between heterogeneous data sources. The combination is a good fit for building a unified big data warehouse that handles both batch and streaming data. It is stable and efficient. Once you use it, you'll love it.

Environment Preparation

  • DolphinScheduler cluster >= 3.1.5
  • DolphinScheduler 3.1.5 source code
  • SeaTunnel cluster >= 2.3.3


If you have not yet installed these components, refer to their official websites to set up the basic environment.

Configuration File Modifications

A note before we start: SeaTunnel data synchronization tasks configured through DolphinScheduler (DS) are ultimately assigned to a Worker group or Worker node of the DS cluster for execution. You must therefore make sure that SeaTunnel is also installed on the target Worker nodes of your DS cluster.

This matters because a SeaTunnel task instance defined in DolphinScheduler runs SeaTunnel's local startup command on the Worker node to submit and execute the job.

Modifications to DolphinScheduler Configuration Files

Because SeaTunnel performs the data integration, DolphinScheduler needs to know where SeaTunnel is installed. Configure the SeaTunnel installation directory in DolphinScheduler's environment variables.


Locate the dolphinscheduler_env.sh file under the installation directory of your DolphinScheduler master node:

$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh


Set SEATUNNEL_HOME to your SeaTunnel installation directory:

export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}


Save the file and restart the DolphinScheduler cluster so that the configuration change is picked up by all api-server, master-server, and worker-server nodes.
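Before restarting, it can help to confirm on each Worker node that the directory SEATUNNEL_HOME points to really contains a SeaTunnel installation. The following is a minimal sketch of such a check. The helper name check_seatunnel_home is hypothetical and not part of DolphinScheduler or SeaTunnel, and the check only looks for a bin directory with at least one start-seatunnel-*.sh script.

```shell
#!/bin/sh
# Hypothetical helper: sanity-check a SeaTunnel installation directory.
# It only verifies that <dir>/bin exists and holds at least one start script.
check_seatunnel_home() (
    home="$1"
    if [ -z "$home" ] || [ ! -d "$home/bin" ]; then
        echo "missing: $home/bin" >&2
        exit 1
    fi
    for script in "$home"/bin/start-seatunnel-*.sh; do
        # If the glob matches nothing, $script is the literal pattern and -f fails
        if [ -f "$script" ]; then
            echo "ok: $home"
            exit 0
        fi
    done
    echo "no start-seatunnel-*.sh script under $home/bin" >&2
    exit 1
)

# Example call on a Worker node, using the default from dolphinscheduler_env.sh:
#   check_seatunnel_home "${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}"
```

The function runs in a subshell, so its variables do not leak into the calling shell and its exit status can be used directly in an if statement.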

Modifications to DolphinScheduler Source Code

Why modify DolphinScheduler's source code?

Because I am using SeaTunnel 2.3.5 with the Spark engine (Spark 2.4.5) rather than SeaTunnel's default engine, the command I use to execute tasks is as follows:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template


If I were using Spark 3.X, the command would be:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
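The only difference between the two commands is the start script, which depends on the Spark major version. As a small sketch of that rule, the hypothetical helpers below (spark_start_script and seatunnel_spark_command are names made up for illustration) pick the script for a given Spark version and print the resulting command line without executing it. The --master and --deploy-mode values are copied from the commands above.

```shell
#!/bin/sh
# Hypothetical helper: map a Spark version string to the SeaTunnel start script.
spark_start_script() (
    version="$1"
    major="${version%%.*}"   # "2.4.5" -> "2"
    case "$major" in
        2) echo "start-seatunnel-spark-2-connector-v2.sh" ;;
        3) echo "start-seatunnel-spark-3-connector-v2.sh" ;;
        *) echo "unsupported Spark version: $version" >&2; exit 1 ;;
    esac
)

# Hypothetical helper: print (not run) the full command line for a job config.
seatunnel_spark_command() (
    home="$1"; version="$2"; config="$3"
    script="$(spark_start_script "$version")" || exit 1
    echo "$home/bin/$script --master local[4] --deploy-mode client --config $config"
)

# Example:
#   seatunnel_spark_command "$SEATUNNEL_HOME" 2.4.5 /path/to/job.conf
```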


However, the SeaTunnel task plugin in DolphinScheduler 3.1.5 is not compatible with this setup. First, on the front end, the engine option only offers Spark and Flink without distinguishing versions, so you cannot choose among Spark 2, Spark 3, Flink 1.13, and Flink 1.15.


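Second, the back-end code has a related problem: the EngineEnum class hard-codes the engine start scripts, and the original entries (shown commented out below) do not match the versioned script names used in the commands above.

Locate the EngineEnum class and modify the code as follows: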

public enum EngineEnum {

    // Original entries, replaced by the versioned scripts below:
    // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
    // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");

    // FLINK and SPARK now point to the Flink 1.13 and Spark 2 scripts;
    // FLINK15 and SPARK3 are new entries for Flink 1.15 and Spark 3.
    FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),
    FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),
    SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),
    SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");

    // Start script used to launch a task with this engine
    private final String command;

    EngineEnum(String command) {
        this.command = command;
    }

    public String getCommand() {
        return command;
    }
}


After completing the modifications, compile and package the DolphinScheduler source code.
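If you only changed the SeaTunnel task plugin, you may not need to rebuild the whole project. The sketch below prints a Maven command that builds a single module plus the modules it depends on. The helper name plugin_build_command is hypothetical, and the default module path is an assumption about the DolphinScheduler 3.1.x source layout, so check it against your checkout (and use mvn instead of ./mvnw if your tree has no Maven wrapper).

```shell
#!/bin/sh
# Hypothetical helper: print the Maven command for building one module.
# The default module path is an assumption about the 3.1.x source tree.
plugin_build_command() (
    module="${1:-dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel}"
    # -pl selects the module, -am also builds the modules it depends on,
    # -DskipTests skips running the tests
    echo "./mvnw clean package -pl $module -am -DskipTests"
)

# Example: run the printed command from the root of the source tree
#   cd /path/to/dolphinscheduler-src && sh -c "$(plugin_build_command)"
```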

Updating the SeaTunnel Task Plugin in the DolphinScheduler Cluster

After compiling and packaging the project, locate the dolphinscheduler-task-seatunnel-3.1.5.jar file in the target directory of the dolphinscheduler-task-seatunnel project. Upload it to the master node of your DolphinScheduler cluster.


On the master node, back up the existing dolphinscheduler-task-seatunnel-3.1.5.jar in each of the following directories under the DS installation directory by renaming it to dolphinscheduler-task-seatunnel-3.1.5.jar.20240606 (the date suffix makes the backup easier to track):

  • api-server/libs
  • master-server/libs
  • worker-server/libs
  • alert-server/libs


Copy the newly compiled dolphinscheduler-task-seatunnel-3.1.5.jar file to these directories. Verify that each directory contains the updated .jar file (skip directories that do not originally have this file).
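The backup and copy steps can be scripted. The following is a minimal sketch, assuming the four server directories listed above sit directly under the DS installation directory. The function name replace_plugin_jar and its arguments are hypothetical. It renames the existing jar with a date suffix, copies the new jar in, and skips any directory that did not contain the jar in the first place.

```shell
#!/bin/sh
# Hypothetical helper: back up and replace the SeaTunnel task plugin jar.
#   $1 = DS installation directory, $2 = newly built jar, $3 = backup suffix
replace_plugin_jar() (
    ds_home="$1"; new_jar="$2"; stamp="$3"
    [ -f "$new_jar" ] || { echo "not found: $new_jar" >&2; exit 1; }
    jar_name="$(basename "$new_jar")"
    for server in api-server master-server worker-server alert-server; do
        target="$ds_home/$server/libs/$jar_name"
        if [ ! -f "$target" ]; then
            # Directory never had the plugin, so leave it alone
            echo "skip: $server (no $jar_name)"
            continue
        fi
        mv "$target" "$target.$stamp" || exit 1   # keep the old jar as a backup
        cp "$new_jar" "$target" || exit 1
        echo "updated: $server"
    done
)

# Example:
#   replace_plugin_jar "$DOLPHINSCHEDULER_HOME" \
#       ./target/dolphinscheduler-task-seatunnel-3.1.5.jar "$(date +%Y%m%d)"
```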


Use the distribution script on the master node to synchronize the modifications in api-server/libs, master-server/libs, worker-server/libs, and alert-server/libs to other DS nodes. After the distribution is complete, check whether the distribution was successful.
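One way to check a node after distribution is to compare the jar in each libs directory byte for byte against the newly built jar. The sketch below does this for a single DS installation directory. The function name verify_plugin_jar is hypothetical, and running it on other nodes (for example over ssh, after copying the reference jar there) is left to your own tooling.

```shell
#!/bin/sh
# Hypothetical helper: confirm every libs directory holds the expected jar.
#   $1 = DS installation directory, $2 = reference jar (the new build)
verify_plugin_jar() (
    ds_home="$1"; reference_jar="$2"
    jar_name="$(basename "$reference_jar")"
    status=0
    for server in api-server master-server worker-server alert-server; do
        target="$ds_home/$server/libs/$jar_name"
        [ -f "$target" ] || continue   # directory does not ship the plugin
        if cmp -s "$reference_jar" "$target"; then
            echo "match: $server"
        else
            echo "MISMATCH: $server"
            status=1
        fi
    done
    exit "$status"
)

# Example:
#   verify_plugin_jar "$DOLPHINSCHEDULER_HOME" /tmp/dolphinscheduler-task-seatunnel-3.1.5.jar
```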


Finally, restart the DS cluster. With the above steps, we have completed the upgrade and adaptation of the SeaTunnel plugin in DolphinScheduler.

Test Verification

To test the setup, we define a SeaTunnel data synchronization task on the workflow definition page of DolphinScheduler that copies an Oracle database table into a MySQL database.

The official documentation describes the SeaTunnel task configuration script as follows:

Source Input Configuration Definition Explanation

Here, our input source is Oracle, so we directly look for the relevant Oracle configuration definitions in the Source section. The official documentation provides many task examples:

Simple Task Example

# Defining the runtime environment  
env {  
  parallelism = 4  
  job.mode = "BATCH"  
}  
source {  
    Jdbc {  
        url = "jdbc:oracle:thin:@datasource01:1523:xe"  
        driver = "oracle.jdbc.OracleDriver"  
        user = "root"  
        password = "123456"  
        query = "SELECT * FROM TEST_TABLE"  
    }  
}  

transform {}  

sink {  
    Console {}  
}  


Partition Column Parallel Task Example

This example reads the queried table in parallel, using the configured partition column to slice the data. To read an entire table this way, you can use the following:

env {  
  parallelism = 4  
  job.mode = "BATCH"  
}  
source {  
    Jdbc {  
        url = "jdbc:oracle:thin:@datasource01:1523:xe"  
        driver = "oracle.jdbc.OracleDriver"  
        connection_check_timeout_sec = 100  
        user = "root"  
        password = "123456"  
        # Define query logic as required  
        query = "SELECT * FROM TEST_TABLE"  
        # Set partition column for parallel slicing  
        partition_column = "ID"  
        # Number of partition slices  
        partition_num = 10  
        properties {
            database.oracle.jdbc.timezoneAsRegion = "false"
        }
    }  
}  
sink {  
  Console {}  
}  


Primary Key or Unique Index Parallel Task Example

Configuring table_path enables automatic splitting, and you can configure split.* to adjust the splitting strategy.

env {  
  parallelism = 4  
  job.mode = "BATCH"  
}  
source {  
    Jdbc {  
        url = "jdbc:oracle:thin:@datasource01:1523:xe"  
        driver = "oracle.jdbc.OracleDriver"  
        connection_check_timeout_sec = 100  
        user = "root"  
        password = "123456"  
        table_path = "DA.SCHEMA1.TABLE1"  
        query = "select * from SCHEMA1.TABLE1"  
        split.size = 10000  
    }  
}  

sink {  
  Console {}  
}  


Parallel Upper and Lower Bound Task Example

Specifying the upper and lower bounds of the query allows for more efficient data retrieval within the defined range.

source {  
    Jdbc {  
        url = "jdbc:oracle:thin:@datasource01:1523:xe"  
        driver = "oracle.jdbc.OracleDriver"  
        connection_check_timeout_sec = 100  
        user = "root"  
        password = "123456"  
        # Define query logic as required  
        query = "SELECT * FROM TEST_TABLE"  
        partition_column = "ID"  
        # Read start boundary  
        partition_lower_bound = 1  
        # Read end boundary  
        partition_upper_bound = 500  
        partition_num = 10  
    }  
}  
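To get a feel for what these settings mean, the sketch below splits a closed range evenly into a given number of slices using plain integer arithmetic. It is only an illustration of even range splitting, not SeaTunnel's actual splitting code, so the real slice boundaries may differ. The function name partition_ranges is hypothetical.

```shell
#!/bin/sh
# Hypothetical illustration: split [lower, upper] into at most num even slices.
# This is NOT SeaTunnel's algorithm, only the arithmetic behind the idea.
partition_ranges() (
    lower="$1"; upper="$2"; num="$3"
    [ "$num" -gt 0 ] || { echo "num must be positive" >&2; exit 1; }
    total=$((upper - lower + 1))
    step=$(( (total + num - 1) / num ))   # ceiling division
    start="$lower"
    while [ "$start" -le "$upper" ]; do
        end=$((start + step - 1))
        if [ "$end" -gt "$upper" ]; then
            end="$upper"
        fi
        echo "$start $end"
        start=$((end + 1))
    done
)

# With the values from the example above:
#   partition_ranges 1 500 10
# prints ten lines, from "1 50" to "451 500".
```

Each slice would correspond to one range predicate on the partition column (ID in the example), which is what lets several readers work on the table at the same time.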


Multi-Table Reading Task Example

Configuring table_list enables automatic splitting, and you can adjust the splitting strategy via the split.* options.

env {  
  job.mode = "BATCH"  
  parallelism = 4  
}  
source {  
  Jdbc {  
    url = "jdbc:oracle:thin:@datasource01:1523:xe"  
    driver = "oracle.jdbc.OracleDriver"  
    connection_check_timeout_sec = 100  
    user = "root"  
    password = "123456"  
    "table_list" = [  
        {  
            "table_path" = "XE.TEST.USER_INFO"  
        },  
        {  
            "table_path" = "XE.TEST.YOURTABLENAME"  
        }  
    ]  
    split.size = 10000  
  }  
}  

sink {  
  Console {}  
}  


Sink Output Configuration Definition Explanation

Simple Task Example

This example defines a SeaTunnel synchronization task in which FakeSource generates data and sends it to the JDBC sink. FakeSource generates 16 rows (row.num = 16), each with two fields: name (string) and age (int). After the job runs, the target table test_table contains those 16 rows.


Before running this job, you need to create the test database and test_table table in MySQL. If you haven't installed and deployed SeaTunnel, follow the instructions in the installation guide. Then run the job as described in the quick start guide.

env {  
  parallelism = 1  
  job.mode = "BATCH"  
}  

source {  
  FakeSource {  
    parallelism = 1  
    result_table_name = "fake"  
    row.num = 16  
    schema = {  
      fields {  
        name = "string"  
        age = "int"  
      }  
    }  
  }  
}  

transform {}  

sink {  
    jdbc {  
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"  
        driver = "com.mysql.cj.jdbc.Driver"  
        user = "root"  
        password = "123456"  
        query = "insert into test_table(name,age) values(?,?)"  
    }
}  
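The job above expects the test database and the test_table table to exist already. A minimal sketch for creating them is shown below. The column types (VARCHAR(255) and INT) are assumptions chosen to match the string and int fields of FakeSource, and print_test_ddl is a hypothetical helper that only prints the SQL, so you can review it before piping it to the mysql client.

```shell
#!/bin/sh
# Hypothetical helper: print DDL for the target of the FakeSource example.
# Column types are assumptions matching the "string" and "int" fields.
print_test_ddl() {
    cat <<'SQL'
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.test_table (
    name VARCHAR(255),
    age  INT
);
SQL
}

# Example: review the output, then apply it with the mysql client
#   print_test_ddl | mysql -h localhost -P 3306 -u root -p
```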


Generated Output SQL Task Example

This example does not require you to write SQL statements by hand. Configure the output database name and table name, and the insert statements are generated for you automatically.

sink {  
    jdbc {  
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"  
        driver = "com.mysql.cj.jdbc.Driver"  
        user = "root"  
        password = "123456"  
        generate_sink_sql = true  
        database = test  
        table = test_table  
    }  
}  


Accurate Task Example

For scenarios that require accurate writes, the JDBC sink can provide exactly-once semantics:

sink {  
    jdbc {  
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"  
        driver = "com.mysql.cj.jdbc.Driver"  
        max_retries = 0  
        user = "root"  
        password = "123456"  
        query = "insert into test_table(name,age) values(?,?)"  
        is_exactly_once = "true"  
        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"  
    }  
}  


CDC (Change Data Capture) Events

The JDBC sink also supports CDC (change data capture) events. In this case, you need to configure database, table, and primary_keys.

sink {  
    jdbc {  
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"  
        driver = "com.mysql.cj.jdbc.Driver"  
        user = "root"  
        password = "123456"  
        generate_sink_sql = true  
        database = test  
        table = sink_table  
        primary_keys = ["id", "name"]  
        field_ide = UPPERCASE  
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"  
        data_save_mode = "APPEND_DATA"  
    }  
}  


Complete Test Script Configuration File

Below is the complete configuration file for this example:

env {  
  parallelism = 4  
  job.mode = "BATCH"  
}  
source {  
    Jdbc {  
        url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"  
        driver = "oracle.jdbc.OracleDriver"  
        user = "appuser001"  
        password = "appuser001"  
        query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"  
    }  
}  

transform {}  

sink {  
    jdbc {  
        url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"  
        driver = "com.mysql.cj.jdbc.Driver"  
        user = "appuser001"  
        password = "appuser001"  
        generate_sink_sql = "true"  
        database = "hive"  
        table = "met_com_icdoperation_ls"  
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"  
        data_save_mode = "APPEND_DATA"  
    }  
}  
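Before pasting a script like the one above into a task, a quick structural check can catch copy-and-paste mistakes. The sketch below is a rough lint, not a real parser for the configuration format: it only checks that the env, source, and sink sections are present and that the numbers of opening and closing braces match, so braces inside quoted strings would confuse it. The function name check_job_config is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: rough structural lint for a SeaTunnel job config file.
# Not a real parser; braces inside quoted strings are counted too.
check_job_config() (
    file="$1"
    [ -f "$file" ] || { echo "not found: $file" >&2; exit 1; }
    for section in env source sink; do
        if ! grep -Eq "^[[:space:]]*$section[[:space:]]*\{" "$file"; then
            echo "missing section: $section" >&2
            exit 1
        fi
    done
    # $(( ... )) strips the padding some wc implementations add
    opens=$(($(tr -cd '{' < "$file" | wc -c)))
    closes=$(($(tr -cd '}' < "$file" | wc -c)))
    if [ "$opens" -ne "$closes" ]; then
        echo "unbalanced braces: $opens opening vs $closes closing" >&2
        exit 1
    fi
    echo "ok"
)

# Example:
#   check_job_config /path/to/oracle-to-mysql.conf
```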

(Note: Configure the task node's run mode according to your environment. If Hadoop is not installed, choose the 'local' or 'client' mode. If you have a Hadoop cluster, you can run with the 'yarn' and 'cluster' modes.)


Replace the database connection settings in the script above with your own, then paste the script into the script input box of the SeaTunnel task node. Save the workflow, bring it online, and start it.


Verify in the corresponding database.

[Image: original Oracle database table]

[Image: synchronized MySQL database table]

The task ran and the data was synchronized successfully, so the test passed. From here you can extend this demo. The more hands-on experience you gain, the deeper your understanding of the architecture and principles of DolphinScheduler and SeaTunnel becomes, and over time you can upgrade and extend these open-source frameworks by modifying their source code.