MySQL如何实时同步数据到ES?试试这款阿里开源的神器( 三 )

  • 添加配置文件canal-adapter/conf/es7/product.yml , 用于配置MySQL中的表与Elasticsearch中索引的映射关系;
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值destination: example# canal的instance或者MQ的topicgroupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据esMapping:_index: canal_product # es 的索引名称_id: _id# es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: "SELECTp.id AS _id,p.title,p.sub_title,p.price,p.picFROMproduct p"# sql映射etlCondition: "where a.c_time>={}"#etl的条件参数commitBatch: 3000# 提交批大小
  • 使用startup.sh脚本启动canal-adapter服务;
sh bin/startup.sh
  • 启动成功后可使用如下命令查看服务日志信息;
tail -f logs/adapter/adapter.log20-10-26 16:52:55.148 [main] INFOc.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed2020-10-26 16:52:57.005 [main] INFOc.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 2020-10-26 16:52:57.376 [main] INFOc.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded2020-10-26 16:52:58.615 [main] INFOc.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed2020-10-26 16:52:58.651 [main] INFOc.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin2020-10-26 16:52:59.043 [main] INFOc.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed2020-10-26 16:52:59.044 [main] INFOc.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......2020-10-26 16:52:59.057 [Thread-4] INFOc.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============2020-10-26 16:52:59.100 [main] INFOorg.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]2020-10-26 16:52:59.153 [main] INFOorg.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read2020-10-26 16:52:59.590 [main] INFOo.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''2020-10-26 16:52:59.626 [main] INFOc.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)2020-10-26 16:52:59.930 [Thread-4] INFOc.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
  • 如果需要停止canal-adapter服务可以使用如下命令 。
sh bin/stop.sh数据同步演示
经过上面的一系列步骤 , canal的数据同步功能已经基本可以使用了 , 下面我们来演示下数据同步功能 。
  • 首先我们需要在Elasticsearch中创建索引 , 和MySQL中的product表相对应 , 直接在Kibana的Dev Tools中使用如下命令创建即可;
PUT canal_product{"mappings": {"properties": {"title": {"type": "text"},"sub_title": {"type": "text"},"pic": {"type": "text"},"price": {"type": "double"}}}}
MySQL如何实时同步数据到ES?试试这款阿里开源的神器文章插图
  • 创建完成后可以查看下索引的结构;
GET canal_product/_mapping
MySQL如何实时同步数据到ES?试试这款阿里开源的神器文章插图
  • 之后使用如下SQL语句在数据库中创建一条记录;
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL );
  • 创建成功后 , 在Elasticsearch中搜索下 , 发现数据已经同步了;
GET canal_product/_search
MySQL如何实时同步数据到ES?试试这款阿里开源的神器文章插图
  • 再使用如下SQL对数据进行修改;
UPDATE product SET title='小米10' WHERE id=5
  • 修改成功后 , 在Elasticsearch中搜索下 , 发现数据已经修改了;

MySQL如何实时同步数据到ES?试试这款阿里开源的神器文章插图
  • 再使用如下SQL对数据进行删除操作;
DELETE FROM product WHERE id=5
  • 删除成功后 , 在Elasticsearch中搜索下 , 发现数据已经删除了 , 至此MySQL同步到Elasticsearch的功能完成了!

MySQL如何实时同步数据到ES?试试这款阿里开源的神器文章插图
canal-admin使用