Explorar el Código

Merge remote-tracking branch 'refs/remotes/origin/master_feat_sync_20250824' into master_fhhx_20250718

# Conflicts:
#	deploy.sh
#	fs-admin/src/main/resources/application-dev.yml
#	fs-api/src/main/resources/application-dev.yml
#	fs-company/src/main/resources/application-dev.yml
#	fs-service-system/src/main/resources/application-config.yml
#	fs-user-app/src/main/resources/application-dev.yml
xdd hace 2 meses
padre
commit
49d84b0c77
Se han modificado 30 ficheros con 1231 adiciones y 7 borrados
  1. 8 2
      fs-admin/src/main/java/com/fs/core/config/ResourcesConfig.java
  2. 35 0
      fs-admin/src/main/java/com/fs/core/config/ThreadPoolTaskWrapExecutor.java
  3. 40 0
      fs-admin/src/main/java/com/fs/core/interceptor/LogInterceptor.java
  4. 61 0
      fs-admin/src/main/java/com/fs/core/util/ThreadMdcUtil.java
  5. 1 1
      fs-admin/src/test/java/com/fs/store/controller/FsStorePaymentControllerTest.java
  6. 8 2
      fs-company/src/main/java/com/fs/core/config/ResourcesConfig.java
  7. 35 0
      fs-company/src/main/java/com/fs/core/config/ThreadPoolTaskWrapExecutor.java
  8. 40 0
      fs-company/src/main/java/com/fs/core/interceptor/LogInterceptor.java
  9. 61 0
      fs-company/src/main/java/com/fs/core/util/ThreadMdcUtil.java
  10. 4 0
      fs-service-system/src/main/java/com/fs/store/cache/impl/IFsStoreProductCacheServiceImpl.java
  11. 10 1
      fs-service-system/src/main/java/com/fs/store/service/impl/FsStoreOrderServiceImpl.java
  12. 33 0
      fs-sync/.gitignore
  13. 85 0
      fs-sync/pom.xml
  14. 23 0
      fs-sync/src/main/java/com/fs/fssync/FsSyncApplication.java
  15. 72 0
      fs-sync/src/main/java/com/fs/fssync/config/FastJson2JsonRedisSerializer.java
  16. 50 0
      fs-sync/src/main/java/com/fs/fssync/config/FlinkConfig.java
  17. 43 0
      fs-sync/src/main/java/com/fs/fssync/config/RedisConfig.java
  18. 40 0
      fs-sync/src/main/java/com/fs/fssync/config/SpringContextHolder.java
  19. 65 0
      fs-sync/src/main/java/com/fs/fssync/listener/CustomSink.java
  20. 204 0
      fs-sync/src/main/java/com/fs/fssync/listener/MySqlEventListener.java
  21. 82 0
      fs-sync/src/main/java/com/fs/fssync/sink/AbstractCdcSinkStrategy.java
  22. 23 0
      fs-sync/src/main/java/com/fs/fssync/sink/CdcSinkStrategy.java
  23. 28 0
      fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductAttrValueSinkStrategy.java
  24. 27 0
      fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductPackageSinkStrategy.java
  25. 33 0
      fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductSinkStrategy.java
  26. 34 0
      fs-sync/src/main/java/com/fs/fssync/sink/impl/FsUserSinkStrategy.java
  27. 25 0
      fs-sync/src/main/resources/application-dev.yml
  28. 51 0
      fs-sync/src/main/resources/application.yml
  29. 9 1
      fs-user-app/src/main/java/com/fs/app/utils/JwtUtils.java
  30. 1 0
      pom.xml

+ 8 - 2
fs-admin/src/main/java/com/fs/core/config/ResourcesConfig.java

@@ -1,5 +1,6 @@
 package com.fs.core.config;
 
+import com.fs.core.interceptor.LogInterceptor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -15,7 +16,7 @@ import com.fs.core.interceptor.RepeatSubmitInterceptor;
 
 /**
  * 通用配置
- * 
+ *
 
  */
 @Configuration
@@ -24,6 +25,9 @@ public class ResourcesConfig implements WebMvcConfigurer
     @Autowired
     private RepeatSubmitInterceptor repeatSubmitInterceptor;
 
+    @Autowired
+    private LogInterceptor logInterceptor;
+
     @Override
     public void addResourceHandlers(ResourceHandlerRegistry registry)
     {
@@ -42,6 +46,8 @@ public class ResourcesConfig implements WebMvcConfigurer
     public void addInterceptors(InterceptorRegistry registry)
     {
         registry.addInterceptor(repeatSubmitInterceptor).addPathPatterns("/**");
+        registry.addInterceptor(logInterceptor)
+                .addPathPatterns("/**");
     }
 
     /**
@@ -63,4 +69,4 @@ public class ResourcesConfig implements WebMvcConfigurer
         source.registerCorsConfiguration("/**", config);
         return new CorsFilter(source);
     }
-}
+}

+ 35 - 0
fs-admin/src/main/java/com/fs/core/config/ThreadPoolTaskWrapExecutor.java

@@ -0,0 +1,35 @@
+package com.fs.core.config;
+
+import com.fs.core.util.ThreadMdcUtil;
+import org.slf4j.MDC;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * @description:
+ * @author: xdd
+ * @date: 2025/3/13
+ */
+public final class ThreadPoolTaskWrapExecutor extends ThreadPoolTaskExecutor {
+    public ThreadPoolTaskWrapExecutor() {
+        super();
+    }
+
+    @Override
+    public void execute(Runnable task) {
+        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+}

+ 40 - 0
fs-admin/src/main/java/com/fs/core/interceptor/LogInterceptor.java

@@ -0,0 +1,40 @@
+package com.fs.core.interceptor;
+
+
+
+import org.slf4j.MDC;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.servlet.HandlerInterceptor;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.UUID;
+
+
+/**
+ * @description: 日志拦截器
+ * @author: xdd
+ * @date: 2025/3/13
+ */
+@Component
+public class LogInterceptor implements HandlerInterceptor {
+
+    private static final String traceId = "traceId";
+
+    @Override
+    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
+        String tid = UUID.randomUUID().toString().replace("-", "");
+        if (!StringUtils.isEmpty(request.getHeader("traceId"))) {
+            tid = request.getHeader("traceId");
+        }
+        MDC.put(traceId, tid);
+        return true;
+    }
+
+    @Override
+    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
+                                Exception ex) {
+        MDC.remove(traceId);
+    }
+}

+ 61 - 0
fs-admin/src/main/java/com/fs/core/util/ThreadMdcUtil.java

@@ -0,0 +1,61 @@
+package com.fs.core.util;
+
+
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * @description:
+ * @author: xdd
+ * @date: 2025/3/13
+ * @Description:
+ */
+public final class ThreadMdcUtil {
+    private static final String traceId = "traceId";
+
+    public static String generateTraceId() {
+        return UUID.randomUUID().toString().replace("-", "");
+    }
+
+    public static void setTraceIdIfAbsent() {
+        if (MDC.get(traceId) == null) {
+            MDC.put(traceId, generateTraceId());
+        }
+    }
+
+    public static<T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
+        return () -> {
+            if (context == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(context);
+            }
+            setTraceIdIfAbsent();
+            try {
+                return callable.call();
+            } finally {
+                MDC.clear();
+            }
+        };
+    }
+
+
+    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
+        return () -> {
+            if (context == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(context);
+            }
+            setTraceIdIfAbsent();
+            try {
+                runnable.run();
+            } finally {
+                MDC.clear();
+            }
+        };
+    }
+}

+ 1 - 1
fs-admin/src/test/java/com/fs/store/controller/FsStorePaymentControllerTest.java

@@ -116,7 +116,7 @@ public class FsStorePaymentControllerTest {
 
     @Test
     public void pushErp() throws ParseException {
-        fsStoreOrderService.createOmsOrder(1332687L);
+        fsStoreOrderService.createOmsOrder(1370120L);
     }
 
     @Test

+ 8 - 2
fs-company/src/main/java/com/fs/core/config/ResourcesConfig.java

@@ -1,5 +1,6 @@
 package com.fs.core.config;
 
+import com.fs.core.interceptor.LogInterceptor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -15,7 +16,7 @@ import com.fs.core.interceptor.RepeatSubmitInterceptor;
 
 /**
  * 通用配置
- * 
+ *
 
  */
 @Configuration
@@ -23,6 +24,8 @@ public class ResourcesConfig implements WebMvcConfigurer
 {
     @Autowired
     private RepeatSubmitInterceptor repeatSubmitInterceptor;
+    @Autowired
+    private LogInterceptor logInterceptor;
 
     @Override
     public void addResourceHandlers(ResourceHandlerRegistry registry)
@@ -42,6 +45,9 @@ public class ResourcesConfig implements WebMvcConfigurer
     public void addInterceptors(InterceptorRegistry registry)
     {
         registry.addInterceptor(repeatSubmitInterceptor).addPathPatterns("/**");
+
+        registry.addInterceptor(logInterceptor)
+                .addPathPatterns("/**");
     }
 
     /**
@@ -63,4 +69,4 @@ public class ResourcesConfig implements WebMvcConfigurer
         source.registerCorsConfiguration("/**", config);
         return new CorsFilter(source);
     }
-}
+}

+ 35 - 0
fs-company/src/main/java/com/fs/core/config/ThreadPoolTaskWrapExecutor.java

@@ -0,0 +1,35 @@
+package com.fs.core.config;
+
+import com.fs.core.util.ThreadMdcUtil;
+import org.slf4j.MDC;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * @description:
+ * @author: xdd
+ * @date: 2025/3/13
+ */
+public final class ThreadPoolTaskWrapExecutor extends ThreadPoolTaskExecutor {
+    public ThreadPoolTaskWrapExecutor() {
+        super();
+    }
+
+    @Override
+    public void execute(Runnable task) {
+        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
+    }
+}

+ 40 - 0
fs-company/src/main/java/com/fs/core/interceptor/LogInterceptor.java

@@ -0,0 +1,40 @@
+package com.fs.core.interceptor;
+
+
+
+import org.slf4j.MDC;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.servlet.HandlerInterceptor;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.UUID;
+
+
+/**
+ * @description: 日志拦截器
+ * @author: xdd
+ * @date: 2025/3/13
+ */
+@Component
+public class LogInterceptor implements HandlerInterceptor {
+
+    private static final String traceId = "traceId";
+
+    @Override
+    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
+        String tid = UUID.randomUUID().toString().replace("-", "");
+        if (!StringUtils.isEmpty(request.getHeader("traceId"))) {
+            tid = request.getHeader("traceId");
+        }
+        MDC.put(traceId, tid);
+        return true;
+    }
+
+    @Override
+    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
+                                Exception ex) {
+        MDC.remove(traceId);
+    }
+}

+ 61 - 0
fs-company/src/main/java/com/fs/core/util/ThreadMdcUtil.java

@@ -0,0 +1,61 @@
+package com.fs.core.util;
+
+
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * @description:
+ * @author: xdd
+ * @date: 2025/3/13
+ * @Description:
+ */
+public final class ThreadMdcUtil {
+    private static final String traceId = "traceId";
+
+    public static String generateTraceId() {
+        return UUID.randomUUID().toString().replace("-", "");
+    }
+
+    public static void setTraceIdIfAbsent() {
+        if (MDC.get(traceId) == null) {
+            MDC.put(traceId, generateTraceId());
+        }
+    }
+
+    public static<T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
+        return () -> {
+            if (context == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(context);
+            }
+            setTraceIdIfAbsent();
+            try {
+                return callable.call();
+            } finally {
+                MDC.clear();
+            }
+        };
+    }
+
+
+    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
+        return () -> {
+            if (context == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(context);
+            }
+            setTraceIdIfAbsent();
+            try {
+                runnable.run();
+            } finally {
+                MDC.clear();
+            }
+        };
+    }
+}

+ 4 - 0
fs-service-system/src/main/java/com/fs/store/cache/impl/IFsStoreProductCacheServiceImpl.java

@@ -38,11 +38,15 @@ public class IFsStoreProductCacheServiceImpl implements IFsStoreProductCacheServ
 
     @Override
     public FsStoreProduct selectFsStoreProductById(Long productId) {
+        log.info("开始查询商品信息,商品ID:{}", productId);
+
         return CACHE.get(productId, e -> fsStoreProductService.selectFsStoreProductById(productId));
     }
 
     @Override
     public String getWarehouseCodeByProductId(Long productId) {
+        log.info("开始根据商品ID获取仓库编码,商品ID:{}", productId);
+
         return WAREHOUSE_CACHE.get(productId,e->{
             FsStoreProduct fsStoreProduct = selectFsStoreProductById(productId);
             if(ObjectUtil.isNotNull(fsStoreProduct) && ObjectUtil.isNotNull(fsStoreProduct.getWarehouseId())){

+ 10 - 1
fs-service-system/src/main/java/com/fs/store/service/impl/FsStoreOrderServiceImpl.java

@@ -280,6 +280,15 @@ public class FsStoreOrderServiceImpl implements IFsStoreOrderService
     @Value("${snowflake.datacenter-id:1}")
     private long datacenterId;
 
+    @Autowired
+    private FsUserMapper fsUserMapper;
+
+    @Value("${snowflake.worker-id:1}")
+    private long workerId;
+
+    @Value("${snowflake.datacenter-id:1}")
+    private long datacenterId;
+
     @Override
     public void syncExpressToWx() {
         List<FsWxExpressTask> fsWxExpressTasks = fsWxExpressTaskMapper.selectPendingData();
@@ -2151,7 +2160,7 @@ public class FsStoreOrderServiceImpl implements IFsStoreOrderService
                             item.setQty(dto.getCount()*cartDTO.getNum());
                             item.setRefund(0);
                             // 查询仓库代码
-                            String warehouseCode = fsStoreProductCacheService.getWarehouseCodeByProductId(dto.getProductId());
+                            String warehouseCode = fsStoreProductCacheService.getWarehouseCodeByProductId(attrValue.getProductId());
                             if(StringUtils.isNotBlank(warehouseCode)){
                                 item.setWarehouseCode(warehouseCode);
                             }

+ 33 - 0
fs-sync/.gitignore

@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/

+ 85 - 0
fs-sync/pom.xml

@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.fs</groupId>
+    <artifactId>fs-sync</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>fs-sync</name>
+    <description>fs-sync</description>
+    <parent>
+        <artifactId>fs</artifactId>
+        <groupId>com.fs</groupId>
+        <version>1.1.0</version>
+    </parent>
+    <properties>
+        <java.version>1.8</java.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <flink.version>1.14.6</flink.version>
+        <scala.binary.version>2.12</scala.binary.version>
+    </properties>
+    <dependencies>
+
+        <!-- Flink核心依赖 - 使用1.14.6,是1.14系列的最终版本 -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- MySQL CDC连接器 - 与Flink 1.14兼容的最新版本 -->
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fs</groupId>
+            <artifactId>fs-service-system</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>2.1.1.RELEASE</version>
+                <configuration>
+                    <fork>true</fork> <!-- 如果没有该配置,devtools不会生效 -->
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>

+ 23 - 0
fs-sync/src/main/java/com/fs/fssync/FsSyncApplication.java

@@ -0,0 +1,23 @@
+package com.fs.fssync;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.autoconfigure.web.servlet.ServletWebServerFactoryAutoConfiguration;
+import org.springframework.boot.autoconfigure.web.servlet.WebMvcAutoConfiguration;
+
+@SpringBootApplication(
+        exclude = {
+                WebMvcAutoConfiguration.class,
+                ServletWebServerFactoryAutoConfiguration.class,
+                DataSourceAutoConfiguration.class,
+                DataSourceTransactionManagerAutoConfiguration.class
+        }
+)
+public class FsSyncApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(FsSyncApplication.class, args);
+    }
+
+}

+ 72 - 0
fs-sync/src/main/java/com/fs/fssync/config/FastJson2JsonRedisSerializer.java

@@ -0,0 +1,72 @@
+package com.fs.fssync.config;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
+import org.springframework.util.Assert;
+
+import java.nio.charset.Charset;
+
+/**
+ * Redis使用FastJson序列化
+ *
+
+ */
+public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T>
+{
+    @SuppressWarnings("unused")
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
+
+    private Class<T> clazz;
+
+    static
+    {
+        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
+    }
+
+    public FastJson2JsonRedisSerializer(Class<T> clazz)
+    {
+        super();
+        this.clazz = clazz;
+    }
+
+    @Override
+    public byte[] serialize(T t) throws SerializationException
+    {
+        if (t == null)
+        {
+            return new byte[0];
+        }
+        return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
+    }
+
+    @Override
+    public T deserialize(byte[] bytes) throws SerializationException
+    {
+        if (bytes == null || bytes.length <= 0)
+        {
+            return null;
+        }
+        String str = new String(bytes, DEFAULT_CHARSET);
+
+        return JSON.parseObject(str, clazz);
+    }
+
+    public void setObjectMapper(ObjectMapper objectMapper)
+    {
+        Assert.notNull(objectMapper, "'objectMapper' must not be null");
+        this.objectMapper = objectMapper;
+    }
+
+    protected JavaType getJavaType(Class<?> clazz)
+    {
+        return TypeFactory.defaultInstance().constructType(clazz);
+    }
+}

+ 50 - 0
fs-sync/src/main/java/com/fs/fssync/config/FlinkConfig.java

@@ -0,0 +1,50 @@
+package com.fs.fssync.config;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@Component
+@ConfigurationProperties(prefix = "flink")
+public class FlinkConfig {
+    private CdcConfig cdc = new CdcConfig();
+    private CheckpointConfig checkpoint = new CheckpointConfig();
+    private ParallelismConfig parallelism = new ParallelismConfig();
+
+    @Data
+    @NoArgsConstructor
+    public static class CdcConfig {
+        private MySqlConfig mysql = new MySqlConfig();
+    }
+
+    @Data
+    @NoArgsConstructor
+    public static class MySqlConfig {
+        private String hostname;
+        private int port;
+        private String databaseList;
+        private String[] tableList;
+        private String username;
+        private String password;
+        private String serverTimeZone;
+        private String startupOptions;
+    }
+
+    @Data
+    @NoArgsConstructor
+    public static class CheckpointConfig {
+        private int interval;
+    }
+
+    @Data
+    @NoArgsConstructor
+    public static class ParallelismConfig {
+        private int source;
+        private int sink;
+    }
+}

+ 43 - 0
fs-sync/src/main/java/com/fs/fssync/config/RedisConfig.java

@@ -0,0 +1,43 @@
+package com.fs.fssync.config;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.cache.annotation.CachingConfigurerSupport;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * redis配置
+ *
+
+ */
+@Configuration
+@EnableCaching
+public class RedisConfig extends CachingConfigurerSupport
+{
+    @Bean
+    @SuppressWarnings(value = { "unchecked", "rawtypes" })
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory)
+    {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(connectionFactory);
+
+        FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+        serializer.setObjectMapper(mapper);
+
+        template.setValueSerializer(serializer);
+        // 使用StringRedisSerializer来序列化和反序列化redis的key值
+        template.setKeySerializer(new StringRedisSerializer());
+        template.afterPropertiesSet();
+        return template;
+    }
+}

+ 40 - 0
fs-sync/src/main/java/com/fs/fssync/config/SpringContextHolder.java

@@ -0,0 +1,40 @@
+package com.fs.fssync.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+
+@Component
+public class SpringContextHolder implements ApplicationContextAware, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ApplicationContext applicationContext;
+
+    // 记录是否已初始化
+    private static volatile boolean initialized = false;
+
+    @Override
+    public void setApplicationContext(ApplicationContext context) throws BeansException {
+        applicationContext = context;
+        initialized = true;
+    }
+
+    public static ApplicationContext getApplicationContext() {
+        if (!initialized) {
+            throw new IllegalStateException("ApplicationContext has not been initialized");
+        }
+        return applicationContext;
+    }
+
+    public static <T> T getBean(Class<T> clazz) {
+        return getApplicationContext().getBean(clazz);
+    }
+
+    public static <T> T getBean(String name, Class<T> clazz) {
+        return getApplicationContext().getBean(name, clazz);
+    }
+}

+ 65 - 0
fs-sync/src/main/java/com/fs/fssync/listener/CustomSink.java

@@ -0,0 +1,65 @@
+package com.fs.fssync.listener;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.alibaba.fastjson.JSON;
+import com.fs.fssync.config.SpringContextHolder;
+import com.fs.fssync.sink.CdcSinkStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Flink CDC 数据处理
+ *
+ * @author jokey
+ * @since 2024-3-14
+ */
+@Slf4j
+public class CustomSink extends RichSinkFunction<String> implements Serializable {
+
+    @Override
+    public void invoke(String value, Context context) {
+        log.info("收到变更原始数据:{}", value);
+        try {
+            // 从JSON中提取表名信息
+            String source = JSON.parseObject(value).getString("source");
+            String table = JSON.parseObject(source).getString("table");
+
+            // 根据表名查找对应的策略
+            processBySinkStrategy(table, value);
+        } catch (Exception e) {
+            log.error("处理CDC数据失败", e);
+        }
+    }
+
+    private void processBySinkStrategy(String table, String value) {
+        // 获取所有实现了CdcSinkStrategy的Bean
+        Map<String, CdcSinkStrategy> strategies =
+                SpringContextHolder.getApplicationContext().getBeansOfType(CdcSinkStrategy.class);
+
+
+        for (CdcSinkStrategy strategy : strategies.values()) {
+            try {
+                if (isStrategyForTable(strategy, table)) {
+                    strategy.process(value);
+                    return;
+                }
+            } catch (Exception e) {
+                log.error("策略处理失败", e);
+            }
+        }
+
+        log.warn("未找到表 {} 的处理策略", table);
+    }
+
+    private boolean isStrategyForTable(CdcSinkStrategy strategy, String table) {
+
+        return strategy.canHandle(table.toLowerCase());
+    }
+}

+ 204 - 0
fs-sync/src/main/java/com/fs/fssync/listener/MySqlEventListener.java

@@ -0,0 +1,204 @@
+package com.fs.fssync.listener;
+
+import com.fs.fssync.config.FlinkConfig;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.io.File;
+import java.util.*;
+
+/**
+ * MySQL事件监听
+ *
+ * @author jokey
+ * @since 2024-3-14
+ */
+@Slf4j
+@Component
+public class MySqlEventListener implements CommandLineRunner {
+
+    @Autowired
+    private FlinkConfig flinkConfig;
+
+    private static final String JOB_ID = "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d";
+    private static final String CHECKPOINT_DIR = "file:///d:/data/flink/checkpoints";
+
+    @Override
+    public void run(String... args) throws Exception {
+
+        // 创建环境并配置
+        final StreamExecutionEnvironment env = configureEnvironment();
+
+
+        FlinkConfig.MySqlConfig mysqlConfig = flinkConfig.getCdc().getMysql();
+
+        // Debezium 配置
+        Properties debeziumProps = createDebeziumProperties();
+
+        // 创建 MySQL CDC 源
+        MySqlSource<String> mySqlSource = createMySqlSource(mysqlConfig, debeziumProps);
+
+
+        // 创建数据流
+        DataStreamSource<String> streamSource = env.fromSource(
+                mySqlSource,
+                WatermarkStrategy.noWatermarks(),
+                "MySQL Source"
+        ).setParallelism(flinkConfig.getParallelism().getSource());
+
+        // 添加自定义接收器
+        CustomSink customSink = new CustomSink();
+        streamSource.addSink(customSink)
+                .name("syncToRedis")
+                .setParallelism(flinkConfig.getParallelism().getSink());
+        // 修改执行方法,捕获更多信息
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobName("MySQL-CDC-Sync-Job");
+        // 执行前记录更多信息
+        log.info("准备执行CDC任务,作业名称: {}, 作业ID: {}, 检查点目录: {}",
+                streamGraph.getJobName(), JOB_ID, CHECKPOINT_DIR);
+        // 执行作业
+        env.execute(streamGraph);
+    }
+
+    private Properties createDebeziumProperties() {
+        Map<String, String> configMap = new HashMap<>();
+        configMap.put("decimal.handling.mode", "string");
+        configMap.put("bigint.unsigned.handling.mode", "long");
+        configMap.put("time.precision.mode", "adaptive_time_microseconds");
+        configMap.put("datetime.handling.mode", "string");
+        configMap.put("binary.handling.mode", "bytes");
+        configMap.put("json.handling.mode", "string");
+        configMap.put("geometry.handling.mode", "string");
+        configMap.put("include.unknown.datatypes", "false");
+
+        Properties props = new Properties();
+        props.putAll(configMap);
+        return props;
+    }
+
+    private MySqlSource<String> createMySqlSource(FlinkConfig.MySqlConfig mysqlConfig, Properties debeziumProps) {
+        return MySqlSource.<String>builder()
+                .debeziumProperties(debeziumProps)
+                .hostname(mysqlConfig.getHostname())
+                .port(mysqlConfig.getPort())
+                .databaseList(mysqlConfig.getDatabaseList())
+                .tableList(mysqlConfig.getTableList())
+                .username(mysqlConfig.getUsername())
+                .password(mysqlConfig.getPassword())
+                .deserializer(new JsonDebeziumDeserializationSchema())
+                .startupOptions(getStartupOptions(mysqlConfig.getStartupOptions()))
+                .serverTimeZone(mysqlConfig.getServerTimeZone())
+                .build();
+    }
+
+    private StreamExecutionEnvironment configureEnvironment() {
+        Configuration configuration = new Configuration();
+        configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, JOB_ID);
+
+        // 添加这行 - 指定具体的检查点路径
+        String latestCheckpoint = findLatestCheckpoint();
+        if (latestCheckpoint != null) {
+            configuration.setString("execution.savepoint.path", latestCheckpoint);
+            log.info("将从检查点恢复: {}", latestCheckpoint);
+        }
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+
+        // 设置状态后端
+        try {
+            // 使用HashMap状态后端 (内存中计算,但检查点存储在文件系统)
+            env.setStateBackend(new HashMapStateBackend());
+            env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(CHECKPOINT_DIR));
+            log.info("设置状态后端为: {}", CHECKPOINT_DIR);
+        } catch (Exception e) {
+            log.error("设置状态后端失败", e);
+        }
+
+        // 检查点配置
+        env.enableCheckpointing(flinkConfig.getCheckpoint().getInterval());
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        checkpointConfig.setMinPauseBetweenCheckpoints(500);
+        checkpointConfig.setCheckpointTimeout(60000);
+        checkpointConfig.setMaxConcurrentCheckpoints(1);
+        checkpointConfig.setTolerableCheckpointFailureNumber(3);
+        checkpointConfig.setExternalizedCheckpointCleanup(
+                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        checkpointConfig.enableUnalignedCheckpoints();
+
+        // 设置重启策略
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
+
+
+        log.info("Flink环境配置完成,检查点间隔: {}ms, 作业ID: {}",
+                flinkConfig.getCheckpoint().getInterval(), JOB_ID);
+
+        return env;
+    }
+
+    private String findLatestCheckpoint() {
+        File checkpointDir = new File(CHECKPOINT_DIR + JOB_ID);
+        if (!checkpointDir.exists() || !checkpointDir.isDirectory()) {
+            return null;
+        }
+
+        // 查找最新的检查点
+        File[] checkpoints = checkpointDir.listFiles(file ->
+                file.isDirectory() && file.getName().startsWith("chk-"));
+
+        if (checkpoints == null || checkpoints.length == 0) {
+            return null;
+        }
+
+        // 按检查点ID排序(提取数字部分)
+        Arrays.sort(checkpoints, (f1, f2) -> {
+            int id1 = Integer.parseInt(f1.getName().substring(4));
+            int id2 = Integer.parseInt(f2.getName().substring(4));
+            return Integer.compare(id2, id1); // 降序排列
+        });
+
+        // 检查_metadata文件是否存在
+        File latest = checkpoints[0];
+        File metadata = new File(latest, "_metadata");
+        if (metadata.exists() && metadata.isFile()) {
+            return CHECKPOINT_DIR + JOB_ID + "/" + latest.getName();
+        }
+
+        return null;
+    }
+
+    private StartupOptions getStartupOptions(String option) {
+        // 如果没有检查点,使用配置的启动模式
+        log.info("将使用配置的启动模式: {}", option);
+        if (option == null) {
+            return StartupOptions.earliest();
+        }
+        switch (option.toLowerCase()) {
+            case "initial": return StartupOptions.initial();
+            case "latest": return StartupOptions.latest();
+            case "earliest": return StartupOptions.earliest();
+            default: return StartupOptions.latest();
+        }
+    }
+
+}

+ 82 - 0
fs-sync/src/main/java/com/fs/fssync/sink/AbstractCdcSinkStrategy.java

@@ -0,0 +1,82 @@
+package com.fs.fssync.sink;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.fasterxml.jackson.databind.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+@Slf4j
+public abstract class AbstractCdcSinkStrategy<T> implements CdcSinkStrategy<T>, Serializable {
+
+    private final ObjectMapper objectMapper;
+    @Autowired
+    protected RedisTemplate<String, Object> redisTemplate;
+
+    protected AbstractCdcSinkStrategy() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
+        this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+        // 添加此行以忽略未知属性
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Override
+    public void process(String cdcData) throws Exception {
+
+        JsonNode rootNode = objectMapper.readTree(cdcData);
+        String op = rootNode.get("op").asText();
+
+        if ("c".equals(op) || "r".equals(op)) {
+            T after = objectMapper.treeToValue(rootNode.get("after"), getEntityClass());
+            handleCreate(after);
+        } else if ("u".equals(op)) {
+            T after = objectMapper.treeToValue(rootNode.get("after"), getEntityClass());
+            handleUpdate(after);
+        } else if ("d".equals(op)) {
+            T before = objectMapper.treeToValue(rootNode.get("before"), getEntityClass());
+            handleDelete(before);
+        } else {
+            log.warn("未知的操作类型: {}", op);
+        }
+    }
+
+    protected void handleCreate(T entity) throws Exception {
+        Object id = getIdValue(entity);
+        if (id != null) {
+            String key = getKeyPrefix() + id;
+            redisTemplate.opsForValue().set(key, entity);
+            log.info("数据已写入 Redis, key={}", key);
+        }
+    }
+
+    protected void handleUpdate(T entity) throws Exception {
+        handleCreate(entity);
+    }
+
+    protected void handleDelete(T entity) throws Exception {
+        Object id = getIdValue(entity);
+        if (id != null) {
+            String key = getKeyPrefix() + id;
+            redisTemplate.delete(key);
+            log.info("数据已从 Redis 删除, key={}", key);
+        }
+    }
+
+    protected String getIdField(){
+        return "id";
+    }
+
+    protected Object getIdValue(T entity) throws Exception {
+        // 获取ID字段的值,可以改为使用反射工具类
+        Field field = getEntityClass().getDeclaredField(getIdField());
+        field.setAccessible(true);
+        return field.get(entity);
+    }
+
+
+}

+ 23 - 0
fs-sync/src/main/java/com/fs/fssync/sink/CdcSinkStrategy.java

@@ -0,0 +1,23 @@
+package com.fs.fssync.sink;
+
+public interface CdcSinkStrategy<T> {
+    /**
+     * 检查是否能处理指定表的CDC数据
+     */
+    boolean canHandle(String table);
+
+    /**
+     * 处理CDC数据
+     */
+    void process(String cdcData) throws Exception;
+
+    /**
+     * 获取处理的实体类
+     */
+    Class<T> getEntityClass();
+
+    /**
+     * 获取Redis键前缀
+     */
+    String getKeyPrefix();
+}

+ 28 - 0
fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductAttrValueSinkStrategy.java

@@ -0,0 +1,28 @@
+package com.fs.fssync.sink.impl;
+
+import com.fs.fssync.sink.AbstractCdcSinkStrategy;
+import com.fs.store.domain.FsStoreProduct;
+import com.fs.store.domain.FsStoreProductAttrValue;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+
+@Slf4j
+@Component
+public class FsStoreProductAttrValueSinkStrategy extends AbstractCdcSinkStrategy<FsStoreProductAttrValue> implements Serializable {
+    @Override
+    public boolean canHandle(String table) {
+        return "fs_store_product_attr_value".equals(table);
+    }
+
+    @Override
+    public Class<FsStoreProductAttrValue> getEntityClass() {
+        return FsStoreProductAttrValue.class;
+    }
+
+    @Override
+    public String getKeyPrefix() {
+        return "fs:store:product:attr:value:";
+    }
+}

+ 27 - 0
fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductPackageSinkStrategy.java

@@ -0,0 +1,27 @@
+package com.fs.fssync.sink.impl;
+
+import com.fs.fssync.sink.AbstractCdcSinkStrategy;
+import com.fs.store.domain.FsStoreProduct;
+import com.fs.store.domain.FsStoreProductPackage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+@Slf4j
+@Component
+public class FsStoreProductPackageSinkStrategy extends AbstractCdcSinkStrategy<FsStoreProductPackage> implements Serializable {
+    @Override
+    public boolean canHandle(String table) {
+        return "fs_store_product_package".equals(table);
+    }
+
+    @Override
+    public Class<FsStoreProductPackage> getEntityClass() {
+        return FsStoreProductPackage.class;
+    }
+
+    @Override
+    public String getKeyPrefix() {
+        return "fs:store:product:package:";
+    }
+}

+ 33 - 0
fs-sync/src/main/java/com/fs/fssync/sink/impl/FsStoreProductSinkStrategy.java

@@ -0,0 +1,33 @@
+package com.fs.fssync.sink.impl;
+
+import com.fs.fssync.sink.AbstractCdcSinkStrategy;
+import com.fs.store.domain.FsStoreProduct;
+import com.fs.store.domain.FsUser;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+
+@Slf4j
+@Component
+public class FsStoreProductSinkStrategy extends AbstractCdcSinkStrategy<FsStoreProduct> implements Serializable {
+    @Override
+    public boolean canHandle(String table) {
+        return "fs_store_product".equals(table);
+    }
+
+    @Override
+    public Class<FsStoreProduct> getEntityClass() {
+        return FsStoreProduct.class;
+    }
+
+    @Override
+    public String getKeyPrefix() {
+        return "fs:store:product:";
+    }
+
+    @Override
+    protected String getIdField() {
+        return "productId";
+    }
+}

+ 34 - 0
fs-sync/src/main/java/com/fs/fssync/sink/impl/FsUserSinkStrategy.java

@@ -0,0 +1,34 @@
+package com.fs.fssync.sink.impl;
+
+import com.fs.fssync.sink.AbstractCdcSinkStrategy;
+import com.fs.store.domain.FsUser;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+
+@Slf4j
+@Component
+public class FsUserSinkStrategy extends AbstractCdcSinkStrategy<FsUser> implements Serializable {
+
+    @Override
+    public boolean canHandle(String table) {
+        return "fs_user".equals(table);
+    }
+
+    @Override
+    public Class<FsUser> getEntityClass() {
+        return FsUser.class;
+    }
+
+    @Override
+    public String getKeyPrefix() {
+        log.info("获取fs_user前缀");
+        return "fs:user:";
+    }
+
+    @Override
+    protected String getIdField() {
+        return "userId";
+    }
+}

+ 25 - 0
fs-sync/src/main/resources/application-dev.yml

@@ -0,0 +1,25 @@
+# 数据源配置
+spring:
+    jackson:
+        time-zone: GMT+8 #如果有时区问题,设置时区
+    # redis 配置
+    redis:
+        # 地址
+        host: 127.0.0.1
+        # 端口,默认为6379
+        port: 6379
+        # 密码
+        password:
+        # 连接超时时间
+        timeout: 30s
+        lettuce:
+            pool:
+                # 连接池中的最小空闲连接
+                min-idle: 0
+                # 连接池中的最大空闲连接
+                max-idle: 8
+                # 连接池的最大数据库连接数
+                max-active: 8
+                # #连接池最大阻塞等待时间(使用负值表示没有限制)
+                max-wait: -1ms
+        database: 0

+ 51 - 0
fs-sync/src/main/resources/application.yml

@@ -0,0 +1,51 @@
+logging:
+  level:
+    org.apache.flink.runtime.checkpoint: debug
+    org.apache.flink.runtime.source.coordinator.SourceCoordinator: debug
+    com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader: info
+
+flink:
+  cdc:
+    mysql:
+      hostname: gz-cdb-of55khc9.sql.tencentcdb.com
+      port: 23620
+      databaseList: fs_hospital
+      tableList:
+        - fs_hospital.fs_user
+        - fs_hospital.fs_store_product
+        - fs_hospital.fs_store_product_attr_value
+        - fs_hospital.fs_store_product_package
+      username: root
+      password: Rtyy_2023
+      serverTimeZone: Asia/Shanghai
+      startupOptions: initial # 可选值: latest, initial, specificOffset, timestamp
+  checkpoint:
+    interval: 3000 # 单位:毫秒
+  parallelism:
+    source: 1
+    sink: 1
+# 开发环境配置
+server:
+  # 服务器的HTTP端口,默认为 7011  store 7111
+  port: 7015
+  servlet:
+    # 应用的访问路径
+    context-path: /
+  tomcat:
+    # tomcat的URI编码
+    uri-encoding: UTF-8
+    # tomcat最大线程数,默认为200
+    max-threads: 800
+    # Tomcat启动初始化的线程数,默认值25
+    min-spare-threads: 30
+# Spring配置
+spring:
+  main:
+    web-application-type: none
+  # 资源信息
+  messages:
+    # 国际化资源文件路径
+    basename: i18n/messages
+  profiles:
+    active: dev
+    include: config

+ 9 - 1
fs-user-app/src/main/java/com/fs/app/utils/JwtUtils.java

@@ -1,6 +1,8 @@
 package com.fs.app.utils;
 
+import com.fs.app.exception.FSException;
 import com.fs.common.core.redis.RedisCache;
+import com.fs.common.utils.StringUtils;
 import io.jsonwebtoken.Claims;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
@@ -8,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
@@ -44,6 +47,9 @@ public class JwtUtils {
     }
 
     public Claims getClaimByToken(String token) {
+        if (StringUtils.isEmpty(token)) {
+            throw new FSException("token不能为空", HttpStatus.UNAUTHORIZED.value());
+        }
         try {
             return Jwts.parser()
                     .setSigningKey(secret)
@@ -51,7 +57,9 @@ public class JwtUtils {
                     .getBody();
         }catch (Exception e){
             logger.debug("validate is token error ", e);
-            return null;
+            logger.info("token异常,重新登录,token: {}",token);
+
+            throw new FSException("token 失效,请重新登录", HttpStatus.UNAUTHORIZED.value());
         }
     }
 

+ 1 - 0
pom.xml

@@ -218,6 +218,7 @@
         <module>fs-user-app</module>
         <module>fs-api</module>
         <module>fs-socket</module>
+        <module>fs-sync</module>
     </modules>
     <packaging>pom</packaging>