🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
### 配置 Saga Server Saga Server负责管理回滚任务,Saga-server的实现是基于SpringBoot,并内置H2数据库。在启动Saga-Server前,需要安装和启动Kafka。进入命令行,直接启动Saga-Server ``` java -jar sql-saga-microservice-server-3.1.5-RELEASE.jar ``` 默认情况下,会链接本地的`127.0.0.1:9092`的kafka,并且,使用H2数据库,数据存放在~/.h2目录下 你也可以通过SpringBoot机制,配置kafka 和数据库,如下配置项 ```properties server.port=18081 server.shutdown=graceful #数据库配置 spring.datasource.url=jdbc:h2:file:~/.h2/saga-server;AUTO_SERVER=TRUE spring.datasource.username=sa spring.datasource.password= spring.datasource.driver-class-name=org.h2.Driver # kafka配置 spring.kafka.bootstrapServers=${kafka_server:127.0.0.1:9092} spring.kafka.consumer.group-id=${kafka_group:saga-group} spring.kafka.consumer.max-poll-records=100 spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.listener.type=batch beetlsql-saga.kafka.server-topic=saga-server-topic beetlsql-saga.kafka.client-topic-prefix=saga-client beetlsql.sqlManagers=mySqlManager beetlsql.mySqlManager.ds=datasource beetlsql.mySqlManager.basePackage=org.beetl.sql.saga.ms.server beetlsql.mySqlManager.dbStyle=org.beetl.sql.core.db.H2Style ``` 关于Kakfa Topic说明 * beetlsql-saga.kafka.server-topic, server监听的topic,客户端产生的回滚任务或者回滚结果都发送到此topic * beetlsql-saga.kafka.client-topic-prefix ,客户端topic前缀,比如客户端是orderApp,那么,Saga-Server会发送orderApp相关的回滚任务到saga-client-orderApp里。每个客户端都有一个topic接受来自Saga—Server的回滚任务。 对于每个客户端,安装方式类似上一章的多库实现 ```java <dependency> <groupId>com.ibeetl</groupId> <artifactId>sql-springboot-starter</artifactId> <version>${version}</version> </dependency> <dependency> <groupId>com.ibeetl</groupId> <artifactId>sql-saga-client</artifactId> <version>${version}</version> </dependency> ``` client包自带了`SagaClientConfig` 用于设置`SagaContext.sagaContextFactory = new SagaClientContextFactory(this);` 因此基于Spring Boot的客户端必须扫描包`org.beetl.sql.saga.ms.client`,比如 ```java @SpringBootApplication(scanBasePackages = {"com.xxx", "org.beetl.sql.saga.ms.client"}) @EnableKafka public class UserApplication { public static void main(String[] args) { SpringApplication.run(UserApplication.class, args); } } ``` > 可以参考源码DemoApplication来了解如何配置和使用Saga Client 客户端需要配置appName以及跟Saga-Server交互的Topic,如下 ```properties spring.application.name=demoSystem beetlsql-saga.kafka.client-topic-prefix=saga-client beetlsql-saga.kafka.server-topic=saga-server-topic spring.kafka.bootstrapServers=${kafka_server:127.0.0.1:9092} spring.kafka.consumer.max-poll-records=1 spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true spring.kafka.listener.type=batch ``` 第一行是标准的Spring Boot 配置,后面俩行跟Saga-Server一样的配置 完成如上配置后,既可以同样的方式使用BeetlSQL的Saga事务管理,以源码`DemoController`为例子 ```java SagaContext sagaContext = SagaContext.sagaContextFactory.current(); try { sagaContext.start(gid); //模拟调用俩个微服务,订单和用户 rest.postForEntity(orderAddUrl, null,String.class, paras); rest.postForEntity(userBalanceUpdateUrl, null,String.class, paras); if (1 == 1) { throw new RuntimeException("模拟失败,查询saga-server 看效果"); } } catch (Exception e) { sagaContext.rollback(); return e.getMessage(); } ``` orderAddUrl 是订单服务,实现如下 ```java @Service public class OrderService { @Autowired OrderMapper orderMapper; @Transactional(propagation=Propagation.NEVER) public void addOrder(String orderId,String userId,Integer fee){ SagaContext sagaContext = SagaContext.sagaContextFactory.current(); try{ sagaContext.start(orderId); OrderEntity orderEntity = new OrderEntity(); orderEntity.setFee(fee); orderEntity.setUserId(userId); orderEntity.setProductId("any"); orderMapper.insert(orderEntity); sagaContext.commit(); }catch (Exception e){ sagaContext.rollback(); throw new RuntimeException(e); } } } ``` userBalanceUpdateUrl是余额操作,实现如下 ```java @Service public class UserService { @Autowired UserMapper userMapper; @Transactional(propagation= Propagation.NEVER) public void update(String orderId,String userId,Integer fee){ SagaContext sagaContext = SagaContext.sagaContextFactory.current(); try{ sagaContext.start(orderId); UserEntity user = userMapper.unique(userId); user.setBalance(user.getBalance()-fee); userMapper.updateById(user); sagaContext.commit(); }catch (Exception e){ sagaContext.rollback(); throw new RuntimeException(e); } } } ```