搭建高效的ELFK 7.X集群:ELK、Kafka与Filebeat的完整指南 
ELK 架构介绍与优势 
集群服务版本与配置 
服务 
版本 
 
 
Java 
1.8.0_221 
 
Elasticsearch 
7.10.1 
 
Filebeat 
7.10.1 
 
Kibana 
7.10.1 
 
Logstash 
7.10.1 
 
Cerebro 
0.9.2-1 
 
Kafka 
2.12-2.3.0 
 
Zookeeper 
3.5.6 
 
 
服务器环境说明与角色分配 
IP地址 
主机名 
配置 
角色 
 
 
10.0.11.172 
elk-master 
4C16G 
es-master、kafka+zookeeper1 
 
10.0.21.117 
elk-node1 
4C16G 
es-node1、kafka+zookeeper2 
 
10.0.11.208 
elk-node2 
4C16G 
es-node2、kafka+zookeeper3 
 
10.0.10.242 
elk-kibana 
4C16G 
logstash、kibana、cerebro 
 
 
系统参数优化 
  三个节点都需要执行
修改主机名以确保集群稳定性 
1
 2
 3
  
hostnamectl set-hostname elk-master
 hostnamectl set-hostname elk-node1
 hostnamectl set-hostname elk-node2
  
 
增加文件描述符以支持高并发 
1
 2
 3
 4
 5
 6
 7
 8
  
cat >>/etc/security/limits.conf<< EOF
  *               soft      nofile          65536
 *               hard      nofile          65536
 *               soft      nproc           65536
 *               hard      nproc           65536
 *               hard      memlock         unlimited
 *               soft      memlock         unlimited
 EOF 
 
 
修改默认限制内存以优化性能 
1
 2
 3
 4
 5
  
cat >>/etc/systemd/system.conf<< EOF
  DefaultLimitNOFILE=65536
 DefaultLimitNPROC=32000
 DefaultLimitMEMLOCK=infinity
 EOF 
 
 
优化内核设置以支持 Elasticsearch 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
  
cat >>/etc/sysctl.conf<< EOF
  # 关闭交换内存
 vm.swappiness =0
 # 影响Java线程数量,建议修改为262144或者更高
 vm.max_map_count= 262144
 # 优化内核listen连接
 net.core.somaxconn=65535
 # 最大打开文件描述符数,建议修改为655360或者更高
 fs.file-max=655360
 # 开启IPv4转发
 net.ipv4.ip_forward= 1
 EOF 
 
 
修改 Hostname 配置文件以确保网络通信 
1
 2
 3
 4
 5
  
cat >>/etc/hosts<< EOF
  elk-master  10.0.11.172
 elk-node1   10.0.21.117
 elk-node2   10.0.11.208
 EOF 
 
 
重启服务器以使配置生效 
部署 Zookeeper 
  三个节点都需要执行
创建 Zookeeper 项目目录 
1
 2
 3
 4
  
#存放快照日志 
mkdir zkdata 
 #存放事务日志 
mkdir zklogs 
  
 
下载并解压 Zookeeper 
1
 2
 3
  
wget  http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
 tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz 
 mv apache-zookeeper-3.5.6-bin zookeeper
  
 
修改 Zookeeper 配置文件 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
  
[ root@elk-master zookeeper] # cat conf/zoo.cfg  |grep  -v ^# 
# 服务器之间或客户端与服务器之间维持心跳的时间间隔 
# tickTime以毫秒为单位。 
tickTime = 2000  
# 集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数 
initLimit = 10 
# 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数 
syncLimit = 5 
# 数据保存目录 
dataDir = ../zkdata
# 日志保存目录 
dataLogDir = ../zklogs
# 客户端连接端口 
clientPort = 2181 
# 客户端最大连接数。根据自己实际情况设置,默认为60个 
maxClientCnxns = 60 
# 三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口 
server.1= 10.0.11.172:2888:3888
 server.2= 10.0.21.117:2888:3888
 server.3= 10.0.11.208:2888:3888
  
 
写入节点标记 
  分别在三个节点/home/tools/zookeeper/zkdata/myid写入节点标记
  
    
      master节点 
    
      node1节点 
    
      node2节点 
    
  
  
  
  master的操作 
1
  
echo  "1"  > /home/tools/zookeeper/zkdata/myid
 
 
 
 
  node1的操作 
1
  
echo  "2"  > /home/tools/zookeeper/zkdata/myid
 
 
 
 
  node2的操作 
1
  
echo  "3"  > /home/tools/zookeeper/zkdata/myid
 
 
 
 
 
启动 Zookeeper 集群 
1
 2
 3
 4
 5
  
[ root@elk-master zookeeper] # cd /home/tools/zookeeper/bin/ 
[ root@elk-master bin] # ./zkServer.sh start  
ZooKeeper JMX enabled by default
 Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
 Starting zookeeper ... STARTED
  
 
检查 Zookeeper 集群状态 
1
 2
 3
 4
 5
  
[ root@elk-master bin] # sh /home/tools/zookeeper/bin/zkServer.sh status  
ZooKeeper JMX enabled by default
 Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
 Client port found: 2181. Client address: localhost.
 Mode: leader
  
 
设置全局变量 
1
 2
 3
 4
 5
  
cat >>/etc/profile<< EOF
  export ZOOKEEPER_INSTALL=/home/tools/zookeeper/
 export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
 export PATH
 EOF 
 
 
 
这样就可以全局使用zkServer.sh命令了
部署 Kafka 集群 
  三个节点都需要执行
下载并解压 Kafka 压缩包 
1
 2
 3
 4
 5
 6
  
[ root@elk-master tools] # mkdir kafka 
[ root@elk-master tools] # cd kafka/ 
[ root@elk-master kafka] # wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz 
[ root@elk-master kafka] # tar xf kafka_2.12-2.3.0.tgz  
[ root@elk-master kafka] # mv kafka_2.12-2.3.0 kafka 
[ root@elk-master kafka] # cd kafka/config/ 
 
 
配置 Kafka 服务器 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
  
[ root@elk-master config] # cat /home/tools/kafka/kafka/config/server.properties 
############################# Server Basics #############################  
# broker的id,值为整数,且必须唯一,在一个集群中不能重复 
broker.id= 1 
 
 ############################# Socket Server Settings #############################  
# kafka默认监听的端口为9092 (默认与主机名进行连接) 
listeners = PLAINTEXT://:9092
 # 处理网络请求的线程数量,默认为3个 
num.network.threads= 3 
 
 # 执行磁盘IO操作的线程数量,默认为8个  
num.io.threads= 8 
 
 # socket服务发送数据的缓冲区大小,默认100KB 
socket.send.buffer.bytes= 102400 
 
 # socket服务接受数据的缓冲区大小,默认100KB 
socket.receive.buffer.bytes= 102400 
 
 # socket服务所能接受的一个请求的最大大小,默认为100M 
socket.request.max.bytes= 104857600 
 
 ############################# Log Basics #############################  
# kafka存储消息数据的目录 
log.dirs= ../kfkdata
 
 # 每个topic默认的partition数量 
num.partitions= 3 
 
 # 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量 
num.recovery.threads.per.data.dir= 1 
 
 ############################# Log Flush Policy #############################  
 # 消息刷新到磁盘中的消息条数阈值 
#log.flush.interval.messages=10000 
 # 消息刷新到磁盘中的最大时间间隔,1s 
#log.flush.interval.ms=1000 
 ############################# Log Retention Policy #############################  
 # 日志保留小时数,超时会自动删除,默认为7天 
log.retention.hours= 168 
 
 # 日志保留大小,超出大小会自动删除,默认为1G 
#log.retention.bytes=1073741824 
 # 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件 
log.segment.bytes= 1073741824 
 
 # 每隔多长时间检测数据是否达到删除条件,300s 
log.retention.check.interval.ms= 300000 
 
 ############################# Zookeeper #############################  
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开 
zookeeper.connect= 10.0.11.172,10.0.21.117,10.0.11.208
 
 # 连接zookeeper的超时时间,6s 
zookeeper.connection.timeout.ms= 6000 
  
 
创建数据存储的目录 
1
  
[ root@elk-master config] # mkdir ../kfkdata 
 
 
修改broker.id 
  分别在三个节点依次修改/home/tools/kafka/kafka/config/server.properties配置文件
 
 
 
启动 Kafka 集群 
1
 2
 3
 4
 5
  
cd  /home/tools/kafka/kafka/bin/
#启动测试 
./kafka-server-start.sh ../config/server.properties
 #放入后台 
./kafka-server-start.sh -daemon ../config/server.properties
  
 
测试 
  任意节点均可执行
 
在创建topic在集群中的任意节点 发布消息订阅消息验证结果 
 
  
    
      创建topic 
    
      消息发布 
    
      topic消息订阅 
    
  
  
  
  
1
 2
 3
 4
 5
 6
  
[ root@elk-master bin] # ./kafka-topics.sh \ 
--create \
   --zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \
  --partitions 3  \
  --replication-factor 1  \
  --topic logs
 
 
 
 
  
1
 2
 3
  
[ root@elk-master bin] # ./kafka-console-producer.sh \ 
--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
   --topic logs
 
 
 
 
  
1
 2
 3
 4
  
[ root@elk-master bin] #  ./kafka-console-consumer.sh \ 
--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
   --topic logs \
  --from-beginning
 
 
 
 
 
 
 
部署 Elasticsearch 
  三个节点都需要执行
下载并安装 Elasticsearch 
1
 2
  
wget https://pan.cnsre.cn/d/Package/Linux/ELK/elasticsearch-7.10.1-x86_64.rpm
 [ root@elk-master package] #  rpm -ivh elasticsearch-7.10.1-x86_64.rpm  
 
 
备份配置文件 
1
 2
  
cd  /etc/elasticsearch
cp elasticsearch.yml  elasticsearch.yml.bak
  
 
修改 Elasticsearch 配置文件 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
  
cat >/etc/elasticsearch/elasticsearch.yml << EOF
  #集群名
 cluster.name: elk-cluster
 
 #node名
 node.name: elk-1
 
 #数据存储路径
 path.data: /home/elasticsearch/esdata
 
 #数据快照路径
 path.repo: /home/backup/essnapshot
 
 #日志存储路径
 path.logs: /home/elasticsearch/eslogs
 
 #es绑定的ip地址,根据自己机器ip进行修改
 network.host: 0.0.0.0
 
 #服务端口
 http.port: 9200
 
 #集群master需要和node名设置一致
 discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]
 cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]
 
 #允许跨域请求
 http.cors.enabled: true
 #* 表示支持所有域名
 http.cors.allow-origin: "*"
 #添加请求header
 http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
 
 #生产必须为true,内存锁定检查,目的是内存地址直接映射,减少一次copy时间
 bootstrap.memory_lock: true
 #系统过滤检查,防止数据损坏,考虑集群安全,生产设置成false
 bootstrap.system_call_filter: false
 
 #xpack配置
 xpack.security.enabled: true
 xpack.security.transport.ssl.enabled: true
 xpack.security.transport.ssl.verification_mode: certificate
 xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
 xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12
 EOF 
 
 
修改JVM 
将jvm.options文件中22-23行的8g设置为你的服务内存的一半 
 
1
 2
 3
  
[ root@elk-node1 elasticsearch] # cat -n  jvm.options |grep 8g 
    22   -Xms8g
     23   -Xmx8g
  
 
修改其他节点配置 
  分别在三个节点修改/etc/elasticsearch/elasticsearch.yml配置文件
 
最终展示 
 
分配权限 
因为自定义数据、日志存储目录,所以要把权限给到目录
1
 2
 3
 4
  
mkdir  -p /home/elasticsearch/{ esdata,eslogs} 
 chown  elasticsearch:elasticsearch  /home/elasticsearch/*
 mkdir  -p /home/backup/essnapshot
 chown elasticsearch:elasticsearch /home/backup/essnapshot
  
 
启动 Elasticsearch 服务 
三个节点全部启动并加入开机启动
1
 2
  
systemctl start elasticsearch
 systemctl enable   elasticsearch
  
 
使用xpack进行安全认证 
xpack的安全功能 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
  
Unexpected response code [ 503]  from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?pretty
 Cause: Cluster state has not been recovered yet, cannot write to the [ null]  index
 
 Possible next steps:
 * Try running this tool again.
 * Try running with the --verbose parameter for  additional messages.
 * Check the elasticsearch logs for  additional error details.
 * Use the change password API manually. 
 
 ERROR: Failed to set  password for  user [ apm_system] .
  
 
申请证书 
  下面的操作,在其中一个节点操作即可
1
 2
  
/usr/share/elasticsearch/bin/elasticsearch-certutil ca
 /usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
  
 
两条命令均一路回车即可,不需要给秘钥再添加密码 
证书创建完成之后,默认在es的数据目录。 
将证书拷贝到etc下,并给上权限。
[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*
/usr/share/elasticsearch/elastic-certificates.p12
/usr/share/elasticsearch/elastic-stack-ca.p12
cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*
做完之后,将证书拷贝到其他节点
为内置账号添加密码 
ES中内置了几个管理其他集成组件的账号apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user使用之前,首先需要设置下密码。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
  
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
 Initiating the setup of passwords for  reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
 You will be prompted to enter passwords as the process progresses.
 Please confirm that you would like to continue  [ y/N] y
 Enter password for  [ elastic] :
 Reenter password for  [ elastic] :
 Enter password for  [ apm_system] :
 Reenter password for  [ apm_system] :
 Enter password for  [ kibana] :
 Reenter password for  [ kibana] :
 Enter password for  [ logstash_system] :
 Reenter password for  [ logstash_system] :
 Enter password for  [ beats_system] :
 Reenter password for  [ beats_system] :
 Enter password for  [ remote_monitoring_user] :
 Reenter password for  [ remote_monitoring_user] :
 Changed password for  user [ apm_system] 
 Changed password for  user [ kibana] 
 Changed password for  user [ logstash_system] 
 Changed password for  user [ beats_system] 
 Changed password for  user [ remote_monitoring_user] 
 Changed password for  user [ elastic] 
  
 
部署 Cerebro 管理工具 
下载并安装 Cerebro 
1
 2
  
wget https://pan.cnsre.cn/d/Package/Linux/ELK/cerebro-0.9.2-1.noarch.rpm
 rpm -ivh cerebro-0.9.2-1.noarch.rpm 
  
 
修改 Cerebro 配置文件 
修改/etc/cerebro/application.conf配置文件 
找到对应配置修改为以下内容 
  
    
      修改内容一 
    
      修改内容二 
    
  
  
 
  
1
 2
  
data.path: "/var/lib/cerebro/cerebro.db" 
 #data.path = "./cerebro.db" 
 
 
 
 
  
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
  
hosts  =  [ 
  #{ 
   #  host = "http://localhost:9200" 
   #  name = "Localhost cluster" 
   #  headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ] 
   #} 
   # Example of host with authentication 
   { 
     host  =  "http://10.0.11.172:9200" 
     name  =  "elk-cluster" 
     auth  =  { 
       username  =  "elastic" 
       password  =  "123" 
     } 
   } 
 ] 
 
 
 
 
 
报错 
  cerebro[8073]: No java installations was detected.
 
启动服务后报错No java,但是我的环境是有JAVA的。也做了全局变量 
感觉很奇怪…
解决方法 
  在启动服务文件中加入JAVA_HOME
找到服务启动文件/usr/share/cerebro/bin/cerebro 
修改/usr/share/cerebro/bin/cerebro中的JAVA_HOME 
具体如下,根据自己的JAVA_HOME填写路径 
  
    
      修改前 
    
      修改后 
    
  
  
 
  
1
 2
 3
 4
 5
 6
 7
  
  if  [[  -n " $bundled_jvm "  ]] ;   then 
     echo  " $bundled_jvm /bin/java" 
   elif  [[  -n " $JAVA_HOME "  ]]  &&  [[  -x " $JAVA_HOME /bin/java"  ]] ;   then 
     echo  " $JAVA_HOME /bin/java" 
   else 
     echo  "java" 
   fi 
  
 
 
 
  
1
 2
 3
 4
 5
 6
 7
  
  if  [[  -n " $bundled_jvm "  ]] ;   then 
     echo  " $bundled_jvm /bin/java" 
   elif  [[  -n "/home/tools/jdk1.8.0_221"  ]]  &&  [[  -x "/home/tools/jdk1.8.0_221/bin/java"  ]] ;   then 
     echo  "/home/tools/jdk1.8.0_221/bin/java" 
   else 
     echo  "java" 
   fi 
  
 
 
 
 
 
 
启动服务 
1
 2
 3
  
systemctl  start cerebro.service
 systemctl  enable  cerebro.service
 systemctl  status  cerebro.service
  
 
可以看到监听的是9000端口 
访问试下 
部署 Kibana 
下载并安装 Kibana 
1
 2
  
https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm
 rpm -ivh kibana-7.10.1-x86_64.rpm
  
 
修改备份配置文件 
1
 2
  
cd  /etc/kibana/
mv kibana.yml kibana.yml.bak
  
 
 
1
 2
 3
 4
 5
 6
 7
  
vim kibana.yml
 server.port: 5601 
 server.host: 0.0.0.0
 elasticsearch.hosts: [ "http://10.0.11.172:9200/" ,"http://10.0.21.117:9200/" ,"http://10.0.11.208:9200/" ] 
 elasticsearch.username: "elastic" 
 elasticsearch.password: "123" 
 i18n.locale: "zh-CN" 
  
 
启动 Kibana 服务器 
1
 2
 3
  
systemctl  start   kibana.service
 systemctl  enable   kibana.service
 systemctl  status  kibana.service
  
 
访问WEB 
访问http://IP:5601 
部署 Filebeat 
1
 2
 3
 4
  
wget https://pan.cnsre.cn/d/Package/Linux/ELK/filebeat-7.10.1-x86_64.rpm
 rpm -ivh filebeat-7.10.1-x86_64.rpm 
 cd  /etc/filebeat/
cp filebeat.yml filebeat.yml.bak
  
 
修改 Filebeat 配置文件 
修改Filebeat配置文件,把日志推送到Kafka
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
  
#=========================== Filebeat inputs ============================= 
 max_procs :   1                       #限制Filebeat的进程数量,其实就是内核数,避免过多抢占业务资源 
 queue.mem.events :   256              # 存储于内存队列的事件数,排队发送 (默认4096) 
 queue.mem.flush.min_events :   128    # 小于 queue.mem.events ,增加此值可提高吞吐量 (默认值2048) 
 filebeat.inputs :                   # inputs为复数,表名type可以有多个 
  - type :   log                        # 输入类型 
    enable :   true                     # 启用这个type配置 
    paths : 
      - /home/homeconnect/logs/AspectLog/aspect.log    # 监控tomcat  的业务日志 
    json.keys_under_root :   true       #默认Flase,还会将json解析的日志存储至messages字段 
    json.overwrite_keys :   true        #覆盖默认的key,使用自定义json格式的key 
    max_bytes :   20480                 # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部) 
    fields :                          # 额外的字段 
       source :   test-prod-tomcat-aspect-a      # 自定义source字段,用于es建议索引(字段名小写,我记得大写好像不行) 
 
  - type :   log        # 输入类型 
    enable :   true     # 启用这个type配置 
    paths : 
      - /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log.*.log    # 监控tomcat access日志 
    json.keys_under_root :   true     #默认Flase,还会将json解析的日志存储至messages字段 
    json.overwrite_keys :   true      #覆盖默认的key,使用自定义json格式的key 
    max_bytes :   20480               # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部分) 
    fields :                        # 额外的字段 
       source :   test-prod-tomcat-access-a      # 自定义source字段,用于es建议索引 
 
 # 自定义es的索引需要把ilm设置为false 
 setup.ilm.enabled :   false 
 
 #=============================== output =============================== 
 output.kafka :           # 输出到Kafka 
      enabled :   true       # 该output配置是否启用 
      hosts :   [ "10.0.11.172:9092" , "10.0.21.117:9092" , "10.0.11.208:9092" ]   # Kafka节点列表 
      topic :   'logstash-%{[fields.source]}'    # Kafka会创建该topic,然后logstash(可以过滤修改)会传给es作为索引名称 
      partition.hash : 
        reachable_only :   true   # 是否只发往可达分区 
      compression :   gzip        # 压缩 
      max_message_bytes :   1000000    # Event最大字节数。默认1000000。应小于等于Kafka broker message.max.bytes值 
      required_acks :   1    # Kafka ack等级 
      worker :   1    # Kafka output的最大并发数 
      bulk_max_size :   2048      # 单次发往Kafka的最大事件数 
      logging.to_files :   true     # 输出所有日志到file,默认true, 达到日志文件大小限制时,日志文件会自动限制替换 
 
 #=============================== other =============================== 
 close_older :   30m           # 如果文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h 
 force_close_files :   false   # 这个选项关闭一个文件,当文件名称的变化。只在window建议为true 
 # 没有新日志采集后多长时间关闭文件句柄,默认5分钟,设置成1分钟,加快文件句柄关闭 
 close_inactive :   1m 
 # 传输了3h后荏没有传输完成的话就强行关闭文件句柄,这个配置项是解决以上案例问题的key point 
 close_timeout :   3h 
 # 这个配置项也应该配置上,默认值是0表示不清理,不清理的意思是采集过的文件描述在registry文件里永不清理,在运行一段时间后,registry会变大,可能会带来问题 
 clean_inactive :   72h 
 # 设置了clean_inactive后就需要设置ignore_older,且要保证ignore_older < clean_inactive 
 ignore_older :   70h                                                                                          
  
 
启动服务 
1
 2
 3
  
systemctl  start  filebeat.service
 systemctl  enable   filebeat.service
 systemctl  status  filebeat.service
  
 
部署 Logstash 
下载安装 
1
 2
 3
  
wget https://pan.cnsre.cn/d/Package/Linux/ELK/logstash-7.10.1-x86_64.rpm
 rpm -ivh logstash-7.10.1-x86_64.rpm
 mv logstash.yml logstash.yml.bak
  
 
修改 Logstash 配置文件 
修改logstash.yml
1
 2
 3
 4
 5
 6
  
vim logstash.yml 
 http.host :   "0.0.0.0" 
 # 指发送到Elasticsearch的批量请求的大小,值越大,处理则通常更高效,但增加了内存开销 
 pipeline.batch.size :   3000 
 # 指调整Logstash管道的延迟,过了该时间则logstash开始执行过滤器和输出 
 pipeline.batch.delay :   200 
  
 
修改配置文件,从Kafka获取日志
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
  
[ root@elk-kibana conf.d] # cat /etc/logstash/conf.d/get-kafka-logs.conf  
input {                                    # 输入组件 
   kafka {                                   # 从Kafka消费数据 
     bootstrap_servers  = > [ "10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092" ] 
     codec  = > "json"                         # 数据格式 
     #topics => ["3in1-topi"]                   # 使用Kafka传过来的topic 
     topics_pattern  = > "logstash-.*"              # 使用正则匹配topic 
     consumer_threads  = > 3                   # 消费线程数量 
     decorate_events  = > true                 # 可向事件添加Kafka元数据,比如主题、消息大小的选项,这将向Logstash事件中添加一个名为Kafka的字段 
     auto_offset_reset  = > "latest"           # 自动重置偏移量到最新的偏移量 
     #group_id => "logstash-node"            # 消费组ID,多个有相同group_id的Logstash实例为一个消费组 
     #client_id => "logstash1"               # 客户端ID 
     fetch_max_wait_ms  = > "1000"             # 指当没有足够的数据立即满足fetch_min_bytes时,服务器在回答fetch请求之前将阻塞的最长时间 
   } 
 } 
 filter{ 
    # 当非业务字段时,无traceId则移除 
    #if ([message] =~ "traceId=null") {          # 过滤组件,这里只是展示,无实际意义,根据自己的业务需求进行过滤 
    #   drop {} 
    #} 
 mutate { 
     convert  = > [ "Request time" , "float" ] 
     } 
         if  [ ip]  !=  "-"  { 
         geoip { 
                        source  = > "ip" 
                         target  = > "geoip" 
                        # database => "/usr/share/GeoIP/GeoIPCity.dat" 
                         add_field  = > [  "[geoip][coordinates]" , "%{[geoip][longitude]}"  ] 
                         add_field  = > [  "[geoip][coordinates]" , "%{[geoip][latitude]}"   ] 
                 } 
                    mutate { 
                         convert  = > [  "[geoip][coordinates]" , "float" ] 
                 } 
         } 
  } 
 
 
 output {            # 输出组件 
   elasticsearch {    # Logstash输出到ES 
     hosts  = > [ "10.0.11.172:9200" ,"10.0.21.117:9200" ,"10.0.11.208:9200" ] 
     index  = > "logstash-%{[fields][source]}-%{+YYYY-MM-dd}"   # 直接在日志中匹配 
     #index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"  # 以日期建索引 
     user  = > "elastic" 
     password  = > "123" 
   } 
   #stdout { 
   #    codec => rubydebug 
  #} 
 } 
 
 
测试接收日志 
测试是否能接收到数据
1
  
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf
  
 
下边把logstash设置为使用systemd启动 
修改/etc/systemd/system/logstash.service文件
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
  
[ Unit] 
Description = root
 [ Service] 
Type = simple
User = root
Group = root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist. 
# Prefixing the path with '-' makes it try to load, but if the file doesn't 
# exist, it continues onward. 
EnvironmentFile = -/etc/default/logstash
EnvironmentFile = -/etc/sysconfig/logstash
ExecStart = /usr/share/logstash/bin/logstash "--path.settings"  "/etc/logstash