最近一次数据迁移, 需要将 MySQL 的数据导出, 处理后导入到新表和 ES. 这里做个简单记录, 方便后续查询.
注: 为了写文章方便及隐私安全, 实际内容会有所简化. 例如表结构简化, 数据库连接部分全部用 xxx 表示, 目录及文件名均为化名等.
实践过程
原表:
book_db 库
- b_book(id,create_time,update_time,price,title,intro)
新表:
book 库
- - book(id,price,title,create_time,update_time)
- - book_ext(id,book_id,intro,create_time)
MySQL 导出
- mkdir -p /tmp/
- # 导出原始数据
- MySQL -hxxx -uxxx -pxxx book_db --default-character-set=utf8 -e 'select id,create_time,update_time,price,title,intro from b_book' | sed 's/NULL//g'> /tmp/b_book.CSV
sed 's/NULL//g'是因为导出的数据有些字段存的 NULL, 新表不需要存储 NULL, 所以去掉.
导出的数据每行默认以 \ t 分隔, 第一行包含字段名. 这里我们删掉第一行:
sed -i '1d' /tmp/b_book.CSV
数据处理
- cd /tmp/
- # 处理 create_time,update_time,price, 并生成文件 book.CSV
- cat b_book.CSV | awk -F '\t' -v OFS='@@@' '{gsub(/[-:]/," ",$2); $2=mktime($2);gsub(/[-:]/,"",$3);$3=mktime($3);$4=$4*100;$6="";print $0}'> book.CSV
- # 生成文件 book_ext.CSV
- cat b_book.CSV | awk -F '\t' -v OFS='@@@' '{print $1,$6}'> book_ext.CSV
- # 生成文件 book_es.CSV
- cat b_book.CSV | awk -F '\t' -v OFS='@@@' '{$4=$4*100;print $0}'> book_es.CSV
因为原表里时间都是 datetime 格式, 新表是时间戳格式, 这里处理成时间戳格式. 价格原表是以元为单位, 这里 * 100 是为了处理成以分为单位.
-v OFS='@@@'表示输出的时候每列以 @@@为分隔符. 原因是原表里的 intro 字段存储的是 html, 可能包含常用转义字符, 这里使用 @@@确保能正确分隔每列.
导入到 MySQL
- MySQL -hxxx -uxxx -pxxx book
- Load Data LOCAL InFile '/tmp/book.csv' Into Table book
- character set utf8
- Fields Terminated By '@@@' Enclosed By ''Escaped By'' Lines Terminated By '\n'
- (id,create_time,update_time,price,title);
- Load Data LOCAL InFile '/tmp/book_ext.csv' Into Table book_ext
- character set utf8
- Fields Terminated By '@@@' Enclosed By ''Escaped By'' Lines Terminated By '\n'
- (book_id,intro);
说明:
Terminated 字段分隔符(列分隔符). 一般是空格或者 \ t
Enclosed 字段括起字符. 没有为空字符即可
Escaped 转义字符. 没有为空字符即可
Terminated 记录分隔符(行结束符)
Into Table 代表插入, 记录已存在 (唯一键约束) 则失败不再往下执行. Replace Into Table 代表覆盖, 记录已存在则覆盖(是整条记录覆盖, 没有列出的字段给默认值).Ignore Into Table 遇到已存在直接跳过.
导入到 ES
由于生产的 book_es.CSV 文件比较大, 所以这里按 20000 条生成一个文件, 防止文件过大, ES 导入失败.
- cd /tmp/
- awk '{filename ="book_es.csv."int((NR-1)/20000)".csv"; print>> filename}' book_es.CSV
ConvertBookToEs.PHP 是 PHP 脚本, 生成 ES 批量导入的文件. 见附录. 执行后生成很多 book_es.CSV.*.CSV.JSON 文件.
PHP ConvertBookToEs.PHP
importToEs.sh 是 ES 批量导入脚本, 如下:
- #!/bin/bash
- for file in `ls /tmp/book_es.csv.*.csv.json`
- do
- echo $file;
- curl -XPOST http://xxx:9200/book/doc/_bulk -H "Content-Type: application/json" --data-binary "@$file">> importToEs.log
- done
执行脚本:
sh importToEs.sh
等待数分钟, 便执行完毕了.
实现 MySQL LOAD DATA 按字段更新
为了将大量数据加载到 MySQL 中, LOAD DATA INFILE 是迄今为止最快的选择. 但是, 虽然这可以以 INSERT IGNORE 或 REPLACE 的方式使用, 但目前不支持 ON DUPLICATE KEY UPDATE.
如果我们想批量更新某个字段, ON DUPLICATE KEY UPDATE 如何使用 LOAD DATA INFILE 模拟?
Stack Overflow 上有网友给了答案. 步骤是:
1)创建一个新的临时表.
CREATE TEMPORARY TABLE temporary_table LIKE target_table;
2)从临时表中删除所有索引以加快速度.(可选)
- SHOW INDEX FROM temporary_table;
- DROP INDEX `PRIMARY` ON temporary_table;
- DROP INDEX `some_other_index` ON temporary_table;
3)将 CSV 加载到临时表中
- LOAD DATA INFILE 'your_file.csv'
- INTO TABLE temporary_table
- Fields Terminated By '\t' Enclosed By ''Escaped By'' Lines Terminated By '\n'
- (field1, field2);
4)使用 ON DUPLICATE KEY UPDATE 复制数据
- SHOW COLUMNS FROM target_table;
- INSERT INTO target_table
- SELECT * FROM temporary_table
- ON DUPLICATE KEY UPDATE field1 = VALUES(field1), field2 = VALUES(field2);
MySQL 将假定 = 之前的部分引用 INSERT INTO 子句中指定的列, 第二部分引用 SELECT 列.
5)删除临时表
DROP TEMPORARY TABLE temporary_table;
使用 SHOW INDEX FROM 和 SHOW COLUMNS FROM 此过程可以针对任何给定的表自动执行.
注: 官方文档里 INSERT ... SELECT ON DUPLICATE KEY UPDATE 语句被标记为基于语句的复制不安全. 所以上述方案请在充分测试后再实施. 详见:
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
附录
ConvertBookToEs.PHP
- <?PHP
- /**
- * 转换 wish_book 为 ES 批量格式(JSON)
- */
- //id,create_time,update_time,price,title,intro
- function dealBook($file)
- {
- $fp = fopen($file, 'r');
- while (!feof($fp)) {
- $line = explode('@@@', fgets($fp, 65535));
- if ($line && isset($line[1])) {
- $arr_head = [
- 'index' => [
- '_id' => (int)$line[0]
- ]
- ];
- $arr = [
- 'id' => (int)$line[0],
- 'create_time' => strtotime($line[1]),
- 'update_time' => strtotime($line[2]),
- 'price' => intval($line[3]),
- 'title' => (string)$line[4],
- 'intro' => (string)$line[18],
- ];
- file_put_contents($file . '.json', json_encode($arr_head, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
- file_put_contents($file . '.json', json_encode($arr, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
- }
- }
- }
- try {
- // 处理 CSV 文件为 es bluk JSON 格式
- // 参考 https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
- $files = glob("/tmp/book_es.csv.*.csv");
- if (false === $files) {
- exit("can not find csv file");
- }
- $pids = [];
- foreach ($files as $i => $file) {
- $pid = pcntl_fork();
- if ($pid <0) {
- exit("could not fork");
- }
- if ($pid> 0) {
- $pids[$pid] = $pid;
- } else {
- echo time() . "new process, pid:" . getmypid() . PHP_EOL;
- dealBook($file);
- exit();
- }
- }
- while (count($pids)) {
- foreach ($pids as $key => $pid) {
- $res = pcntl_waitpid($pid, $status, WNOHANG);
- if ($res == -1 || $res> 0) {
- echo 'Child process exit,pid' . $pid . PHP_EOL;
- unset($pids[$key]);
- }
- }
- sleep(1);
- }
- } catch (Exception $e) {
- $message = $e->getFile() . ':' . $e->getLine() . ' ' . $e->getMessage();
- echo $message;
- }
参考
1,Linux 命令行文本工具 - 飞鸿影~ - 博客园
https://www.cnblogs.com/52fhy/p/5836429.html
2,mysqldump 导出 CSV 格式 --fields-terminated-by=, : 字段分割符; - superhosts 的专栏 - CSDN 博客
- https://blog.csdn.net/superhosts/article/details/26054997
- 3,Batch Processing | Elasticsearch Reference [6.4] | Elastic
- https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
4,MySQL 导入数据 load data infile 用法整理 - conanwang - 博客园
- https://www.cnblogs.com/conanwang/p/5890753.html
- 5,MySQL LOAD DATA INFILE with ON DUPLICATE KEY UPDATE - Stack Overflow
- https://stackoverflow.com/questions/15271202/mysql-load-data-infile-with-on-duplicate-key-update
6,MySQL - INSERT INTO ... SELECT FROM ... ON DUPLICATE KEY UPDATE - Stack Overflow
https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update
7,MySQL :: MySQL 5.6 参考手册:: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE 语法
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
8, 复制表结构和数据 SQL 语句 - becket - 博客园
https://www.cnblogs.com/zhengxu/articles/2206894.html
来源: https://www.cnblogs.com/52fhy/p/10051338.html