dongdong.xiang пре 2 месеци
родитељ
комит
b282fc3ffa

+ 13 - 7
fs-sync/pom.xml

@@ -2,7 +2,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <groupId>com.example</groupId>
+    <groupId>com.fs</groupId>
     <artifactId>fs-sync</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <name>fs-sync</name>
@@ -64,16 +64,22 @@
     <build>
         <plugins>
             <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.8.1</version>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>2.1.1.RELEASE</version>
                 <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                    <encoding>UTF-8</encoding>
+                    <fork>true</fork> <!-- 如果没有该配置,devtools不会生效 -->
                 </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>
 
+
 </project>

+ 60 - 37
fs-sync/src/main/java/com/fs/fssync/listener/MySqlEventListener.java

@@ -9,7 +9,10 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -19,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PreDestroy;
 import java.io.File;
 import java.util.*;
 
@@ -35,8 +39,8 @@ public class MySqlEventListener implements CommandLineRunner {
     @Autowired
     private FlinkConfig flinkConfig;
 
-    private static final String JOB_ID = "mysql-cdc-sync-job";
-    private static final String CHECKPOINT_DIR = "file:///c:/data/flink/checkpoints";
+    private static final String JOB_ID = "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d";
+    private static final String CHECKPOINT_DIR = "d:/data/flink/checkpoints";
 
     @Override
     public void run(String... args) throws Exception {
@@ -66,9 +70,12 @@ public class MySqlEventListener implements CommandLineRunner {
         streamSource.addSink(customSink)
                 .name("syncToRedis")
                 .setParallelism(flinkConfig.getParallelism().getSink());
+        // 修改执行方法,捕获更多信息
         StreamGraph streamGraph = env.getStreamGraph();
-        streamGraph.setJobName(JOB_ID);
-
+        streamGraph.setJobName("MySQL-CDC-Sync-Job");
+        // 执行前记录更多信息
+        log.info("准备执行CDC任务,作业名称: {}, 作业ID: {}, 检查点目录: {}",
+                streamGraph.getJobName(), JOB_ID, CHECKPOINT_DIR);
         // 执行作业
         env.execute(streamGraph);
     }
@@ -105,11 +112,22 @@ public class MySqlEventListener implements CommandLineRunner {
     }
 
     private StreamExecutionEnvironment configureEnvironment() {
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, JOB_ID);
+
+        // 添加这行 - 指定具体的检查点路径
+        String latestCheckpoint = findLatestCheckpoint();
+        if (latestCheckpoint != null) {
+            configuration.setString("execution.savepoint.path", latestCheckpoint);
+            log.info("将从检查点恢复: {}", latestCheckpoint);
+        }
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // 设置状态后端
         try {
-            env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
+            // 使用HashMap状态后端 (内存中计算,但检查点存储在文件系统)
+            env.setStateBackend(new HashMapStateBackend());
+            env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(CHECKPOINT_DIR));
             log.info("设置状态后端为: {}", CHECKPOINT_DIR);
         } catch (Exception e) {
             log.error("设置状态后端失败", e);
@@ -119,63 +137,68 @@ public class MySqlEventListener implements CommandLineRunner {
         env.enableCheckpointing(flinkConfig.getCheckpoint().getInterval());
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
         checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        checkpointConfig.setMinPauseBetweenCheckpoints(5000);
+        checkpointConfig.setMinPauseBetweenCheckpoints(500);
         checkpointConfig.setCheckpointTimeout(60000);
         checkpointConfig.setMaxConcurrentCheckpoints(1);
         checkpointConfig.setTolerableCheckpointFailureNumber(3);
-        checkpointConfig.enableExternalizedCheckpoints(
+        checkpointConfig.setExternalizedCheckpointCleanup(
                 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
         checkpointConfig.enableUnalignedCheckpoints();
 
         // 设置重启策略
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
 
+
         log.info("Flink环境配置完成,检查点间隔: {}ms, 作业ID: {}",
                 flinkConfig.getCheckpoint().getInterval(), JOB_ID);
 
         return env;
     }
 
-    private StartupOptions getStartupOptions(String option) {
-        if (option == null) {
-            return StartupOptions.earliest();
-        }
-        switch (option.toLowerCase()) {
-            case "initial": return StartupOptions.initial();
-            case "latest": return StartupOptions.latest();
-            case "latest-offset": return StartupOptions.latest(); // 显式处理 latest-offset
-            default: return StartupOptions.latest();
-        }
-    }
-
-    private String findValidCheckpoint(String checkpointDirPath) {
-        File checkpointDir = new File(checkpointDirPath);
+    private String findLatestCheckpoint() {
+        File checkpointDir = new File("d:/data/flink/checkpoints/" + JOB_ID);
         if (!checkpointDir.exists() || !checkpointDir.isDirectory()) {
             return null;
         }
 
-        // 查找完成的检查点目录 (通常格式为 chk-xx)
-        File[] checkpointFiles = checkpointDir.listFiles(file ->
+        // 查找最新的检查点
+        File[] checkpoints = checkpointDir.listFiles(file ->
                 file.isDirectory() && file.getName().startsWith("chk-"));
 
-        if (checkpointFiles == null || checkpointFiles.length == 0) {
-            log.info("未找到有效的检查点");
+        if (checkpoints == null || checkpoints.length == 0) {
             return null;
         }
 
-        // 按修改时间排序
-        Arrays.sort(checkpointFiles, Comparator.comparingLong(File::lastModified).reversed());
-
-        // 检查是否有 _metadata 文件,这通常表示完整的检查点
-        for (File cpDir : checkpointFiles) {
-            File metadataFile = new File(cpDir, "_metadata");
-            if (metadataFile.exists()) {
-                log.info("找到有效检查点: {}", cpDir.getAbsolutePath());
-                return "file:///" + cpDir.getAbsolutePath();
-            }
+        // 按检查点ID排序(提取数字部分)
+        Arrays.sort(checkpoints, (f1, f2) -> {
+            int id1 = Integer.parseInt(f1.getName().substring(4));
+            int id2 = Integer.parseInt(f2.getName().substring(4));
+            return Integer.compare(id2, id1); // 降序排列
+        });
+
+        // 检查_metadata文件是否存在
+        File latest = checkpoints[0];
+        File metadata = new File(latest, "_metadata");
+        if (metadata.exists() && metadata.isFile()) {
+            return "file:///d:/data/flink/checkpoints/" + JOB_ID + "/" + latest.getName();
         }
 
-        log.info("未找到包含元数据的有效检查点");
         return null;
     }
+
+    private StartupOptions getStartupOptions(String option) {
+        // 如果没有检查点,使用配置的启动模式
+        log.info("将使用配置的启动模式: {}", option);
+        if (option == null) {
+            return StartupOptions.earliest();
+        }
+        switch (option.toLowerCase()) {
+            case "initial": return StartupOptions.initial();
+            case "latest": return StartupOptions.latest();
+            case "earliest": return StartupOptions.earliest();
+            default: return StartupOptions.latest();
+        }
+    }
+
 }

+ 4 - 4
fs-sync/src/main/resources/application.yml

@@ -1,8 +1,8 @@
 logging:
   level:
-    org.apache.flink.runtime.checkpoint: warn
-    org.apache.flink.runtime.source.coordinator.SourceCoordinator: warn
-    com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader: warn
+    org.apache.flink.runtime.checkpoint: debug
+    org.apache.flink.runtime.source.coordinator.SourceCoordinator: debug
+    com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader: info
 
 flink:
   cdc:
@@ -14,7 +14,7 @@ flink:
       username: root
       password: Rtyy_2023
       serverTimeZone: Asia/Shanghai
-      startupOptions: latest-offset # 可选值: latest, initial, specificOffset, timestamp
+      startupOptions: initial # 可选值: latest, initial, specificOffset, timestamp
   checkpoint:
     interval: 3000 # 单位:毫秒
   parallelism: