canal太复杂?试试这个超方便的binlog同步包,完美适配springboot

使用binlog的原因

近期需要重构一个老系统,需要从几个服务中实时同步订单的修改到重构表里。
这里就面临两个选择,

  1. 在每个服务的mysql操作前埋点,发送修改信息到队列或服务上。这种方案需要修改多个服务的代码并且测试对原系统的影响,有额外开发和测试成本。
  2. 同步mysql的binlog,根据表的insert和update更新新表。但是需要维护一个binlog同步的服务

本次选择了binlog同步的方式。搭建的binlog服务也可以用在之后新系统的缓存更新和同步ES索引上,相对于埋点这种只有一次性作用的方式,性价比较高。

工具调研:canal和mysql-binlog-connector-java

1.canal

要实现binlog同步服务,使用较多的开源方案是canal,运行比较稳定,而且功能也很丰富。

但是在实际部署服务的时候,遇到了一些问题:

  • canal采用了client-server的模式,至少需要部署两个集群。我们的项目都是使用私有云部署,为了稳定运行,就有额外的资源和维护开销。
    • 后来发现canal server可以投递信息到kafka。但是我们的消息队列是自研的,只能尝试去改源码。
  • canal的server是完整独立的包,无法直接用springboot嵌套。而我们的基础组件都依赖于springboot,比如监控,配置中心等。
  • canal的HA部署使用的是zookeeper,很可惜我们并没有可用的zookeper集群,也没有资源重新部署一个。
  • 单机部署的时候,已经处理的binlog postion是保存在文件里面的,我们用的私有云docker,重启后全丢失。

2.mysql-binlog-connector-java

调研同时也发现了另一个binlog同步工具, mysql-binlog-connector-java

这是一个开源的binlog同步工具,功能很简单,就是接收binlog信息。作为一个依赖jar可以很容易在springboot中使用。

但是没有对binlog的内容做格式化处理,使用很不方便。当然更没有保存信息和分布式部署功能。

自研工具包binlogportal

基于这些问题,我们需要一个具有以下特性的binlog同步工具:

  • 可以使用springboot加载运行,具有较好的扩展性
    • 说白了就是作为一个jar包,开放出接口可以自定义处理binlog信息的方式
  • 可以使用redis实现binlog position的保存和分布式部署

为了满足这些条件,通过对 mysql-binlog-connector-java 封装后,实现了自研的工具binlogportal。

  • 提供了binlogportal-spring-boot-starter包,可使用spring boot快速部署
  • 使用redis保存binlog position信息,重启后可从上次position位置开始
  • 当前支持insert和update的结构化
  • 提供默认的http事件处理器。可通过实现IEventHandler接口,自定义事件处理器
  • 使用redis作为分布式协调器,可多机部署实现高可用

使用说明

Mysql配置

  • Mysql需要开启binlog并设置为row模式
  • 同步binlog使用的mysql账号,需要添加REPLICATION权限,示例如下:
CREATE USER binlogportal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlogportal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'binlogportal'@'%';
FLUSH PRIVILEGES;

通过spring boot构建项目

  • 直接依赖binlogportal-spring-boot-starter
<dependency>
  <groupId>com.insistingon.binlogportal</groupId>
  <artifactId>binlogportal-spring-boot-starter</artifactId>
  <version>1.0.5</version>
</dependency>
  • 通过spring boot的application.yml配置启动器
binlogportal:
  enable: true # 是否启用autoconfig
  distributed-enable: true # 是否启用分布式部署
  distributed-redis: # distributed-enable为true时,要提供一个redis作为分布式协调器
    host: 127.0.0.1
    port: 6379
    auth:
  position-redis: # 保存binlog position的redis,必须配置
    host: 127.0.0.1
    port: 6379
    auth:
  db-config: # 数据库配置,可以有多个,key自定义即可
    d1:
      host: 0.0.0.0
      port: 3306
      user-name: binlogportal
      password: 123456
      handler-list: [logEventHandler] # 该数据库使用的事件处理器,名称为spring的bean name
  http-handler: # 启用自带的http事件处理器,可发送请求
    url-list: [http://127.0.0.1:8988/testit] # 要发送的url列表,http参数为统一的格式
    result-callback: httpCallBack # 配置自定义的结果处理器,需要实现IHttpCallback接口,值为bean name
  • Starter启动
    • spring boot autoconfig启动成功后,会把BinlogPortalStarter的实例注入到IOC中
    • 项目中通过注入的方式获取binlogPortalStarter使用
    • binlogPortalStarter.start()会为每个mysql库创建一个线程处理binlog
    • 下面是使用CommandLineRunner启动starter的一个例子
@Slf4j
@Component
public class BinlogSync implements CommandLineRunner {
    @Resource
    BinlogPortalStarter binlogPortalStarter;

    public void run(String... args) throws Exception {
        try {
            binlogPortalStarter.start();
        } catch (BinlogPortalException e) {
            log.error(e.getMessage(), e);
        }
    }
}

非spring boot项目

  • 非spring boot项目,可以使用基础包
<dependency>
  <groupId>com.insistingon.binlogportal</groupId>
  <artifactId>binlogportal</artifactId>
  <version>1.0.5</version>
</dependency>
  • 依赖后实现配置类 BinlogPortalConfigSyncConfig ,传入Starter中运行即可
public class TestClass{
 public static void main(String[] args) {
        SyncConfig syncConfig = new SyncConfig();
        syncConfig.setHost("0.0.0.0");
        syncConfig.setPort(3306);
        syncConfig.setUserName("binlogportal");
        syncConfig.setPassword("123456");

        BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig();
        binlogPortalConfig.addSyncConfig(syncConfig);

        RedisConfig redisConfig = new RedisConfig("127.0.0.1", 6379);
        RedisPositionHandler redisPositionHandler = new RedisPositionHandler(redisConfig);
        binlogPortalConfig.setPositionHandler(redisPositionHandler);

        binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(redisConfig));

        BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter();
        binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig);
        try {
            binlogPortalStarter.start();
        } catch (BinlogPortalException e) {
            e.printStackTrace();
        }
    }
}

2.分布式部署实现

项目中高可用实现是基于redis的分布式锁。

每个实例都会加载全部数据库的配置,在创建binlog连接之前,先要获取redis锁,获取锁后会定时刷新锁的过期时间。所有实例会定时重新抢锁。

同一个mysql库的binlog文件和position会保存在redis里,如果一个实例宕机。新抢到锁的实例在初始化时,会使用上个实例已保存的binlog信息继续获取。

项目源码可在Git上查看。 项目的Git地址:

以上内容属个人学习总结,如有不当之处,欢迎在评论中指正


作者:dothetrick
原文:canal太复杂?试试这个超方便的binlog同步包,完美适配springboot - 掘金