仿牌网站优化wordpress如何修改电子邮箱
2026/4/18 0:39:45 网站建设 项目流程
仿牌网站优化,wordpress如何修改电子邮箱,甘肃住房与城乡建设厅网站,现在网站建设还用测浏览器吗?? 当前需要处理的业务场景: 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询. 同步数据到 es. 概述 1. 什么是 CDC 2. 什么是 Flink CDC 3. Flink CDC Connectors 和 Flink 的版本映射 实战 1. 宽表查询 1.1 …?? 当前需要处理的业务场景:将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询.同步数据到 es.概述1. 什么是 CDC2. 什么是 Flink CDC3. Flink CDC Connectors 和 Flink 的版本映射实战1. 宽表查询1.1 创建 mysql 表1.2 启动 Flink 集群和 Flink SQL CLI1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表1.4 关联订单数据并且将其写入 Elasticsearch 中1.5 修改 MySQL 中的数据本文用到的 mysql 版本为 8.0.28ES 版本为 7.17.3flink 版本为 1.13.6flink-sql-connector-mysql-cdc 版本为 2.2.1flink-sql-connector-elasticsearch 版本为 7_2.11-1.13.2概述1. 什么是 CDCCDC (Change Data Capture) 是 变更数据获取的简称。核心思想是监测并捕获数据库的变动(数据或数据表的插入、更新以及删除等)将这些变更按发生的顺序完整地记录下来写入到消息中间件中以供其他服务进行订阅并消费。2. 什么是 Flink CDCFlink 社区开发了 flink-cdc-connectors 组件这个一个可以直接从 MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source 组件。3. Flink CDC Connectors 和 Flink 的版本映射Flink CDC Conectors VersionFlink Version1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*实战1. 宽表查询在商城项目中商品、订单、物流的数据往往是存储在 MySQL 中为了加速查询的效率可以将数据组织成一张宽表并实时地把它写到 ElasticSearch 中。确保要监听的 mysql 服务器开启了 binlog 和对应监听 binlog 的账号有相对应的权限。1.1 创建 mysql 表-- MySQL CREATE DATABASE mydb; USE mydb; CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) ); ALTER TABLE products AUTO_INCREMENT 101; ? INSERT INTO products VALUES (default,scooter,Small 2-wheel scooter), ? ? ? (default,car battery,12V car battery), ? ? ? (default,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3), ? ? ? (default,hammer,12oz carpenters hammer), ? ? ? (default,hammer,14oz carpenters hammer), ? ? ? (default,hammer,16oz carpenters hammer), ? ? ? (default,rocks,box of assorted rocks), ? ? ? (default,jacket,water resistent black wind breaker), ? ? ? (default,spare tire,24 inch spare tire); ? CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL -- Whether order has been placed ) AUTO_INCREMENT 10001; ? INSERT INTO orders VALUES (default, 2020-07-30 10:08:22, Jark, 50.50, 102, false), ? ? ? (default, 2020-07-30 10:11:09, Sally, 15.00, 105, false), ? ? ? (default, 2020-07-30 12:00:30, Edward, 25.25, 106, false); ? ? ? CREATE TABLE shipments ( shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_id INTEGER NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ) AUTO_INCREMENT 10001; INSERT INTO shipments VALUES (default,10001,Beijing,Shanghai,false), ? ? ? (default,10002,Hangzhou,Shanghai,false), ? ? ? (default,10003,Shanghai,Hangzhou,false);1.2 启动 Flink 集群和 Flink SQL CLI# 进入 Flink 目录 cd flink-13.6 ? # 启动 Flink 集群.启动成功的话可以在 http://ip:8081/ 访问到对应的 Flink Web UI ./bin/start-cluster.sh ? # 启动 Flink SQL CLI ./bin/sql-client.sh1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表首先开启checkpoint, 每隔 3s 做一次checkpoint.-- Flink SQL Flink SQL SET execution.checkpointing.interval 3s;然后对于数据库中的表products、orders、shipments使用 Flink SQL CLI 创建对应的表用于同步这些底层数据库表的数据。-- Flink SQL Flink SQL CREATE TABLE products ( ? id INT, ? name STRING, ? description STRING, ? ?PRIMARY KEY (id) NOT ENFORCED ?) WITH ( ? ?connector mysql-cdc, ? ?hostname 192.168.110.100, ? ?port 3380, ? ?username root, ? ?password root, ? ?database-name flink_db, ? ?table-name products ?); ? ? Flink SQL CREATE TABLE orders ( ? order_id INT, ? order_date TIMESTAMP(0), ? customer_name STRING, ? price DECIMAL(10, 5), ? product_id INT, ? order_status BOOLEAN, ? PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ? connector mysql-cdc, ? ?hostname 192.168.110.100, ? ?port 3380, ? ?username root, ? ?password root, ? ?database-name flink_db, ? table-name orders ); ? ? Flink SQL CREATE TABLE shipments ( ? shipment_id INT, ? order_id INT, ? origin STRING, ? destination STRING, ? is_arrived BOOLEAN, ? PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( ? connector mysql-cdc, ? ?hostname 192.168.110.100, ? ?port 3380, ? ?username root, ? ?password root, ? ?database-name flink_db, ? table-name shipments );最后创建enriched_orders表用来将关联后的订单数据写入 Elasticsearch 中-- Flink SQL Flink SQL CREATE TABLE enriched_orders ( ? order_id INT, ? order_date TIMESTAMP(0), ? customer_name STRING, ? price DECIMAL(10, 5), ? product_id INT, ? order_status BOOLEAN, ? product_name STRING, ? product_description STRING, ? shipment_id INT, ? origin STRING, ? destination STRING, ? is_arrived BOOLEAN, ? PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ? ? connector elasticsearch-7, ? ? hosts http://114.132.43.99:9200, ? ? index enriched_orders );1.4 关联订单数据并且将其写入 Elasticsearch 中使用 Flink SQL 将订单表order与商品表products物流信息表shipments关联并将关联后的订单信息写入 Elasticsearch 中。-- Flink SQL Flink SQL INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id p.id LEFT JOIN shipments AS s ON o.order_id s.order_id;现在可以通过 Kibana 查询包含商品和物流信息的订单数据。首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index patternenriched_orders.然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.1.5 修改 MySQL 中的数据--MySQL INSERT INTO orders VALUES (default, 2020-07-30 15:22:00, Jark, 29.71, 104, false); ? INSERT INTO shipments VALUES (default,10004,Shanghai,Beijing,false);每执行一步就刷新一次 Kibana可以看到 Kibana 中显示的订单数据将实时更新.

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询