|
@@ -0,0 +1,84 @@
|
|
|
+package com.fs.fssync.listener;
|
|
|
+
|
|
|
+import com.fs.fssync.config.FlinkConfig;
|
|
|
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
|
|
|
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
|
|
|
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
|
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
|
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Properties;
|
|
|
+
|
|
|
+/**
|
|
|
+ * MySQL事件监听
|
|
|
+ *
|
|
|
+ * @author jokey
|
|
|
+ * @since 2024-3-14
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class MySqlEventListener implements CommandLineRunner {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CustomSink customSink;
|
|
|
+ @Autowired
|
|
|
+ private FlinkConfig flinkConfig;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(String... args) throws Exception {
|
|
|
+ FlinkConfig.MySqlConfig mysqlConfig = flinkConfig.getCdc().getMysql();
|
|
|
+ Map<String, String> configMap = new HashMap<>();
|
|
|
+ configMap.put("decimal.handling.mode", "string");
|
|
|
+ configMap.put("bigint.unsigned.handling.mode", "long");
|
|
|
+ // 时间日期类型配置
|
|
|
+ configMap.put("time.precision.mode", "adaptive_time_microseconds"); // TIME类型精度处理
|
|
|
+ configMap.put("datetime.handling.mode", "string"); // DATETIME类型处理
|
|
|
+ // 二进制数据配置
|
|
|
+ configMap.put("binary.handling.mode", "bytes"); // BINARY/VARBINARY类型
|
|
|
+ configMap.put("json.handling.mode", "string"); // JSON类型
|
|
|
+ configMap.put("geometry.handling.mode", "string"); // 几何类型数据
|
|
|
+ configMap.put("include.unknown.datatypes", "false"); // 未知数据类型处理
|
|
|
+ // 将Map转换为Properties
|
|
|
+ Properties debeziumProps = new Properties();
|
|
|
+ debeziumProps.putAll(configMap);
|
|
|
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
|
|
|
+ .debeziumProperties(debeziumProps)
|
|
|
+ .hostname(mysqlConfig.getHostname())
|
|
|
+ .port(mysqlConfig.getPort())
|
|
|
+ .databaseList(mysqlConfig.getDatabaseList())
|
|
|
+ .tableList(mysqlConfig.getTableList())
|
|
|
+ .username(mysqlConfig.getUsername())
|
|
|
+ .password(mysqlConfig.getPassword())
|
|
|
+ .deserializer(new JsonDebeziumDeserializationSchema())
|
|
|
+ .startupOptions(getStartupOptions(mysqlConfig.getStartupOptions()))
|
|
|
+ .serverTimeZone(mysqlConfig.getServerTimeZone())
|
|
|
+ .build();
|
|
|
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
+ env.enableCheckpointing(flinkConfig.getCheckpoint().getInterval());
|
|
|
+ DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
|
|
|
+ .setParallelism(flinkConfig.getParallelism().getSource());
|
|
|
+
|
|
|
+ streamSource.addSink(customSink).setParallelism(flinkConfig.getParallelism().getSink());
|
|
|
+ env.execute("Print MySQL Snapshot + Binlog");
|
|
|
+ }
|
|
|
+ private StartupOptions getStartupOptions(String option) {
|
|
|
+ if (option == null) {
|
|
|
+ return StartupOptions.latest();
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (option.toLowerCase()) {
|
|
|
+ case "initial": return StartupOptions.initial();
|
|
|
+ case "latest": return StartupOptions.latest();
|
|
|
+ // 可以添加其他选项如 specificOffset, timestamp 等
|
|
|
+ default: return StartupOptions.latest();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|