Data Migration

Data migration workflow

# First clear out the backup data from the previous run
DELETE /logger_mes_bak
DELETE /logger_mcs_bak
DELETE /logger_mcs_command_bak
DELETE /logger_mcs_exception_bak

# Create the backup index
PUT /logger_mcs_exception_bak
{
"settings": {
"number_of_shards": 6,
"number_of_replicas": 0,
"refresh_interval": -1
},
"mappings" : {
"properties" : {
"carrier" : {
"type" : "keyword"
},
"createTime" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss.SSS || yyyy-MM-dd HH:mm:ss"
},
"data" : {
"type" : "keyword"
},
"detail" : {
"type" : "keyword"
},
"device" : {
"type" : "keyword"
},
"logType" : {
"type" : "keyword"
},
"vehicle" : {
"type" : "keyword"
}
}
}
}

# Copy the old index's data into the backup index; reindex accepts several extra parameters and can be run asynchronously
POST /_reindex
{
"source": {
"index": "logger_mcs_exception"
},
"dest": {
"index": "logger_mcs_exception_bak"
}
}

# Restore refresh on the backup index
PUT /logger_mcs_exception_bak/_settings
{
"refresh_interval": "1s",
"number_of_replicas": 1
}

# Delete the old index
DELETE /logger_mcs_exception

# Create the new index
PUT /logger_mcs_exception
{
"settings": {
"number_of_shards": 6,
"number_of_replicas": 0,
"refresh_interval": -1
},
"mappings" : {
"properties" : {
"carrier" : {
"type" : "keyword"
},
"createTime" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss.SSS || yyyy-MM-dd HH:mm:ss"
},
"data" : {
"type" : "keyword"
},
"detail" : {
"type" : "keyword"
},
"device" : {
"type" : "keyword"
},
"logType" : {
"type" : "keyword"
},
"vehicle" : {
"type" : "keyword"
}
}
}
}

# Copy the backup data into the new index
POST /_reindex
{
"source": {
"index": "logger_mcs_exception_bak"
},
"dest": {
"index": "logger_mcs_exception"
}
}

# Restore refresh on the new index
PUT /logger_mcs_exception/_settings
{
"refresh_interval": "1s",
"number_of_replicas": 1
}
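
Before eventually deleting the backup index, it is worth sanity-checking that the copy is complete. This step is not part of the original flow above; a minimal check with the _count API (after the refresh settings have been restored) could look like this:

# The document counts of the new index and the backup should match
GET /logger_mcs_exception/_count
GET /logger_mcs_exception_bak/_count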

reindex data migration

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-reindex.html#docs-reindex-api-request-body

script: run a script as part of the reindex. It has two options: source (the script body) and lang (the script language, painless by default). See the Request body section of the official documentation for details.

# After this request succeeds, the response contains "task" : "<task id>"; write down the task id without the surrounding quotes
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "logger_mes"
},
"dest": {
"index": "logger_mes_bak"
},
"script": {
"source": "ctx._source.remove(\"returnCode\");ctx._source.remove(\"returnMessage\")"
}
}

# Replace <task-id> with the task id obtained in the previous step (no quotes); the response shows the migration progress, and "completed" : true means the migration is finished
GET _tasks/<task-id>

# If the ES cluster is under too much load, the operation can be cancelled via the task id
POST _tasks/<task-id>/_cancel

Both the reindex source and destination accept additional configuration; in particular the source can take a query to restrict which documents are migrated. See the Request body section of the official documentation for details.

POST /_reindex
{
"source": {
"index": "logger_mes",
"query": {
"range": {
"createTime": {
"gte": ,
"lte":
}
}
}
},
"dest": {
"index": "logger_mes_bak"
}
}

Commonly used APIs

delete by query

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-delete-by-query.html

scroll_size: the number of documents handled in memory per scroll batch

slices: parallel slicing; best set to auto so that ES automatically uses one slice per shard. Using more slices than shards actually degrades performance.

wait_for_completion: run the request asynchronously; the response contains a task-id that can be used to query the task and follow the request's progress

refresh: refresh all affected shards once the request has completed successfully

POST /<index_name>/_delete_by_query?scroll_size=5000&slices=10&wait_for_completion=false&refresh=true
{
"query": {
"range": {
"date": {
"lt": ""
}
}
}
}

Things to note when implementing this in Java (RestHighLevelClient)

submitDeleteByQueryTask wait_for_completion = false async handled by ES itself; returns a task id that can be used to check progress
deleteByQueryAsync wait_for_completion = true async implemented on the client side
deleteByQuery wait_for_completion = true synchronous delete
// Mirroring the native API: the task can be executed in the three ways listed above; the client adds one extra client-side async variant
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
request.setBatchSize(5000);
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
request.setRefresh(true);

TaskSubmissionResponse taskSubmissionResponse = esClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT);

Note that the BatchSize parameter corresponds to scroll_size. The Javadoc does mention it, but it is confusing on a first read of the code, because the parameter is actually assigned as follows:

public DeleteByQueryRequest setBatchSize(int size) {
getSearchRequest().source().size(size);
return this;
}

So it sets the size parameter. While implementing this I had been thinking about limiting the amount of data per operation, the way one would in a database, and since queries already expose a size parameter, reading the source raised two questions: does deleteByQuery support size to cap the amount of data, and are BatchSize and scroll_size the same field? Let's unravel that below.

// RequestConverters.class#prepareDeleteByQueryRequest(...) 598
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
}

Here we can see that when the deleteByQuery request is built, the value of this field is actually assigned to scroll_size. The field is originally a generic query field, but the ES client wraps the delete implementation in an extra layer: request construction is split into search, update and delete variants, so the same parameter ends up being applied at a different point in each.
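
As for the earlier question about capping the total amount of data: scroll_size/BatchSize only controls the batch size, not the total. In 7.x the _delete_by_query API documents a separate max_docs parameter for limiting how many documents the whole request processes; a hedged sketch in console form (the index name is a placeholder):

# max_docs caps the total number of documents processed; scroll_size only sets the per-batch size
POST /<index_name>/_delete_by_query?max_docs=10000&scroll_size=5000
{
  "query": {
    "match_all": {}
  }
}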

Full document count for a single index

GET /logger_moc/_search
{
"track_total_hits": true
}
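
As an alternative to track_total_hits, the _count API returns just the total without fetching any hits; a minimal sketch:

GET /logger_moc/_count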

Listing all indices

GET /_cat/indices?v

Node heap memory and shard settings

6 recommendations for ES shard settings - 掘金 (juejin.cn)

Before configuring shards, estimate the index's storage requirements first; ideally each shard ends up between 1 GB and 50 GB. A quick way to check current sizes is sketched below.
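
A sketch for checking per-index and per-shard store sizes with the _cat APIs (the index name is a placeholder):

# Store size per index
GET /_cat/indices/<index_name>?v&h=index,pri,rep,docs.count,pri.store.size,store.size

# Store size per shard
GET /_cat/shards/<index_name>?v&h=index,shard,prirep,store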

GET /_cat/nodes?v&h=heap*

GET /_nodes/stats/jvm?pretty

GET /_nodes/stats/jvm?filter_path=nodes.*.jvm.mem.heap_*

Modifying cluster-level settings

# Get all cluster-level settings
GET _cluster/settings
# The response has two main sections:
# persistent: these settings stay in effect even after a full cluster restart; they can be set via the API or in elasticsearch.yml.
# transient: these settings stay in effect only until the cluster restarts; once the whole cluster restarts they are cleared. They can only be set via the API.
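
Both sections are written through the same endpoint; a minimal sketch of setting a persistent value (cluster.routing.allocation.enable is only used here as an illustrative setting):

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": "all"
  }
}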

Docker: viewing a container's original run command via a helper image

https://hub.docker.com/r/cucker/get_command_4_run_container

docker pull cucker/get_command_4_run_container

# Wrap the helper image in an alias; --privileged grants the container the permissions it needs
echo "alias get_run_command='docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock cucker/get_command_4_run_container'" >> ~/.bashrc

# Load the alias
. ~/.bashrc

get_run_command <container name>/<container id>

ES ILM & rollover (index rollover)

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-index-lifecycle.html

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-rollover.html

ILM config

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-settings.html

# Cluster-level setting that controls how often ILM policies are applied; defaults to 10m
indices.lifecycle.poll_interval

PUT _cluster/settings
{
"transient": {
"indices.lifecycle.poll_interval": "1m"
}
}

ILM policy

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-rollover.html

The rollover API, combined with ILM, automates index lifecycle management.

PUT _ilm/policy/<policy_name>
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_age": "30d"
}
}
},
"delete": {
"min_age": "180d",
"actions": {
"delete": {}
}
}
}
}
}

index template

PUT _template/<template_name>
{
"index_patterns": [
"<index_name-predfix>-*"
],
"settings": {
"index": {
"lifecycle.name": "<policy_name>",
"lifecycle.rollover_alias": "<your_alias>",
"number_of_shards": "3"
}
},
"mappings": {
"properties": {
.....
}
}
}

init index & set alias

is_write_index: marks this index as the current write index

PUT /<index_name>-000001
{
"aliases": {
"<your_alias>": {
"is_write_index": true
}
}
}

The is_write_index parameter is documented under the rollover index API; index rollover covers several scenarios, see the docs for details: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-rollover-index.html#

Here the alias maps to multiple indices, so we need to mark which index currently receives writes. That is why "is_write_index": true is set during initialization; once a rollover is triggered, the new index is set to "is_write_index": true and the old index is switched to false.
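
To check which index the alias currently writes to, the get-alias API shows the is_write_index flag for every index behind the alias; a minimal sketch (the alias name is a placeholder):

GET /_alias/<your_alias>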

Manually triggering the rollover

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-rollover-index.html#

POST /<your_alias>/_rollover
{
"conditions": {
"max_docs": 50000
}
}
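
Besides triggering a rollover manually, the ILM explain API is handy for checking which phase and action each managed index is currently in; a minimal sketch (the index pattern is a placeholder):

GET /<index_name-prefix>-*/_ilm/explain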

ELK

filebeat

Configuration options: https://www.elastic.co/guide/en/beats/filebeat/7.9/configuring-howto-filebeat.html

Reference configuration file: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-reference-yml.html

Filebeat multiline log matching: https://www.elastic.co/guide/en/beats/filebeat/7.9/multiline-examples.html#multiline

# /u01/soft/filebeat/filebeat.yml (filebeat.yml). Sources are defined via inputs and modules: modules are official implementations for common log sources, while inputs are custom collection.
filebeat.inputs:
- type: log
  paths:
    - /usr/share/filebeat/logs/mcs-service.log
  fields:
    type: mcs
  # Handle log entries spanning multiple lines, e.g. stack traces. Every log line starts with a date, so split on the date pattern; this must match your logback output.
  multiline.type: pattern
  multiline.pattern: '^\d{4}-\d{2}-\d{2}'
  multiline.negate: true
  multiline.match: after

- type: log
  paths:
    - /usr/share/filebeat/logs/mes-service.log
  fields:
    type: mes
  multiline.type: pattern
  multiline.pattern: '^\d{4}-\d{2}-\d{2}'
  multiline.negate: true
  multiline.match: after
  # Ignore files older than the given time span
  ignore_older: 2h
  # Close the file handler if the file has not changed within the given time span
  close_inactive: 1m
  # How often new files are scanned for; defaults to 10s
  scan_frequency: 10s

output.logstash:
  hosts: ["128.168.11.101:5044"]
processors:
  - drop_fields:
      fields: ["agent","ecs","host","input","log"]
docker pull elastic/filebeat:7.9.3

docker run -d \
--name filebeat \
--privileged=true \
-v /u01/soft/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v /u01/soft/logs:/usr/share/filebeat/logs \
-w /usr/share/filebeat \
elastic/filebeat:7.9.3

Handling multiple log sources in one collector

https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-input-log.html#filebeat-input-log-fields

Define custom fields on each input so that logstash can branch on them for classification.

logstash

Logstash official documentation: https://www.elastic.co/guide/en/logstash/7.9/input-plugins.html

# /u01/soft/logstash/config/logstash.yml logstash.yml
http.host: "127.0.0.1"
http.port: 9600
# Format comparison
logback:
%date %level [%thread] %logger{36} [%file : %line] %msg%n
logstash:
"(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}) %{LOGLEVEL:level} \[%{DATA:thread}\] %{JAVACLASS:javaClass} \[%{DATA:class}\] %{GREEDYDATA:data}"

logback:
%d{yyyy-MM-dd HH:mm:ss.SSS} %ip %level [%thread] [%file : %line] %msg%n
logstash:
"(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{IP:ip} %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}"
# /u01/soft/logstash/pipeline/logstash.conf logstash.conf
input {
beats {
port => 5044
}
}

filter {
if [fields][type] == "mcs" {
grok {
match => {"message" => "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{IP:ip} %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}"}
}
}
else if [fields][type] == "mes" {
grok {
match => {"message" => "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}"}
}
}

date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
remove_field => ["@version","tags","message","log_time","thread"]
}
}

output {
if [fields][type] == "mcs" {
elasticsearch {
hosts => ["http://128.168.11.112:9200"]
index => "mcslog-%{+YYYY.MM.dd}"
}
}
else if [fields][type] == "mes" {
elasticsearch {
hosts => ["http://128.168.11.112:9200"]
index => "meslog-%{+YYYY.MM.dd}"
}
}
}
# Automatic index deletion policy
PUT _ilm/policy/logstash_auto_delete
{
"policy": {
"phases": {
"delete": {
"min_age": "7d",
"actions": {
"delete": {}
}
}
}
}
}

Index template configuration: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html

# Custom log index template; meslog is set up the same way
PUT _template/mcslog
{
"index_patterns": [
"mcslog-*"
],
"settings": {
"index": {
"lifecycle": {
"name": "logstash_auto_delete"
},
"number_of_shards": "2"
# On a single-node cluster set replicas to 0: "number_of_replicas" : "0"
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"level": {
"type": "text"
},
"ip": {
"type": "text"
},
"class": {
"type": "text"
},
"data": {
"type": "text"
}
}
}
}
docker pull logstash:7.9.3

docker run -d \
--name logstash \
--privileged=true \
-p 9600:9600 \
-p 5044:5044 \
-v /u01/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
-v /u01/soft/logstash/pipeline:/usr/share/logstash/pipeline \
-w /usr/share/logstash \
logstash:7.9.3

# Optional extra volume for the docker run above, to persist logstash data:
-v /u01/soft/logstash/data:/usr/share/logstash/data \

kibana

Kibana configuration: https://www.elastic.co/guide/en/kibana/7.9/settings.html

# /u01/soft/kibana/kibana.yml kibana.yml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://128.168.11.112:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN
docker pull kibana:7.9.3

docker run -d \
--name kibana \
--privileged=true \
-p 5601:5601 \
-v /u01/soft/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
-w /usr/share/kibana \
kibana:7.9.3
# Kibana advanced settings: date display format and time zone
dateFormat	YYYY-MM-DD HH:mm:ss.SSS
dateFormat:tz Asia/Shanghai

Does logback log rotation conflict with filebeat log collection?

Logback rotation turns the current log file into our custom zip archive and creates a new log file; does this affect filebeat's collection?

https://www.jianshu.com/p/e98287437d41

See the official documentation: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-input-docker.html#filebeat-input-docker-close-inactive

Relevant settings include close_inactive, scan_frequency, and similar options.

In short, filebeat has a refresh mechanism and does not keep listening to a file that receives no new content for long.

Exporting and importing images

docker save -o <output path for the image tar> <image id or image name:tag>
# Upload the tar file to the target host, then import it:
docker load -i <path of the uploaded tar>