|
@@ -6,16 +6,21 @@ import com.ververica.cdc.connectors.mysql.table.StartupOptions;
|
|
|
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
|
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
|
|
+import org.apache.flink.api.common.time.Time;
|
|
|
+import org.apache.flink.configuration.Configuration;
|
|
|
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
|
|
|
+import org.apache.flink.streaming.api.CheckpointingMode;
|
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
|
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
+import org.apache.flink.streaming.api.graph.StreamGraph;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.boot.CommandLineRunner;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Properties;
|
|
|
+import java.io.File;
|
|
|
+import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* MySQL事件监听
|
|
@@ -30,24 +35,62 @@ public class MySqlEventListener implements CommandLineRunner {
|
|
|
@Autowired
|
|
|
private FlinkConfig flinkConfig;
|
|
|
|
|
|
+ private static final String JOB_ID = "mysql-cdc-sync-job";
|
|
|
+ private static final String CHECKPOINT_DIR = "file:///c:/data/flink/checkpoints";
|
|
|
+
|
|
|
@Override
|
|
|
public void run(String... args) throws Exception {
|
|
|
+
|
|
|
+ // 创建环境并配置
|
|
|
+ final StreamExecutionEnvironment env = configureEnvironment();
|
|
|
+
|
|
|
+
|
|
|
FlinkConfig.MySqlConfig mysqlConfig = flinkConfig.getCdc().getMysql();
|
|
|
+
|
|
|
+ // Debezium 配置
|
|
|
+ Properties debeziumProps = createDebeziumProperties();
|
|
|
+
|
|
|
+ // 创建 MySQL CDC 源
|
|
|
+ MySqlSource<String> mySqlSource = createMySqlSource(mysqlConfig, debeziumProps);
|
|
|
+
|
|
|
+
|
|
|
+ // 创建数据流
|
|
|
+ DataStreamSource<String> streamSource = env.fromSource(
|
|
|
+ mySqlSource,
|
|
|
+ WatermarkStrategy.noWatermarks(),
|
|
|
+ "MySQL Source"
|
|
|
+ ).setParallelism(flinkConfig.getParallelism().getSource());
|
|
|
+
|
|
|
+ // 添加自定义接收器
|
|
|
+ CustomSink customSink = new CustomSink();
|
|
|
+ streamSource.addSink(customSink)
|
|
|
+ .name("syncToRedis")
|
|
|
+ .setParallelism(flinkConfig.getParallelism().getSink());
|
|
|
+ StreamGraph streamGraph = env.getStreamGraph();
|
|
|
+ streamGraph.setJobName(JOB_ID);
|
|
|
+
|
|
|
+ // 执行作业
|
|
|
+ env.execute(streamGraph);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Properties createDebeziumProperties() {
|
|
|
Map<String, String> configMap = new HashMap<>();
|
|
|
configMap.put("decimal.handling.mode", "string");
|
|
|
configMap.put("bigint.unsigned.handling.mode", "long");
|
|
|
- // 时间日期类型配置
|
|
|
- configMap.put("time.precision.mode", "adaptive_time_microseconds"); // TIME类型精度处理
|
|
|
- configMap.put("datetime.handling.mode", "string"); // DATETIME类型处理
|
|
|
- // 二进制数据配置
|
|
|
- configMap.put("binary.handling.mode", "bytes"); // BINARY/VARBINARY类型
|
|
|
- configMap.put("json.handling.mode", "string"); // JSON类型
|
|
|
- configMap.put("geometry.handling.mode", "string"); // 几何类型数据
|
|
|
- configMap.put("include.unknown.datatypes", "false"); // 未知数据类型处理
|
|
|
- // 将Map转换为Properties
|
|
|
- Properties debeziumProps = new Properties();
|
|
|
- debeziumProps.putAll(configMap);
|
|
|
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
|
|
|
+ configMap.put("time.precision.mode", "adaptive_time_microseconds");
|
|
|
+ configMap.put("datetime.handling.mode", "string");
|
|
|
+ configMap.put("binary.handling.mode", "bytes");
|
|
|
+ configMap.put("json.handling.mode", "string");
|
|
|
+ configMap.put("geometry.handling.mode", "string");
|
|
|
+ configMap.put("include.unknown.datatypes", "false");
|
|
|
+
|
|
|
+ Properties props = new Properties();
|
|
|
+ props.putAll(configMap);
|
|
|
+ return props;
|
|
|
+ }
|
|
|
+
|
|
|
+ private MySqlSource<String> createMySqlSource(FlinkConfig.MySqlConfig mysqlConfig, Properties debeziumProps) {
|
|
|
+ return MySqlSource.<String>builder()
|
|
|
.debeziumProperties(debeziumProps)
|
|
|
.hostname(mysqlConfig.getHostname())
|
|
|
.port(mysqlConfig.getPort())
|
|
@@ -59,28 +102,80 @@ public class MySqlEventListener implements CommandLineRunner {
|
|
|
.startupOptions(getStartupOptions(mysqlConfig.getStartupOptions()))
|
|
|
.serverTimeZone(mysqlConfig.getServerTimeZone())
|
|
|
.build();
|
|
|
+ }
|
|
|
+
|
|
|
+ private StreamExecutionEnvironment configureEnvironment() {
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
+
|
|
|
+ // 设置状态后端
|
|
|
+ try {
|
|
|
+ env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
|
|
|
+ log.info("设置状态后端为: {}", CHECKPOINT_DIR);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("设置状态后端失败", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查点配置
|
|
|
env.enableCheckpointing(flinkConfig.getCheckpoint().getInterval());
|
|
|
- DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
|
|
|
- .setParallelism(flinkConfig.getParallelism().getSource());
|
|
|
+ CheckpointConfig checkpointConfig = env.getCheckpointConfig();
|
|
|
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
|
|
|
+ checkpointConfig.setMinPauseBetweenCheckpoints(5000);
|
|
|
+ checkpointConfig.setCheckpointTimeout(60000);
|
|
|
+ checkpointConfig.setMaxConcurrentCheckpoints(1);
|
|
|
+ checkpointConfig.setTolerableCheckpointFailureNumber(3);
|
|
|
+ checkpointConfig.enableExternalizedCheckpoints(
|
|
|
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
|
|
|
+ checkpointConfig.enableUnalignedCheckpoints();
|
|
|
|
|
|
- CustomSink customSink = new CustomSink();
|
|
|
+ // 设置重启策略
|
|
|
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
|
|
|
|
|
|
- streamSource.addSink(customSink)
|
|
|
- .name("syncToRedis")
|
|
|
- .setParallelism(flinkConfig.getParallelism().getSink());
|
|
|
- env.execute("Print MySQL Snapshot + Binlog");
|
|
|
+ log.info("Flink环境配置完成,检查点间隔: {}ms, 作业ID: {}",
|
|
|
+ flinkConfig.getCheckpoint().getInterval(), JOB_ID);
|
|
|
+
|
|
|
+ return env;
|
|
|
}
|
|
|
+
|
|
|
private StartupOptions getStartupOptions(String option) {
|
|
|
if (option == null) {
|
|
|
- return StartupOptions.latest();
|
|
|
+ return StartupOptions.earliest();
|
|
|
}
|
|
|
-
|
|
|
switch (option.toLowerCase()) {
|
|
|
case "initial": return StartupOptions.initial();
|
|
|
case "latest": return StartupOptions.latest();
|
|
|
- // 可以添加其他选项如 specificOffset, timestamp 等
|
|
|
+ case "latest-offset": return StartupOptions.latest(); // 显式处理 latest-offset
|
|
|
default: return StartupOptions.latest();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private String findValidCheckpoint(String checkpointDirPath) {
|
|
|
+ File checkpointDir = new File(checkpointDirPath);
|
|
|
+ if (!checkpointDir.exists() || !checkpointDir.isDirectory()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查找完成的检查点目录 (通常格式为 chk-xx)
|
|
|
+ File[] checkpointFiles = checkpointDir.listFiles(file ->
|
|
|
+ file.isDirectory() && file.getName().startsWith("chk-"));
|
|
|
+
|
|
|
+ if (checkpointFiles == null || checkpointFiles.length == 0) {
|
|
|
+ log.info("未找到有效的检查点");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 按修改时间排序
|
|
|
+ Arrays.sort(checkpointFiles, Comparator.comparingLong(File::lastModified).reversed());
|
|
|
+
|
|
|
+ // 检查是否有 _metadata 文件,这通常表示完整的检查点
|
|
|
+ for (File cpDir : checkpointFiles) {
|
|
|
+ File metadataFile = new File(cpDir, "_metadata");
|
|
|
+ if (metadataFile.exists()) {
|
|
|
+ log.info("找到有效检查点: {}", cpDir.getAbsolutePath());
|
|
|
+ return "file:///" + cpDir.getAbsolutePath();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("未找到包含元数据的有效检查点");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|