xdd 1 місяць тому
батько
коміт
4969c84dd5

+ 105 - 41
fs-sync/src/main/java/com/fs/fssync/listener/MySqlEventListener.java

@@ -10,7 +10,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -24,7 +23,11 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * MySQL事件监听
@@ -41,22 +44,27 @@ public class MySqlEventListener implements CommandLineRunner {
 
     private static final String JOB_ID = "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d";
     private static final String CHECKPOINT_DIR = "file:///home/software/fs-sync/data/flink/checkpoints";
+    // 文件操作用的路径(不含协议前缀)
+    private static final String CHECKPOINT_LOCAL_PATH = "/home/software/fs-sync/data/flink/checkpoints";
 
     @Override
     public void run(String... args) throws Exception {
+        // 查找最新的有效检查点
+        String latestCheckpoint = findLatestCheckpoint();
+        boolean hasValidCheckpoint = latestCheckpoint != null;
 
-        // 创建环境并配置
-        final StreamExecutionEnvironment env = configureEnvironment();
+        log.info("检查点查找结果: {}", hasValidCheckpoint ? "找到有效检查点" : "未找到有效检查点");
 
+        // 创建环境并配置
+        final StreamExecutionEnvironment env = configureEnvironment(latestCheckpoint);
 
         FlinkConfig.MySqlConfig mysqlConfig = flinkConfig.getCdc().getMysql();
 
         // Debezium 配置
         Properties debeziumProps = createDebeziumProperties();
 
-        // 创建 MySQL CDC 源
-        MySqlSource<String> mySqlSource = createMySqlSource(mysqlConfig, debeziumProps);
-
+        // 创建 MySQL CDC 源 - 根据是否有检查点决定启动模式
+        MySqlSource<String> mySqlSource = createMySqlSource(mysqlConfig, debeziumProps, hasValidCheckpoint);
 
         // 创建数据流
         DataStreamSource<String> streamSource = env.fromSource(
@@ -70,12 +78,16 @@ public class MySqlEventListener implements CommandLineRunner {
         streamSource.addSink(customSink)
                 .name("syncToRedis")
                 .setParallelism(flinkConfig.getParallelism().getSink());
+
         // 修改执行方法,捕获更多信息
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setJobName("MySQL-CDC-Sync-Job");
+
         // 执行前记录更多信息
-        log.info("准备执行CDC任务,作业名称: {}, 作业ID: {}, 检查点目录: {}",
-                streamGraph.getJobName(), JOB_ID, CHECKPOINT_DIR);
+        log.info("准备执行CDC任务,作业名称: {}, 作业ID: {}, 检查点目录: {}, 启动模式: {}",
+                streamGraph.getJobName(), JOB_ID, CHECKPOINT_DIR,
+                hasValidCheckpoint ? "从检查点恢复" : mysqlConfig.getStartupOptions());
+
         // 执行作业
         env.execute(streamGraph);
     }
@@ -96,7 +108,20 @@ public class MySqlEventListener implements CommandLineRunner {
         return props;
     }
 
-    private MySqlSource<String> createMySqlSource(FlinkConfig.MySqlConfig mysqlConfig, Properties debeziumProps) {
+    private MySqlSource<String> createMySqlSource(
+            FlinkConfig.MySqlConfig mysqlConfig,
+            Properties debeziumProps,
+            boolean hasValidCheckpoint) {
+
+        // 如果有有效检查点,使用LATEST模式避免重复处理数据
+        // 注意:当使用检查点恢复时,Flink会自动从上次的位置继续,
+        // 此时应使用LATEST启动选项以避免再次读取已处理的数据
+        StartupOptions startupOptions = hasValidCheckpoint
+            ? StartupOptions.latest()
+            : getStartupOptions(mysqlConfig.getStartupOptions());
+
+        log.info("MySqlSource 启动模式: {}", hasValidCheckpoint ? "LATEST (从检查点恢复)" : startupOptions);
+
         return MySqlSource.<String>builder()
                 .debeziumProperties(debeziumProps)
                 .hostname(mysqlConfig.getHostname())
@@ -106,21 +131,21 @@ public class MySqlEventListener implements CommandLineRunner {
                 .username(mysqlConfig.getUsername())
                 .password(mysqlConfig.getPassword())
                 .deserializer(new JsonDebeziumDeserializationSchema())
-                .startupOptions(getStartupOptions(mysqlConfig.getStartupOptions()))
+                .startupOptions(startupOptions)
                 .serverTimeZone(mysqlConfig.getServerTimeZone())
                 .build();
     }
 
-    private StreamExecutionEnvironment configureEnvironment() {
+    private StreamExecutionEnvironment configureEnvironment(String latestCheckpoint) {
         Configuration configuration = new Configuration();
         configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, JOB_ID);
 
-        // 添加这行 - 指定具体的检查点路径
-        String latestCheckpoint = findLatestCheckpoint();
+        // 如果找到有效检查点,设置恢复路径
         if (latestCheckpoint != null) {
             configuration.setString("execution.savepoint.path", latestCheckpoint);
             log.info("将从检查点恢复: {}", latestCheckpoint);
         }
+
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // 设置状态后端
@@ -149,7 +174,6 @@ public class MySqlEventListener implements CommandLineRunner {
         // 设置重启策略
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
 
-
         log.info("Flink环境配置完成,检查点间隔: {}ms, 作业ID: {}",
                 flinkConfig.getCheckpoint().getInterval(), JOB_ID);
 
@@ -157,41 +181,82 @@ public class MySqlEventListener implements CommandLineRunner {
     }
 
     private String findLatestCheckpoint() {
-        File checkpointDir = new File(CHECKPOINT_DIR + JOB_ID);
-        if (!checkpointDir.exists() || !checkpointDir.isDirectory()) {
+        try {
+            // 使用Java NIO API进行更可靠的文件操作
+            Path checkpointsRootDir = Paths.get(CHECKPOINT_LOCAL_PATH);
+            if (!Files.exists(checkpointsRootDir) || !Files.isDirectory(checkpointsRootDir)) {
+                log.warn("检查点根目录不存在: {}", CHECKPOINT_LOCAL_PATH);
+                return null;
+            }
+            // 查找Job ID目录
+            Path jobDir = checkpointsRootDir.resolve(JOB_ID);
+            if (!Files.exists(jobDir) || !Files.isDirectory(jobDir)) {
+                log.warn("作业检查点目录不存在: {}", jobDir);
+                return null;
+            }
+            // 在作业目录下查找所有chk-开头的检查点目录
+            List<Path> checkpoints = Files.list(jobDir)
+                    .filter(p -> Files.isDirectory(p) && p.getFileName().toString().startsWith("chk-"))
+                    .sorted((p1, p2) -> {
+                        try {
+                            // 提取检查点ID进行比较
+                            int id1 = Integer.parseInt(p1.getFileName().toString().substring(4));
+                            int id2 = Integer.parseInt(p2.getFileName().toString().substring(4));
+                            // 降序排列,最新的检查点优先
+                            return Integer.compare(id2, id1);
+                        } catch (Exception e) {
+                            log.warn("比较检查点目录时出错: {}", e.getMessage());
+                            return 0;
+                        }
+                    })
+                    .collect(Collectors.toList());
+            if (checkpoints.isEmpty()) {
+                log.warn("在作业目录下未找到检查点: {}", jobDir);
+                return null;
+            }
+            // 遍历检查点目录,查找有效的检查点
+            for (Path checkpoint : checkpoints) {
+                // 检查_metadata文件是否存在
+                Path metadata = checkpoint.resolve("_metadata");
+                if (Files.exists(metadata) && Files.isRegularFile(metadata)) {
+                    // 构建Flink可识别的检查点路径(注意检查点是相对于Job ID目录的)
+                    String checkpointPath = "file://" + checkpoint.toString();
+                    log.info("找到有效检查点: {}", checkpointPath);
+                    // 记录详细信息用于调试
+                    log.info("检查点ID: {}, 检查点目录: {}, 元数据文件: {}",
+                            checkpoint.getFileName().toString().substring(4),
+                            checkpoint,
+                            metadata);
+                    return checkpointPath;
+                }
+            }
+            log.warn("未找到有效的检查点");
             return null;
-        }
-
-        // 查找最新的检查点
-        File[] checkpoints = checkpointDir.listFiles(file ->
-                file.isDirectory() && file.getName().startsWith("chk-"));
-
-        if (checkpoints == null || checkpoints.length == 0) {
+        } catch (Exception e) {
+            log.error("查找检查点时出错", e);
             return null;
         }
-
-        // 按检查点ID排序(提取数字部分)
-        Arrays.sort(checkpoints, (f1, f2) -> {
-            int id1 = Integer.parseInt(f1.getName().substring(4));
-            int id2 = Integer.parseInt(f2.getName().substring(4));
-            return Integer.compare(id2, id1); // 降序排列
-        });
-
-        // 检查_metadata文件是否存在
-        File latest = checkpoints[0];
-        File metadata = new File(latest, "_metadata");
-        if (metadata.exists() && metadata.isFile()) {
-            return CHECKPOINT_DIR + JOB_ID + "/" + latest.getName();
+    }
+    // 从检查点目录名中提取检查点ID
+    private long extractCheckpointId(String directoryName) {
+        try {
+            // 针对不同格式的检查点目录名进行处理
+            if (directoryName.startsWith("chk-")) {
+                return Long.parseLong(directoryName.substring(4));
+            } else {
+                // 尝试直接解析整个名称
+                return Long.parseLong(directoryName);
+            }
+        } catch (NumberFormatException e) {
+            log.warn("无法从目录名提取检查点ID: {}", directoryName);
+            return 0;
         }
-
-        return null;
     }
 
     private StartupOptions getStartupOptions(String option) {
-        // 如果没有检查点,使用配置的启动模式
         log.info("将使用配置的启动模式: {}", option);
         if (option == null) {
-            return StartupOptions.earliest();
+            return StartupOptions.latest();
         }
         switch (option.toLowerCase()) {
             case "initial": return StartupOptions.initial();
@@ -200,5 +265,4 @@ public class MySqlEventListener implements CommandLineRunner {
             default: return StartupOptions.latest();
         }
     }
-
 }