xdd 2 тижнів тому
батько
коміт
0e1a66b2a8

+ 40 - 0
fs-sync/src/main/java/com/fs/fssync/config/SpringContextHolder.java

@@ -0,0 +1,40 @@
+package com.fs.fssync.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+
+@Component
+public class SpringContextHolder implements ApplicationContextAware, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ApplicationContext applicationContext;
+
+    // 记录是否已初始化
+    private static volatile boolean initialized = false;
+
+    @Override
+    public void setApplicationContext(ApplicationContext context) throws BeansException {
+        applicationContext = context;
+        initialized = true;
+    }
+
+    public static ApplicationContext getApplicationContext() {
+        if (!initialized) {
+            throw new IllegalStateException("ApplicationContext has not been initialized");
+        }
+        return applicationContext;
+    }
+
+    public static <T> T getBean(Class<T> clazz) {
+        return getApplicationContext().getBean(clazz);
+    }
+
+    public static <T> T getBean(String name, Class<T> clazz) {
+        return getApplicationContext().getBean(name, clazz);
+    }
+}

+ 42 - 11
fs-sync/src/main/java/com/fs/fssync/listener/CustomSink.java

@@ -1,10 +1,19 @@
 package com.fs.fssync.listener;
 
+import cn.hutool.extra.spring.SpringUtil;
 import com.alibaba.fastjson.JSON;
+import com.fs.fssync.config.SpringContextHolder;
+import com.fs.fssync.sink.CdcSinkStrategy;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
+import java.io.Serializable;
+import java.util.Map;
+
 /**
  * Flink CDC 数据处理
  *
@@ -12,23 +21,45 @@ import org.springframework.stereotype.Component;
  * @since 2024-3-14
  */
 @Slf4j
-@Component
-public class CustomSink extends RichSinkFunction<String> {
+public class CustomSink extends RichSinkFunction<String> implements Serializable {
 
     @Override
     public void invoke(String value, Context context) {
         log.info("收到变更原始数据:{}", value);
+        try {
+            // 从JSON中提取表名信息
+            String source = JSON.parseObject(value).getString("source");
+            String table = JSON.parseObject(source).getString("table");
 
-        String op = JSON.parseObject(value).getString("op");
-        if ("c".equals(op)) {
-            log.info("新增了一条数据 ...");
-        } else if ("u".equals(op)) {
-            log.info("更新了一条数据 ...");
-        } else if ("d".equals(op)) {
-            log.info("删除了一条数据 ...");
-        } else {
-            log.error("{} 未知的操作类型!", op);
+            // 根据表名查找对应的策略
+            processBySinkStrategy(table, value);
+        } catch (Exception e) {
+            log.error("处理CDC数据失败", e);
         }
     }
 
+    private void processBySinkStrategy(String table, String value) {
+        // 获取所有实现了CdcSinkStrategy的Bean
+        Map<String, CdcSinkStrategy> strategies =
+                SpringContextHolder.getApplicationContext().getBeansOfType(CdcSinkStrategy.class);
+
+
+        for (CdcSinkStrategy strategy : strategies.values()) {
+            try {
+                if (isStrategyForTable(strategy, table)) {
+                    strategy.process(value);
+                    return;
+                }
+            } catch (Exception e) {
+                log.error("策略处理失败", e);
+            }
+        }
+
+        log.warn("未找到表 {} 的处理策略", table);
+    }
+
+    private boolean isStrategyForTable(CdcSinkStrategy strategy, String table) {
+
+        return strategy.canHandle(table.toLowerCase());
+    }
 }

+ 5 - 3
fs-sync/src/main/java/com/fs/fssync/listener/MySqlEventListener.java

@@ -27,8 +27,6 @@ import java.util.Properties;
 @Component
 public class MySqlEventListener implements CommandLineRunner {
 
-    @Autowired
-    private CustomSink customSink;
     @Autowired
     private FlinkConfig flinkConfig;
 
@@ -66,7 +64,11 @@ public class MySqlEventListener implements CommandLineRunner {
         DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                 .setParallelism(flinkConfig.getParallelism().getSource());
 
-        streamSource.addSink(customSink).setParallelism(flinkConfig.getParallelism().getSink());
+        CustomSink customSink = new CustomSink();
+
+        streamSource.addSink(customSink)
+                .name("syncToRedis")
+                .setParallelism(flinkConfig.getParallelism().getSink());
         env.execute("Print MySQL Snapshot + Binlog");
     }
     private StartupOptions getStartupOptions(String option) {

+ 11 - 5
fs-sync/src/main/java/com/fs/fssync/sink/AbstractCdcSinkStrategy.java

@@ -1,10 +1,7 @@
 package com.fs.fssync.sink;
 
 import cn.hutool.extra.spring.SpringUtil;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.PropertyNamingStrategy;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -22,6 +19,9 @@ public abstract class AbstractCdcSinkStrategy<T> implements CdcSinkStrategy<T> {
         this.objectMapper = new ObjectMapper();
         this.objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
         this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+        // 添加此行以忽略未知属性
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
 
     @Override
@@ -66,10 +66,16 @@ public abstract class AbstractCdcSinkStrategy<T> implements CdcSinkStrategy<T> {
         }
     }
 
+    protected String getIdField(){
+        return "id";
+    }
+
     protected Object getIdValue(T entity) throws Exception {
         // 获取ID字段的值,可以改为使用反射工具类
-        Field field = getEntityClass().getDeclaredField("id");
+        Field field = getEntityClass().getDeclaredField(getIdField());
         field.setAccessible(true);
         return field.get(entity);
     }
+
+
 }

+ 8 - 1
fs-sync/src/main/java/com/fs/fssync/sink/impl/FsUserSinkStrategy.java

@@ -2,8 +2,9 @@ package com.fs.fssync.sink.impl;
 
 import com.fs.fssync.sink.AbstractCdcSinkStrategy;
 import com.fs.store.domain.FsUser;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
-
+@Slf4j
 @Component
 public class FsUserSinkStrategy extends AbstractCdcSinkStrategy<FsUser> {
 
@@ -19,6 +20,12 @@ public class FsUserSinkStrategy extends AbstractCdcSinkStrategy<FsUser> {
 
     @Override
     public String getKeyPrefix() {
+        log.info("获取fs_user前缀");
         return "fs:user:";
     }
+
+    @Override
+    protected String getIdField() {
+        return "userId";
+    }
 }

+ 0 - 13
fs-sync/src/test/java/com/fs/fssync/FsSyncApplicationTests.java

@@ -1,13 +0,0 @@
-package com.fs.fssync;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class FsSyncApplicationTests {
-
-    @Test
-    void contextLoads() {
-    }
-
-}