此方法只适用于基础实现或测试 canal 能否监听到 binlog,不适用于生产环境。如果用守护进程管理 canal 与 PHP 消费者,生产环境中一旦发生意外重启、数据有变动而 PHP 消费者没有启动,canal 会堆积大量数据、占用大量内存;并且守护进程在意外重启后可能会反复启动 canal。如要使用 canal,应先将变更推入消息队列再进行消费。但目前看来,canal 与业务以及 ES、MySQL 在同一台服务器上运行是不可行的;不使用守护进程时 canal 不会自动重启,且重启后会残留一个进程。目前已将 canal 放到一台单独的服务器上,增量同步还需要寻找其他方案,接下来研究一下 Apache SeaTunnel。
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
创建 canal 连接 MySQL 所用的账号,并授予其作为 MySQL slave 的权限;如果已有账户,可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载 canal(这里下载当前最新的 1.1.7 版本)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
解压
mkdir /tmp/canal
tar zxvf canal.deployer-1.1.7.tar.gz -C /tmp/canal
修改配置
vi /tmp/canal/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=你的服务器地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=你的数据库账号
canal.instance.tsdb.dbPassword=你的数据库密码
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=ity.orders
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.servers = 127.0.0.1:6379 # Redis服务器地址
#canal.mq.canalGetTimeout = 100 # 拉取消息的超时时间(毫秒)
#canal.mq.flatMessage = true # 使用FlatMessage格式
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml # 使用默认Spring配置
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
启动Canal服务
因为服务器上安装的是 ES 所用的最新版 JDK,新版 JDK 已不再支持 startup.sh 中的部分 JVM 参数(如 -XX:+AggressiveOpts),需要修改启动脚本
替换 /tmp/canal/bin/startup.sh中的
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
为
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
进入 canal 目录(/tmp/canal),执行 sh bin/startup.sh
启动 canal 服务
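在 /tmp/canal 下创建 example 目录,然后下载 canal.example 并解压到该目录,用于测试是否能监听到 binlog
mkdir /tmp/canal/example
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.example-1.1.7.tar.gz
tar zxvf canal.example-1.1.7.tar.gz -C /tmp/canal/example
不用任何配置,在 /tmp/canal 目录下直接执行 sh example/bin/startup.sh 启动
启动后查看 example.log 即可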
使用 canal-php 组件编写消费者(composer require xingwenge/canal_php)
创建SyncOrdersToES命令
php artisan make:command SyncOrdersToES
编写连接 Canal 以及监听到变更后的处理逻辑
app/Console/Commands/SyncOrdersToES.php
<?php
namespace App\Console\Commands;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Elastic\Elasticsearch\Client;
use Elastic\Elasticsearch\ClientBuilder;
use Illuminate\Console\Command;
use Predis\Client as RedisClient;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
/**
 * Artisan command `sync:orders-to-es`: reads `orders` row changes from a Canal
 * server and mirrors them into the Elasticsearch `orders` index.
 *
 * INSERT/UPDATE rows are (re)indexed from the row's after-image; DELETE rows
 * remove the document by id. The command polls forever and only sleeps after
 * an empty poll, so a backlog is drained as fast as Elasticsearch accepts it.
 */
class SyncOrdersToES extends Command
{
    /** Canal server address. */
    private const CANAL_HOST = '127.0.0.1';
    private const CANAL_PORT = 11111;

    /** Canal client id, destination (instance name) and subscription filter. */
    private const CANAL_CLIENT_ID = '1001';
    private const CANAL_DESTINATION = 'example';
    private const CANAL_FILTER = '.*\\..*';

    /** Maximum number of entries fetched per poll. */
    private const BATCH_SIZE = 100;

    /** Seconds to wait before polling again after an empty poll. */
    private const IDLE_SLEEP_SECONDS = 1;

    /** MySQL table whose changes are synced. */
    private const SOURCE_TABLE = 'orders';

    /** Target Elasticsearch index. */
    private const ES_INDEX = 'orders';

    /**
     * Columns copied into the ES document, in document order. Names must match
     * the MySQL column names exactly; spellings such as `channe_id` and
     * `dy_pacage_code` are kept as-is (presumably they mirror the schema —
     * verify before renaming). Add more columns here to index them.
     */
    private const ORDER_FIELDS = [
        'id',
        'order_no',
        'open_status',
        'open_message',
        'id_card_name',
        'id_card',
        'phonenum',
        'contact',
        'mobile',
        'express_address',
        'express_no',
        'express_name',
        'idcard_front',
        'idcard_back',
        'user_with_idcard',
        'pay_num',
        'express_status',
        'status',
        'hkgj_package_id',
        'package_id',
        'bus_order_id',
        'package_name',
        'supplier_id',
        'fxuser_id',
        'fx2_id',
        'tenant_id',
        'anchor_id',
        'pay_id',
        'live_room_id',
        'created_at',
        'updated_at',
        'deleted_at',
        'api_order_no',
        'api_lock',
        'api_valid',
        'api_sync',
        'hk_sync',
        'remark',
        'cid',
        'open_id',
        'channe_id',
        'admin_user_id',
        'pay_status',
        'refund_text',
        'sms_status',
        'intercept_status',
        'intercept_msg',
        'nick',
        'buyer_openid',
        'author_id',
        'author_name',
        'dy_pacage_code',
        'dy_product_id',
    ];

    /** Date/time columns converted to ISO-8601 strings (null when NULL, empty or a zero date). */
    private const DATE_FIELDS = ['created_at', 'updated_at', 'deleted_at'];

    protected $signature = 'sync:orders-to-es';
    protected $description = 'Sync orders from MySQL to Elasticsearch using Canal binlog data.';

    /** @var Client Elasticsearch client used for the index/delete calls. */
    protected $esClient;

    /**
     * @param Client $esClient Elasticsearch client resolved by the container.
     */
    public function __construct(Client $esClient)
    {
        parent::__construct();
        $this->esClient = $esClient;
    }

    /**
     * Connect to Canal, subscribe, and process change batches forever.
     *
     * Sleeps only when a poll returns no entries; a non-empty batch is followed
     * immediately by the next poll so the consumer keeps up with the binlog.
     */
    public function handle()
    {
        $connector = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
        $connector->connect(self::CANAL_HOST, self::CANAL_PORT);
        // NOTE(review): Canal's docs state that a client-side subscribe filter overrides
        // canal.instance.filter.regex, so this pattern streams every table and the
        // table check in processEntry() does the real filtering — consider narrowing it.
        $connector->subscribe(self::CANAL_CLIENT_ID, self::CANAL_DESTINATION, self::CANAL_FILTER);

        try {
            while (true) {
                $entries = $connector->get(self::BATCH_SIZE)->getEntries() ?: [];
                if (count($entries) === 0) {
                    // Nothing pending: back off instead of busy-polling the server.
                    sleep(self::IDLE_SLEEP_SECONDS);
                    continue;
                }
                foreach ($entries as $entry) {
                    $this->processEntry($entry);
                }
            }
        } finally {
            // Only reached when an exception escapes the loop; release the connection.
            $connector->disConnect();
        }
    }

    /**
     * Decode one Canal entry and sync every `orders` row it carries.
     *
     * Non-row entries (e.g. transaction begin/end) and other tables are skipped.
     * A failure syncing one row is logged and does not stop the consumer.
     *
     * @param mixed $entry Canal protocol Entry object.
     */
    private function processEntry($entry): void
    {
        if ($entry->getEntryType() !== EntryType::ROWDATA) {
            return;
        }
        if ($entry->getHeader()->getTableName() !== self::SOURCE_TABLE) {
            return;
        }

        $rowChange = new RowChange();
        $rowChange->mergeFromString($entry->getStoreValue());
        $eventType = $rowChange->getEventType();

        foreach ($rowChange->getRowDatas() as $rowData) {
            // DELETE rows only carry a before-image; INSERT/UPDATE carry the new values after.
            $columns = $eventType === EventType::DELETE
                ? $rowData->getBeforeColumns()
                : $rowData->getAfterColumns();

            $change = [
                'type' => $eventType,
                'data' => $this->parseColumns($columns),
            ];

            $this->info('Detected change in orders: ' . json_encode(
                $change,
                JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_SUBSTITUTE
            ));

            try {
                $this->syncChangeToElasticsearch($change, $this->esClient);
            } catch (\Exception $e) {
                // Keep the long-running consumer alive; the failure is surfaced in the log.
                $this->error(sprintf(
                    'Failed to sync order %s to Elasticsearch: %s',
                    $change['data']['id'] ?? 'unknown',
                    $e->getMessage()
                ));
            }
        }
    }

    /**
     * Apply one parsed row change to the Elasticsearch `orders` index.
     *
     * @param array{type:int, data:array<string, ?string>} $change Event type and column values.
     * @param Client $esClient Elasticsearch client to use.
     */
    protected function syncChangeToElasticsearch($change, $esClient)
    {
        $data = $change['data'];
        $id = $data['id'] ?? null;
        if ($id === null || $id === '') {
            // Without the primary key the ES document cannot be addressed.
            $this->warn('Skipping orders change without an id column.');
            return;
        }

        switch ($change['type']) {
            case EventType::INSERT:
            case EventType::UPDATE:
                // Full re-index: the document is replaced with the row's current values.
                $esClient->index([
                    'index' => self::ES_INDEX,
                    'id' => $id,
                    'body' => $this->buildDocument($data),
                ]);
                break;
            case EventType::DELETE:
                $esClient->delete([
                    'index' => self::ES_INDEX,
                    'id' => $id,
                ]);
                break;
            default:
                // Other event types (e.g. DDL) carry nothing to sync.
                break;
        }
    }

    /**
     * Build the ES document body from a parsed `orders` row.
     *
     * Missing columns become null, and date columns are converted with toIsoDate().
     *
     * @param array<string, ?string> $data Column name => value.
     * @return array<string, mixed>
     */
    private function buildDocument(array $data): array
    {
        $document = [];
        foreach (self::ORDER_FIELDS as $field) {
            $value = $data[$field] ?? null;
            $document[$field] = in_array($field, self::DATE_FIELDS, true)
                ? $this->toIsoDate($value)
                : $value;
        }
        return $document;
    }

    /**
     * Convert a MySQL date/time string to ISO-8601, or null when there is no real date.
     *
     * Without this guard, strtotime('') (false) would be formatted as the Unix
     * epoch, which stamps every non-deleted order with deleted_at = 1970-01-01.
     */
    private function toIsoDate(?string $value): ?string
    {
        if ($value === null || $value === '' || strpos($value, '0000-00-00') === 0) {
            return null;
        }
        $timestamp = strtotime($value);
        if ($timestamp === false) {
            return null;
        }
        return date('c', $timestamp);
    }

    /**
     * Flatten Canal columns into a name => value map.
     *
     * Canal transmits SQL NULL as an empty value with the isNull flag set; such
     * columns are mapped to PHP null so they are indexed as null, not ''.
     *
     * @param iterable $columns Canal protocol Column objects.
     * @return array<string, ?string>
     */
    protected function parseColumns($columns)
    {
        $parsedData = [];
        foreach ($columns as $column) {
            $parsedData[$column->getName()] = $column->getIsNull() ? null : $column->getValue();
        }
        return $parsedData;
    }
}
启动同步命令php artisan sync:orders-to-es