注意:本文的做法只适合做基础实现,或用来测试 canal 能否监听到 binlog 变更,不适用于生产环境。如果用守护进程托管 canal 与 PHP 消费者,生产环境意外重启后,一旦数据有变动而 PHP 消费者还没有启动,canal 会堆积数据并占用大量内存;并且守护进程在意外重启后可能反复启动 canal。如要使用 canal,应先把变更推入队列再进行消费。但目前看来,让 canal 与业务以及 ES、MySQL 运行在同一台服务器上是不可行的;不使用守护进程时,canal 不会自动重启,但重启后会残留一个进程。目前已把 canal 放到一台单独的服务器上,增量同步还需要另找方案,接下来准备研究一下 Apache SeaTunnel。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复

授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载canal(这里下载最新1.1.7)

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

解压

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.7.tar.gz -C /tmp/canal

修改配置

vi conf/example/instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=你的服务器地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=你的数据库账号
canal.instance.tsdb.dbPassword=你的数据库密码

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=ity\\.orders
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
#canal.mq.servers = 127.0.0.1:6379 # Redis服务器地址
#canal.mq.canalGetTimeout = 100 # 拉取消息的超时时间(毫秒)
#canal.mq.flatMessage = true # 使用FlatMessage格式
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml # 使用默认Spring配置
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################

启动Canal服务

因为ES所用java是最新版本JDK所以有些参数弃用了,需要修改配置文件

替换 /tmp/canal/bin/startup.sh中的
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
为
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

进canal目录执行sh bin/startup.sh命令启动

在/tmp/canal下创建个example目录然后下载canal.example用于测试是否监控到binlog

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.example-1.1.7.tar.gz

不用任何配置直接启动sh example/bin/startup.sh

启动后查看example.log即可

使用 canal-php 构建消费者

创建SyncOrdersToES命令

php artisan make:command SyncOrdersToES

编写连接 canal 以及监听到变更后的处理逻辑

app/Console/Commands/SyncOrdersToES.php

<?php

namespace App\Console\Commands;

use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Elastic\Elasticsearch\Client;
use Elastic\Elasticsearch\ClientBuilder;
use Illuminate\Console\Command;
use Predis\Client as RedisClient;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;

/**
 * Artisan command that tails Canal (MySQL binlog) row events for the `orders`
 * table and mirrors them into the `orders` Elasticsearch index.
 *
 * The command connects directly to a Canal server over a socket, polls it in
 * an endless loop and, for every row change on `orders`, indexes or deletes
 * the matching Elasticsearch document (document id = order id).
 */
class SyncOrdersToES extends Command
{
    protected $signature = 'sync:orders-to-es';
    protected $description = 'Sync orders from MySQL to Elasticsearch using Canal binlog events.';

    /** Maximum number of Canal entries fetched per poll. */
    private const BATCH_SIZE = 100;

    /**
     * Columns of `orders` that are copied into the Elasticsearch document,
     * in the order they appear in the document body.
     * Add more fields here as needed.
     */
    private const ORDER_FIELDS = [
        'id',
        'order_no',
        'open_status',
        'open_message',
        'id_card_name',
        'id_card',
        'phonenum',
        'contact',
        'mobile',
        'express_address',
        'express_no',
        'express_name',
        'idcard_front',
        'idcard_back',
        'user_with_idcard',
        'pay_num',
        'express_status',
        'status',
        'hkgj_package_id',
        'package_id',
        'bus_order_id',
        'package_name',
        'supplier_id',
        'fxuser_id',
        'fx2_id',
        'tenant_id',
        'anchor_id',
        'pay_id',
        'live_room_id',
        'created_at',
        'updated_at',
        'deleted_at',
        'api_order_no',
        'api_lock',
        'api_valid',
        'api_sync',
        'hk_sync',
        'remark',
        'cid',
        'open_id',
        'channe_id',
        'admin_user_id',
        'pay_status',
        'refund_text',
        'sms_status',
        'intercept_status',
        'intercept_msg',
        'nick',
        'buyer_openid',
        'author_id',
        'author_name',
        'dy_pacage_code',
        'dy_product_id',
    ];

    /** Columns from ORDER_FIELDS that are re-formatted as ISO-8601 timestamps. */
    private const TIMESTAMP_FIELDS = ['created_at', 'updated_at', 'deleted_at'];

    /** @var Client Elasticsearch client used for all index/delete calls. */
    protected $esClient;

    /**
     * @param Client $esClient Elasticsearch client (resolved from the container).
     */
    public function __construct(Client $esClient)
    {
        parent::__construct();
        $this->esClient = $esClient;
    }

    /**
     * Connect to Canal and consume row changes forever.
     *
     * The loop only sleeps when a poll returned nothing; while Canal still has
     * data queued, batches are drained back-to-back so the consumer does not
     * fall behind.
     */
    public function handle()
    {
        $connector = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
        $connector->connect('127.0.0.1', 11111); // Canal server host and port
        $connector->subscribe('1001', 'example', '.*\\..*'); // client id, Canal instance name, schema/table filter

        while (true) {
            $message = $connector->get(self::BATCH_SIZE); // fetch pending changes from Canal
            $entries = $message->getEntries();

            if (!$entries) {
                sleep(1); // idle back-off: nothing to process right now
                continue;
            }

            foreach ($entries as $entry) {
                $this->processEntry($entry);
            }
        }
    }

    /**
     * Handle one Canal entry: ignore everything except row data for `orders`,
     * then forward each changed row to Elasticsearch.
     *
     * @param mixed $entry Canal entry object as returned by Message::getEntries().
     */
    private function processEntry($entry)
    {
        if ($entry->getEntryType() !== EntryType::ROWDATA) {
            return;
        }

        // Only changes on the orders table are mirrored.
        if ($entry->getHeader()->getTableName() !== 'orders') {
            return;
        }

        $rowChange = new RowChange();
        $rowChange->mergeFromString($entry->getStoreValue());
        $eventType = $rowChange->getEventType();

        foreach ($rowChange->getRowDatas() as $rowData) {
            // A deleted row only has a "before" image; inserts/updates use the "after" image.
            $columns = $eventType === EventType::DELETE
                ? $rowData->getBeforeColumns()
                : $rowData->getAfterColumns();

            $change = [
                'type' => $eventType,
                'data' => $this->parseColumns($columns),
            ];

            // Print the detected change.
            $this->info("Detected change in orders: " . json_encode($change, JSON_UNESCAPED_UNICODE));
            $this->syncChangeToElasticsearch($change, $this->esClient);
        }
    }

    /**
     * Apply a single row change to the `orders` index.
     *
     * INSERT and UPDATE events (re)index the full document; DELETE events
     * remove it. Other event types are ignored. A change without an `id`
     * is skipped with a warning because the document id cannot be derived.
     *
     * @param array  $change   ['type' => Canal EventType value, 'data' => column name => value map]
     * @param Client $esClient Elasticsearch client to write to.
     */
    protected function syncChangeToElasticsearch($change, $esClient)
    {
        $data = $change['data'] ?? [];
        $id = $data['id'] ?? null;

        if ($id === null || $id === '') {
            $this->warn('Skipping orders change without an id: ' . json_encode($change, JSON_UNESCAPED_UNICODE));
            return;
        }

        switch ($change['type'] ?? null) {
            case EventType::INSERT:
            case EventType::UPDATE:
                // Insert or update: index() overwrites any existing document with the same id.
                $esClient->index([
                    'index' => 'orders',
                    'id'    => $id,
                    'body'  => $this->buildOrderDocument($data),
                ]);
                break;
            case EventType::DELETE:
                try {
                    $esClient->delete([
                        'index' => 'orders',
                        'id'    => $id, // the order id doubles as the document id
                    ]);
                } catch (\Elastic\Elasticsearch\Exception\ClientResponseException $e) {
                    // The document may never have been indexed (row created before the
                    // sync started); a missing document is the desired end state.
                    if ($e->getCode() !== 404) {
                        throw $e;
                    }
                }
                break;
        }
    }

    /**
     * Build the Elasticsearch document body for an order row.
     *
     * Columns missing from $data are written as null instead of raising an
     * undefined-index warning.
     *
     * @param array $data Column name => value map for one row.
     * @return array Document body keyed by ORDER_FIELDS.
     */
    private function buildOrderDocument(array $data)
    {
        $document = [];
        foreach (self::ORDER_FIELDS as $field) {
            $value = $data[$field] ?? null;
            $document[$field] = in_array($field, self::TIMESTAMP_FIELDS, true)
                ? $this->formatTimestamp($value)
                : $value;
        }

        return $document;
    }

    /**
     * Convert a MySQL datetime string to ISO-8601 (`date('c')`).
     *
     * Returns null for NULL/empty values, MySQL zero dates and unparseable
     * strings. Without this guard `strtotime()` yields false and `date()`
     * silently produces 1970-01-01, which would mark every order with a NULL
     * `deleted_at` as deleted.
     *
     * @param mixed $value Raw column value.
     * @return string|null
     */
    private function formatTimestamp($value)
    {
        if ($value === null || $value === '') {
            return null;
        }

        $value = (string) $value;
        if (strncmp($value, '0000-00-00', 10) === 0) {
            return null;
        }

        $timestamp = strtotime($value);

        return $timestamp === false ? null : date('c', $timestamp);
    }

    /**
     * Flatten Canal column objects into a name => value map.
     *
     * SQL NULL columns are mapped to PHP null rather than to the empty string
     * Canal sends as their value.
     * NOTE(review): relies on Column::getIsNull() from the generated Canal
     * protobuf class — confirm it is available in the installed canal-php version.
     *
     * @param iterable $columns Canal Column objects exposing getName()/getValue().
     * @return array
     */
    protected function parseColumns($columns)
    {
        $parsedData = [];
        foreach ($columns as $column) {
            $parsedData[$column->getName()] = $column->getIsNull() ? null : $column->getValue();
        }

        return $parsedData;
    }
}

启动同步命令php artisan sync:orders-to-es