详解MySQL同步ES的6种方案
如今的分布式架构里,MySQL和Elasticsearch(也就是ES)结合使用,已经成为解决高并发查询和复杂检索问题的常用组合。不过,要实现这两者之间高效的数据同步,可不是一件容易的事儿,它可是架构设计中必须要攻克的难题。今天,我就来和大家详细聊聊MySQL同步ES的6种主流方案,不仅会结合代码示例,还会讲讲具体的场景案例,帮大家避开常见的“坑”,在技术选型时做出最适合的选择。
一、同步双写方案
这个方案适用于那些对数据实时性要求特别高,而且业务逻辑比较简单的场景,就好比金融交易记录的同步,每一笔交易数据都要及时准确地记录和同步。在实际操作中,是在业务代码里同时往MySQL和ES里写入数据。代码示例如下:
@Transactional public void createOrder(Order order) { // 先把订单数据写入MySQL数据库 orderMapper.insert(order); // 紧接着,将相同的订单数据同步写入ES IndexRequest request = new IndexRequest("orders") .id(order.getId()) .source(JSON.toJSONString(order), XContentType.JSON); client.index(request, RequestOptions.DEFAULT); }
虽然这个方案能快速实现数据同步,但它也存在一些明显的缺点:
- 硬编码侵入问题:只要涉及到数据写入操作,就必须添加往ES写入的逻辑代码,这会让代码变得很繁琐,而且后期维护起来也比较麻烦。
- 性能瓶颈:同时往两个地方写数据,会让事务处理的时间变长,系统的TPS(每秒事务处理量)会下降30%以上,影响系统的整体性能。
- 数据一致性风险:要是往ES写入数据失败了,就可能出现数据不一致的情况。为了解决这个问题,往往需要引入补偿机制,比如建立本地事务表,再配合定时重试的方式来保证数据最终一致。
二、异步双写方案
假设在电商场景中,订单状态更新后,需要同步到ES,方便客服系统进行检索,这时候异步双写方案就派上用场了。这个方案借助MQ(消息队列)来实现解耦,让系统的各个部分可以更独立地运行。它的架构图大概是这样:业务服务先把数据写入MySQL,然后发送消息到Kafka,消费服务从Kafka读取消息,再将数据写入ES。
下面是具体的代码示例:
// 生产者端:更新产品信息时,将数据写入MySQL并发送消息到Kafka public void updateProduct(Product product) { productMapper.update(product); kafkaTemplate.send("product-update", product.getId()); } // 消费者端:监听Kafka的指定主题,获取产品ID后从MySQL查询数据并写入ES @KafkaListener(topics = "product-update") public void syncToEs(String productId) { Product product = productMapper.selectById(productId); esClient.index(product); }
这个方案的优势很明显:
- 吞吐量提升:MQ具有削峰填谷的作用,即使遇到突发的大量请求,也能很好地处理,系统可以承载万级的QPS(每秒查询率)。
- 故障隔离:就算ES出现故障,也不会影响主业务流程的正常运行。
不过,它也有一些缺陷:
- 消息堆积:如果遇到突发的大流量,消息可能会在队列中堆积,导致消费延迟。这时候就需要监控Lag值(表示消息堆积的程度),及时发现和解决问题。
- 顺序性问题:为了保证同一数据的处理顺序正确,需要通过设置分区键来确保消息按顺序被消费。
三、Logstash定时拉取方案
在用户行为日志的T+1分析场景中,比如分析前一天用户在网站上的操作记录,就可以使用Logstash定时拉取方案。这个方案的好处是对现有系统的侵入性很低,但是缺点也很明显,就是延迟比较高。
下面是它的配置示例:
input { jdbc{ jdbc_driver=>"com.mysql.jdbc.Driver" jdbc_url=>"jdbc:mysql://localhost:3306/log_db" schedule=>"*/5 * * * *" # 每5分钟执行一次查询 statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value" } } output{ elasticsearch{ hosts=>["es-host:9200"] index=>"user_logs" } }
这个方案的优点是不需要对现有代码进行改造,特别适合用来迁移历史数据。但它的致命缺点也不容忽视:
- 延迟较高:数据同步有分钟级的延迟,没办法满足实时搜索的需求。
- 全表扫描压力大:每次查询都可能涉及全表扫描,给数据库带来很大压力。虽然可以通过优化增量字段索引来缓解,但这也增加了一定的工作量。
最近我建了一些工作内推群,覆盖了各大城市,无论是HR还是正在找工作的小伙伴都可以进群交流。群里已经收集了不少内推岗位信息,如果感兴趣,大家可以加我的微信:li_su223,备注所在城市,就能进群啦。
四、Canal监听Binlog方案
在社交平台,像微博热搜更新这样需要实时搜索的场景中,Canal监听Binlog方案就非常合适。这个方案用到的技术栈主要有Canal、RocketMQ和ES,它的实时性很强,对现有系统的侵入性也比较低。
下面是关键配置:
# canal.properties canal.instance.master.address=127.0.0.1:3306 canal.mq.topic=canal.es.sync
在使用这个方案的时候,也有一些需要注意的地方:
- 数据漂移:在数据库发生DDL(数据定义语言)变更时,可能会出现数据不一致的情况。这时候就需要通过Schema Registry来管理映射关系,保证数据的一致性。
- 幂等消费:为了避免重复写入数据,需要通过
_id
这样的唯一键来保证消费的幂等性,也就是同样的数据只会被处理一次。
五、DataX批量同步方案
如果要把历史订单数据从分库分表的MySQL迁移到ES,DataX批量同步方案是个不错的选择,它是大数据迁移的首选方案。
下面是它的配置文件示例:
{ "job":{ "content":[{ "reader":{ "name":"mysqlreader", "parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"} }, "writer":{ "name":"elasticsearchwriter", "parameter":{"endpoint":"http://es-host:9200","index":"orders"} } }] } }
为了让这个方案的性能更好,可以从以下两个方面进行调优:
- 调整
channel
数:适当增加channel
数可以提升并发处理能力,建议将其与ES的分片数对齐,这样能更好地发挥系统性能。 - 启用
limit
分批查询:通过设置limit
进行分批查询,可以避免因为数据量过大导致内存溢出(OOM)的问题。
六、Flink流处理方案
当商品价格发生变更时,需要结合用户画像来计算实时推荐评分,这种复杂的ETL(抽取、转换、加载)场景就很适合用Flink流处理方案。
下面是一段代码示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new CanalSource()) .map(record -> parseToPriceEvent(record)) .keyBy(event -> event.getProductId()) .connect(userProfileBroadcastStream) .process(new PriceRecommendationProcess()) .addSink(new ElasticsearchSink());
这个方案的优势在于:
- 状态管理:借助Watermark机制,它可以精准处理乱序事件,保证数据处理的准确性。
- 维表关联:通过Broadcast State,能够实现实时画像关联,满足复杂业务的需求。
方案对比与选型建议
为了让大家更清楚地了解这6种方案的差异,方便在实际工作中做出选择,我做了一张对比表格:
方案 | 实时性 | 侵入性 | 复杂度 | 适用阶段 |
---|---|---|---|---|
同步双写 | 秒级 | 高 | 低 | 小型单体项目 |
MQ异步 | 秒级 | 中 | 中 | 中型分布式系统 |
Logstash | 分钟级 | 无 | 低 | 离线分析 |
Canal | 毫秒级 | 无 | 高 | 高并发生产环境 |
DataX | 小时级 | 无 | 中 | 历史数据迁移 |
Flink | 毫秒级 | 低 | 极高 | 实时数仓 |
根据不同团队的情况,我给大家一些选型建议:
- 如果团队没有运维中间件的能力,那么可以选择Logstash或者同步双写方案,虽然它们有各自的局限性,但相对简单易操作。
- 如果对延迟要求是秒级,并且允许对现有系统进行一定改造,那么MQ异步结合本地事务表的方案是个不错的选择。
- 如果追求极致的实时性,而且资源充足,那么Canal和Flink搭配使用,可以提供更可靠的保障。
最后,要是这篇文章对大家有所帮助,或者给了你们一些启发,麻烦持续关注本站哦~