Data migration workflow

# Clear out the previous backup data first
DELETE /logger_mes_bak
DELETE /logger_mcs_bak
DELETE /logger_mcs_command_bak
DELETE /logger_mcs_exception_bak

# Create the backup index
PUT /logger_mcs_exception_bak
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "properties": {
      "carrier": { "type": "keyword" },
      "createTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS || yyyy-MM-dd HH:mm:ss"
      },
      "data":    { "type": "keyword" },
      "detail":  { "type": "keyword" },
      "device":  { "type": "keyword" },
      "logType": { "type": "keyword" },
      "vehicle": { "type": "keyword" }
    }
  }
}

# Copy the old index's data into the backup index. reindex accepts extra parameters
# and can run asynchronously (see the async example in the reindex section below).
POST /_reindex
{
  "source": {
    "index": "logger_mcs_exception"
  },
  "dest": {
    "index": "logger_mcs_exception_bak"
  }
}

# Re-enable refresh and replicas on the backup index
PUT /logger_mcs_exception_bak/_settings
{
  "refresh_interval": "1s",
  "number_of_replicas": 1
}

# Delete the old index
DELETE /logger_mcs_exception

# Create the new index
PUT /logger_mcs_exception
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "properties": {
      "carrier": { "type": "keyword" },
      "createTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS || yyyy-MM-dd HH:mm:ss"
      },
      "data":    { "type": "keyword" },
      "detail":  { "type": "keyword" },
      "device":  { "type": "keyword" },
      "logType": { "type": "keyword" },
      "vehicle": { "type": "keyword" }
    }
  }
}

# Copy the backup data into the new index
POST /_reindex
{
  "source": {
    "index": "logger_mcs_exception_bak"
  },
  "dest": {
    "index": "logger_mcs_exception"
  }
}

# Re-enable refresh and replicas on the new index
PUT /logger_mcs_exception/_settings
{
  "refresh_interval": "1s",
  "number_of_replicas": 1
}
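
Once both reindex passes have finished, it is worth confirming that the backup and the rebuilt index hold the same number of documents before moving on. A minimal check using the _count API, with the index names from the examples above:

# Compare document counts between the backup and the rebuilt index
GET /logger_mcs_exception_bak/_count
GET /logger_mcs_exception/_count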

Rolling deletion, to reduce pressure on the server

POST /<index_name>/_delete_by_query?scroll_size=5000&slices=10&wait_for_completion=false&refresh=true
{
  "query": {
    "range": {
      "date": {
        "lt": ""
      }
    }
  }
}
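
Because wait_for_completion=false makes the delete run as a background task, progress can be checked, or the task cancelled, through the task management API. A small sketch, where the task id comes from the response of the request above:

# List running delete-by-query tasks
GET _tasks?detailed=true&actions=*/delete/byquery

# Check or cancel a specific task
GET _tasks/<task-id>
POST _tasks/<task-id>/_cancel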

Exact total document count for a single index

GET /logger_moc/_search
{
  "track_total_hits": true
}
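
If only the exact total is needed and no hits have to be returned, the _count API is a lighter alternative; a minimal sketch:

GET /logger_moc/_count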

List the status of all indices

GET /_cat/indices?v
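
The _cat/indices output can also be narrowed and sorted, for example by store size, to spot the largest indices first; a sketch using standard _cat parameters:

GET /_cat/indices?v&h=index,docs.count,store.size&s=store.size:desc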

reindex data migration

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-reindex.html#docs-reindex-api-request-body

script runs a script as part of the reindex. It has two options: source (the script body) and lang (the script language; the default implementation is painless). See the Request body section of the official docs.

# After this request succeeds, the response contains "task" : "<task id>"; record the task id without the surrounding quotes
POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "logger_mes"
  },
  "dest": {
    "index": "logger_mes_bak"
  },
  "script": {
    "source": "ctx._source.remove(\"returnCode\");ctx._source.remove(\"returnMessage\")"
  }
}

# Replace <task-id> with the task id from the previous step (no quotes); the response shows the migration progress, and "completed" : true means the migration has finished
GET _tasks/<task-id>

# If the ES cluster is under too much load, the operation can be cancelled by task id
POST _tasks/<task-id>/_cancel

In reindex, both the source and the destination can be configured; in particular the source accepts a query to restrict which documents are migrated. See the Request body section of the official docs.

POST /_reindex
{
  "source": {
    "index": "logger_mes",
    "query": {
      "range": {
        "createTime": {
          "gte": "<start time>",
          "lte": "<end time>"
        }
      }
    }
  },
  "dest": {
    "index": "logger_mes_bak"
  }
}

View a container's original docker run command via a helper image

https://hub.docker.com/r/cucker/get_command_4_run_container

docker pull cucker/get_command_4_run_container

# Wrap the helper image in a shell alias; --privileged grants the container extended privileges
echo "alias get_run_command='docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock cucker/get_command_4_run_container'" >> ~/.bashrc

# Reload the shell config to enable the alias
. ~/.bashrc

get_run_command <container name>/<container ID>

Node heap memory and shard settings

"6 recommendations for ES shard settings" (关于ES分片设置的6个建议) - 掘金 (juejin.cn)

Before configuring ES shard counts, estimate the index's storage requirements; ideally each shard should hold roughly 1-50 GB.
GET /_cat/nodes?v&h=heap*

GET /_nodes/stats/jvm?pretty

GET /_nodes/stats/jvm?filter_path=nodes.*.jvm.mem.heap_*
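
To apply the 1-50 GB per-shard guideline above, the current shard sizes can be inspected with the _cat/shards API; a minimal sketch:

# Primary and replica shard sizes, largest first
GET /_cat/shards?v&h=index,shard,prirep,store&s=store:desc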

ES client deleteByQuery

submitDeleteByQueryTask: wait_for_completion = false; asynchronous on the ES side, returns a task id that can be used to check progress
deleteByQueryAsync: wait_for_completion = true; asynchronous only at the client code level
deleteByQuery: wait_for_completion = true; synchronous deletion

ES rollover (rolling indices)

Create an index template

Specify the index name pattern, alias, shard settings, and mappings

PUT _template/mcs_command_bak
{
  "index_patterns": [
    "logger_mcs_command_bak-*"
  ],
  "aliases": {
    "mcs_command_bak": {}
  },
  "settings": {
    "index": {
      "number_of_shards": "3"
    }
  },
  "mappings": {
    "properties": {
      "carrier":     { "type": "keyword" },
      "commandData": { "type": "keyword" },
      "commandId":   { "type": "keyword" },
      "createTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS || yyyy-MM-dd HH:mm:ss"
      },
      "dataSource": { "type": "keyword" },
      "dataTarget": { "type": "keyword" },
      "endPoint":   { "type": "keyword" },
      "endType":    { "type": "keyword" },
      "event":      { "type": "keyword" },
      "location":   { "type": "keyword" },
      "logType":    { "type": "keyword" }
    }
  }
}

Create the first index

PUT /logger_mcs_command_bak-000001

Assign the alias

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "logger_mcs_command_bak-000001",
        "alias": "mcs_command_bak",
        "is_write_index": true
      }
    }
  ]
}
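
To confirm which concrete index currently receives writes through the alias, the alias can be queried; a minimal sketch (the response marks the write index with is_write_index: true):

GET /_alias/mcs_command_bak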

Rollover policy

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-rollover-index.html#

POST /mcs_command_bak/_rollover
{
  "conditions": {
    "max_docs": 50000
  }
}
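
Besides max_docs, the rollover API also accepts max_age and max_size conditions, which can be combined; a sketch with illustrative threshold values:

POST /mcs_command_bak/_rollover
{
  "conditions": {
    "max_docs": 50000,
    "max_age": "7d",
    "max_size": "10gb"
  }
}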

If I create a new log_bak index configured with a rollover policy and, after rollover has taken effect, use reindex to import data into log_bak, will the rollover policy fail to apply?

ELK

filebeat

Configuration options: https://www.elastic.co/guide/en/beats/filebeat/7.9/configuring-howto-filebeat.html

Reference config file: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-reference-yml.html

filebeat multiline log matching: https://www.elastic.co/guide/en/beats/filebeat/7.9/multiline-examples.html#multiline

# /u01/soft/filebeat/filebeat.yml
# filebeat.yml defines inputs and modules: modules are official implementations for common log sources, while inputs are custom collection
filebeat.inputs:
  - type: log
    paths:
      - /usr/share/filebeat/logs/*.log
    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after
    # Ignore files older than this time span
    ignore_older: 2h
    # Close the file handle if the file has not changed within this period
    close_inactive: 1m
    # How often to scan for new files (default 10s)
    scan_frequency: 10s

output.logstash:
  hosts: ["128.168.11.101:5044"]

processors:
  - drop_fields:
      fields: ["agent","ecs","host","input","log"]
docker pull elastic/filebeat:7.9.3

docker run -d \
--name filebeat \
--privileged=true \
-v /u01/soft/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v /u01/soft/mcs/logs:/usr/share/filebeat/logs \
-w /usr/share/filebeat \
elastic/filebeat:7.9.3

logstash

logstash official documentation: https://www.elastic.co/guide/en/logstash/7.9/input-plugins.html

# /u01/soft/logstash/config/logstash.yml
http.host: "127.0.0.1"
http.port: 9600
# /u01/soft/logstash/pipeline/logstash.conf
input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => { "message" => "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}) %{LOGLEVEL:level} \[%{DATA:thread}\] %{JAVACLASS:javaClass} \[%{DATA:class}\] %{GREEDYDATA:data}" }
  }
  date {
    match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  mutate {
    remove_field => ["@version","tags","message","log_time","thread", "javaClass"]
  }
}

output {
  elasticsearch {
    hosts => ["http://128.168.11.112:9200"]
    index => "mcslog-%{+YYYY.MM.dd}"
  }
}
# Automatic index deletion policy (ILM)
PUT _ilm/policy/logstash_auto_delete
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "7d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Index template configuration: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html

# Custom log index template
# For a single-node deployment, also set "number_of_replicas": "0" under settings.index
PUT _template/mcslog
{
  "index_patterns": [
    "mcslog-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "logstash_auto_delete"
      },
      "number_of_shards": "6"
    }
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level":      { "type": "text" },
      "class":      { "type": "text" },
      "data":       { "type": "text" }
    }
  }
}
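
To verify that newly created mcslog-* indices actually pick up the logstash_auto_delete policy, the ILM explain API can be consulted; a minimal sketch:

GET mcslog-*/_ilm/explain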
docker pull logstash:7.9.3

docker run -d \
--name logstash \
--privileged=true \
-p 9600:9600 \
-p 5044:5044 \
-v /u01/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
-v /u01/soft/logstash/pipeline:/usr/share/logstash/pipeline \
-w /usr/share/logstash \
logstash:7.9.3

# Optional extra mount, appended to the docker run command above, to persist logstash data:
-v /u01/soft/logstash/data:/usr/share/logstash/data \

kibana

kibana configuration: https://www.elastic.co/guide/en/kibana/7.9/settings.html

# /u01/soft/kibana/kibana.yml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://128.168.11.112:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN
docker pull kibana:7.9.3

docker run -d \
--name kibana \
--privileged=true \
-p 5601:5601 \
-v /u01/soft/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
-w /usr/share/kibana \
kibana:7.9.3
Kibana date display settings (Stack Management → Advanced Settings):

dateFormat      YYYY-MM-DD HH:mm:ss.SSS
dateFormat:tz   Asia/Shanghai

Does logback's log rolling conflict with filebeat's log collection?

When logback rolls the log, turning the current log file into our custom zip archive and creating a new log file, is filebeat's collection affected?

https://www.jianshu.com/p/e98287437d41

See the official documentation: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-input-docker.html#filebeat-input-docker-close-inactive

Relevant settings include close_inactive and scan_frequency.

In short, filebeat has a refresh mechanism and does not keep watching a file that has had no new content for a long time.