如今的分布式架构里,MySQL和Elasticsearch(也就是ES)结合使用,已经成为解决高并发查询和复杂检索问题的常用组合。不过,要实现这两者之间高效的数据同步,可不是一件容易的事儿,它可是架构设计中必须要攻克的难题。今天,我就来和大家详细聊聊MySQL同步ES的6种主流方案,不仅会结合代码示例,还会讲讲具体的场景案例,帮大家避开常见的“坑”,在技术选型时做出最适合的选择。

一、同步双写方案

这个方案适用于那些对数据实时性要求特别高,而且业务逻辑比较简单的场景,就好比金融交易记录的同步,每一笔交易数据都要及时准确地记录和同步。在实际操作中,是在业务代码里同时往MySQL和ES里写入数据。代码示例如下:

@Transactional public void createOrder(Order order) { // 先把订单数据写入MySQL数据库 orderMapper.insert(order); // 紧接着,将相同的订单数据同步写入ES IndexRequest request = new IndexRequest("orders") .id(order.getId()) .source(JSON.toJSONString(order), XContentType.JSON); client.index(request, RequestOptions.DEFAULT); } 

虽然这个方案能快速实现数据同步,但它也存在一些明显的缺点:

  • 硬编码侵入问题:只要涉及到数据写入操作,就必须添加往ES写入的逻辑代码,这会让代码变得很繁琐,而且后期维护起来也比较麻烦。
  • 性能瓶颈:同时往两个地方写数据,会让事务处理的时间变长,系统的TPS(每秒事务处理量)会下降30%以上,影响系统的整体性能。
  • 数据一致性风险:要是往ES写入数据失败了,就可能出现数据不一致的情况。为了解决这个问题,往往需要引入补偿机制,比如建立本地事务表,再配合定时重试的方式来保证数据最终一致。

二、异步双写方案

假设在电商场景中,订单状态更新后,需要同步到ES,方便客服系统进行检索,这时候异步双写方案就派上用场了。这个方案借助MQ(消息队列)来实现解耦,让系统的各个部分可以更独立地运行。它的架构图大概是这样:业务服务先把数据写入MySQL,然后发送消息到Kafka,消费服务从Kafka读取消息,再将数据写入ES。

下面是具体的代码示例:

// 生产者端:更新产品信息时,将数据写入MySQL并发送消息到Kafka public void updateProduct(Product product) { productMapper.update(product); kafkaTemplate.send("product-update", product.getId()); } // 消费者端:监听Kafka的指定主题,获取产品ID后从MySQL查询数据并写入ES @KafkaListener(topics = "product-update") public void syncToEs(String productId) { Product product = productMapper.selectById(productId); esClient.index(product); } 

这个方案的优势很明显:

  • 吞吐量提升:MQ具有削峰填谷的作用,即使遇到突发的大量请求,也能很好地处理,系统可以承载万级的QPS(每秒查询率)。
  • 故障隔离:就算ES出现故障,也不会影响主业务流程的正常运行。

不过,它也有一些缺陷:

  • 消息堆积:如果遇到突发的大流量,消息可能会在队列中堆积,导致消费延迟。这时候就需要监控Lag值(表示消息堆积的程度),及时发现和解决问题。
  • 顺序性问题:为了保证同一数据的处理顺序正确,需要通过设置分区键来确保消息按顺序被消费。

三、Logstash定时拉取方案

在用户行为日志的T+1分析场景中,比如分析前一天用户在网站上的操作记录,就可以使用Logstash定时拉取方案。这个方案的好处是对现有系统的侵入性很低,但是缺点也很明显,就是延迟比较高。

下面是它的配置示例:

input { jdbc{ jdbc_driver=>"com.mysql.jdbc.Driver" jdbc_url=>"jdbc:mysql://localhost:3306/log_db" schedule=>"*/5 * * * *" # 每5分钟执行一次查询 statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value" } } output{ elasticsearch{ hosts=>["es-host:9200"] index=>"user_logs" } } 

这个方案的优点是不需要对现有代码进行改造,特别适合用来迁移历史数据。但它的致命缺点也不容忽视:

  • 延迟较高:数据同步有分钟级的延迟,没办法满足实时搜索的需求。
  • 全表扫描压力大:每次查询都可能涉及全表扫描,给数据库带来很大压力。虽然可以通过优化增量字段索引来缓解,但这也增加了一定的工作量。

最近我建了一些工作内推群,覆盖了各大城市,无论是HR还是正在找工作的小伙伴都可以进群交流。群里已经收集了不少内推岗位信息,如果感兴趣,大家可以加我的微信:li_su223,备注所在城市,就能进群啦。

四、Canal监听Binlog方案

在社交平台,像微博热搜更新这样需要实时搜索的场景中,Canal监听Binlog方案就非常合适。这个方案用到的技术栈主要有Canal、RocketMQ和ES,它的实时性很强,对现有系统的侵入性也比较低。

下面是关键配置:

# canal.properties canal.instance.master.address=127.0.0.1:3306 canal.mq.topic=canal.es.sync 

在使用这个方案的时候,也有一些需要注意的地方:

  • 数据漂移:在数据库发生DDL(数据定义语言)变更时,可能会出现数据不一致的情况。这时候就需要通过Schema Registry来管理映射关系,保证数据的一致性。
  • 幂等消费:为了避免重复写入数据,需要通过_id这样的唯一键来保证消费的幂等性,也就是同样的数据只会被处理一次。

五、DataX批量同步方案

如果要把历史订单数据从分库分表的MySQL迁移到ES,DataX批量同步方案是个不错的选择,它是大数据迁移的首选方案。

下面是它的配置文件示例:

{ "job":{ "content":[{ "reader":{ "name":"mysqlreader", "parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"} }, "writer":{ "name":"elasticsearchwriter", "parameter":{"endpoint":"http://es-host:9200","index":"orders"} } }] } } 

为了让这个方案的性能更好,可以从以下两个方面进行调优:

  • 调整channel:适当增加channel数可以提升并发处理能力,建议将其与ES的分片数对齐,这样能更好地发挥系统性能。
  • 启用limit分批查询:通过设置limit进行分批查询,可以避免因为数据量过大导致内存溢出(OOM)的问题。

六、Flink流处理方案

当商品价格发生变更时,需要结合用户画像来计算实时推荐评分,这种复杂的ETL(抽取、转换、加载)场景就很适合用Flink流处理方案。

下面是一段代码示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new CanalSource()) .map(record -> parseToPriceEvent(record)) .keyBy(event -> event.getProductId()) .connect(userProfileBroadcastStream) .process(new PriceRecommendationProcess()) .addSink(new ElasticsearchSink()); 

这个方案的优势在于:

  • 状态管理:借助Watermark机制,它可以精准处理乱序事件,保证数据处理的准确性。
  • 维表关联:通过Broadcast State,能够实现实时画像关联,满足复杂业务的需求。

方案对比与选型建议

为了让大家更清楚地了解这6种方案的差异,方便在实际工作中做出选择,我做了一张对比表格:

方案实时性侵入性复杂度适用阶段
同步双写秒级小型单体项目
MQ异步秒级中型分布式系统
Logstash分钟级离线分析
Canal毫秒级高并发生产环境
DataX小时级历史数据迁移
Flink毫秒级极高实时数仓

根据不同团队的情况,我给大家一些选型建议:

  • 如果团队没有运维中间件的能力,那么可以选择Logstash或者同步双写方案,虽然它们有各自的局限性,但相对简单易操作。
  • 如果对延迟要求是秒级,并且允许对现有系统进行一定改造,那么MQ异步结合本地事务表的方案是个不错的选择。
  • 如果追求极致的实时性,而且资源充足,那么Canal和Flink搭配使用,可以提供更可靠的保障。

最后,要是这篇文章对大家有所帮助,或者给了你们一些启发,麻烦持续关注本站哦~