A solution for syncing MySQL to ClickHouse

Setting the ClickHouse time zone
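
The original note didn't record the actual steps; as a minimal sketch, the server-wide time zone lives in the <timezone> element of config.xml (e.g. <timezone>Asia/Shanghai</timezone>), and the effective setting can be checked from SQL:

-- show the server's effective time zone
SELECT timezone();

-- render a DateTime in an explicit zone, independent of the server default
SELECT toDateTime(now(), 'Asia/Shanghai');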

ClickHouse's mysql() table function lets you create a MergeTree table and populate it from a remote MySQL table in a single statement:

CREATE TABLE tmp ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('hostip:3306','db','table','user','passwd');

You can first create a temporary table named with a timestamp, then drop it once the sync has completed, as sketched below.
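
A minimal sketch of that flow, with the table name and timestamp suffix as placeholders:

-- load into a timestamp-named temporary table first
CREATE TABLE t_account_20240101 ENGINE = MergeTree ORDER BY id
    AS SELECT * FROM mysql('hostip:3306','db','t_account','user','passwd');

-- once the copy has finished, swap it in and drop the old one
RENAME TABLE t_account TO t_account_old, t_account_20240101 TO t_account;
DROP TABLE t_account_old;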

Test record: importing a single table



DROP TABLE IF EXISTS account0_test;

-- import all of the data from a single table
CREATE TABLE account0_test ENGINE = MergeTree
    ORDER BY id AS SELECT * FROM mysql('localhost:3306','your_database','t_account','your_user','your_password');
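
After the import it's worth checking that both sides agree on the row count; a quick check reusing the same mysql() call (credentials are the same placeholders as above):

SELECT count() FROM account0_test;
SELECT count() FROM mysql('localhost:3306','your_database','t_account','your_user','your_password');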

Get all the MySQL tables


SELECT GROUP_CONCAT(TABLE_NAME SEPARATOR ',') FROM information_schema.TABLES t
WHERE TABLE_SCHEMA = 'your_database_name';
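
One caveat: MySQL truncates GROUP_CONCAT output at group_concat_max_len, which defaults to 1024 bytes, so for a schema with many tables raise it first:

SET SESSION group_concat_max_len = 1000000;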

Generate all the migration statements

package main

import (
	"fmt"
	"strings"
)

// comma-separated table list produced by the information_schema query above
var tables = "t_user,t_table0"

// generateSQL builds one CREATE TABLE ... AS SELECT statement per source table.
// The target table is named after the source table rather than a fixed "tmp",
// so the generated statements don't collide with each other.
func generateSQL(table string) string {
	return fmt.Sprintf("CREATE TABLE %s ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('host:3306','a','%s','test_user','password');", table, table)
}

func main() {
	for _, v := range strings.Split(tables, ",") {
		fmt.Println(generateSQL(v))
	}
}
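
Running it prints one CREATE TABLE statement per table; with the two placeholder tables above the output is:

CREATE TABLE t_user ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('host:3306','a','t_user','test_user','password');
CREATE TABLE t_table0 ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('host:3306','a','t_table0','test_user','password');

The statements can be redirected to a file and fed to clickhouse-client (e.g. go run main.go > migrate.sql, then clickhouse-client --multiquery < migrate.sql; the file name is just an example).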

Pitfalls

If a column's original type is a string, querying it with a bare numeric literal makes ClickHouse throw a type error (MySQL would coerce silently), so the SQL written against the original system can't be reused as-is.
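
For example, with a hypothetical String column account_id, a predicate that worked in MySQL fails in ClickHouse and has to be rewritten:

-- fails with an "Illegal types of arguments" error
SELECT * FROM t_account WHERE account_id = 12345;

-- works: quote the literal, or cast the column explicitly
SELECT * FROM t_account WHERE account_id = '12345';
SELECT * FROM t_account WHERE toInt64OrZero(account_id) = 12345;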

Checking database disk usage

Check total database size

select
    sum(rows) as total_rows,
    formatReadableSize(sum(data_uncompressed_bytes)) as uncompressed_size,
    formatReadableSize(sum(data_compressed_bytes)) as compressed_size,
    round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes) * 100, 0) as compression_ratio
from system.parts
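
Note that system.parts also contains inactive (already-merged) parts, so the totals above overcount; filtering on active, and optionally on a single database, gives a truer figure:

select
    sum(rows) as total_rows,
    formatReadableSize(sum(data_uncompressed_bytes)) as uncompressed_size,
    formatReadableSize(sum(data_compressed_bytes)) as compressed_size
from system.parts
where active and database = 'your_database'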
   

Check per-table size and partition information

select
    database,
    table,
    formatReadableSize(size) as size,
    formatReadableSize(bytes_on_disk) as bytes_on_disk,
    formatReadableSize(data_uncompressed_bytes) as data_uncompressed_bytes,
    formatReadableSize(data_compressed_bytes) as data_compressed_bytes,
    compress_rate,
    rows,
    days,
    formatReadableSize(avgDaySize) as avgDaySize
from
(
    select
        database,
        table,
        sum(bytes) as size,
        sum(rows) as rows,
        min(min_date) as min_date,
        max(max_date) as max_date,
        sum(bytes_on_disk) as bytes_on_disk,
        sum(data_uncompressed_bytes) as data_uncompressed_bytes,
        sum(data_compressed_bytes) as data_compressed_bytes,
        (data_compressed_bytes / data_uncompressed_bytes) * 100 as compress_rate,
        max_date - min_date as days,
        size / (max_date - min_date) as avgDaySize
    from system.parts
    where active 
     and database = 't'
   --  and table = 'tablename'
    group by
        database,
        table
)
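
For a partition-level breakdown of a single table, the same system.parts columns can be grouped by partition (the database and table names here are placeholders); note that min_date/max_date in the query above are only meaningful for tables partitioned by date:

select
    partition,
    count() as part_count,
    sum(rows) as rows,
    formatReadableSize(sum(bytes_on_disk)) as on_disk
from system.parts
where active and database = 't' and table = 'tablename'
group by partition
order by partition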