Kafka Cluster Setup + ELK Cluster Integration: Experiment Notes
Background: an ELK cluster was already deployed earlier (see the ELK cluster deployment notes). This experiment adds Kafka and shows how the pieces fit together to collect, process, and analyze log data. The Kafka nodes are:
- kafka1:10.170.0.8
- kafka2:10.170.0.9
- kafka3:10.170.0.10
Goal: build a simple ELK + Kafka pipeline that collects, processes, analyzes, and visualizes log data in real time.
Architecture components
- Kafka: intermediate storage for log messages, providing a high-throughput message queue.
- Logstash: the log-processing pipeline; consumes logs from Kafka, parses and processes them, and outputs the results to Elasticsearch.
- Elasticsearch: stores, searches, and analyzes the log data.
- Kibana: visualization and interactive search/analysis of the log data.
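For reference, the end-to-end data flow built in this experiment is:
nginx access/error logs -> Filebeat (nginx module) -> Kafka topic "nginx" -> Logstash (kafka input + grok filter) -> Elasticsearch index nginx-YYYY.MM.dd -> Kibana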
1. Install and configure the Kafka cluster
1.1 Install JDK 8
dnf install -y java-1.8.0-openjdk
1.2 Download Kafka
# Use the pre-built binary release; the *-src.tgz package is source code and would have to be built first
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -zxvf kafka_2.13-3.6.0.tgz -C /usr/local/
mv /usr/local/kafka_2.13-3.6.0/ /usr/local/kafka/ && cd /usr/local/kafka/
1.3 Configure ZooKeeper
# Config file: /usr/local/kafka/config/zookeeper.properties
[root@kafka1 kafka]# grep -Ev "^$|[#;]" /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
maxClientCnxns=0
admin.enableServer=false
server.1=10.170.0.8:2888:3888
server.2=10.170.0.9:2888:3888
server.3=10.170.0.10:2888:3888
# Create the data and log directories
mkdir -p /opt/data/zookeeper/{data,logs}
# Create the myid file on each node; the number must match that node's server.N entry
[root@kafka1 kafka]# echo 1 > /opt/data/zookeeper/data/myid
[root@kafka2 kafka]# echo 2 > /opt/data/zookeeper/data/myid
[root@kafka3 kafka]# echo 3 > /opt/data/zookeeper/data/myid
1.4 Configure Kafka
# Config file: /usr/local/kafka/config/server.properties
[root@kafka1 kafka]# grep -Ev "^$|[#;]" /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.170.0.8:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=10.170.0.8:2181,10.170.0.9:2181,10.170.0.10:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
To bring up the other two nodes, simply copy the configured installation directory to them and change broker.id and listeners, as sketched below.
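For example, with the IP plan above, the only lines that differ per node would be:
# kafka2 (10.170.0.9)
broker.id=2
listeners=PLAINTEXT://10.170.0.9:9092
# kafka3 (10.170.0.10)
broker.id=3
listeners=PLAINTEXT://10.170.0.10:9092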
1.5 Start ZooKeeper and Kafka
# Run the following on each of the three nodes in turn
cd /usr/local/kafka
# Start ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# Start Kafka
nohup bin/kafka-server-start.sh config/server.properties &
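As an alternative to nohup, both start scripts also accept a -daemon flag that backgrounds the process and writes its console output under the installation's logs/ directory:
# Optional: start in daemon mode instead of nohup
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties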
# Check the listening ports
[root@kafka1 kafka]# ss -nltp
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:(("sshd",pid=11168,fd=3))
LISTEN 0 50 *:39197 *:* users:(("java",pid=84740,fd=332))
LISTEN 0 50 *:2181 *:* users:(("java",pid=84740,fd=342))
LISTEN 0 50 *:36827 *:* users:(("java",pid=84867,fd=332))
LISTEN 0 50 [::ffff:10.170.0.8]:9092 *:* users:(("java",pid=84867,fd=368))
LISTEN 0 50 [::ffff:10.170.0.8]:3888 *:* users:(("java",pid=84740,fd=346))
LISTEN 0 128 [::]:22 [::]:* users:(("sshd",pid=11168,fd=4))
1.6 Verify the Kafka cluster
Create a topic on one node and confirm it is visible from another.
# Create a topic on kafka1
[root@kafka1 kafka]# bin/kafka-topics.sh --create --bootstrap-server 10.170.0.8:9092 --replication-factor 1 --partitions 1 --topic testtopic
Created topic testtopic.
# List topics from kafka2 to confirm the newly created topic is visible
[root@kafka2 kafka]# bin/kafka-topics.sh --bootstrap-server 10.170.0.9:9092 --list
testtopic
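Optionally, a quick end-to-end message test with the stock console tools (produce on kafka1, consume on kafka2):
# Producer on kafka1: type a few test lines, then Ctrl+C
bin/kafka-console-producer.sh --bootstrap-server 10.170.0.8:9092 --topic testtopic
# Consumer on kafka2: the lines typed above should come back
bin/kafka-console-consumer.sh --bootstrap-server 10.170.0.9:9092 --topic testtopic --from-beginning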
2. Kafka + ELK
With no more machines available, each host has to serve several roles. The local DNS (/etc/hosts) mapping is as follows 😭😭😭:
10.170.0.2 es1
10.170.0.8 kafka-es2 kafka1
10.170.0.9 kafka-es3 kafka2
10.170.0.10 kafka-logstash kafka3 filebeat kibana
2.1 Use Logstash to consume data from Kafka and output it to the ES cluster
Install nginx and Logstash on kafka3 (installation steps omitted); the Logstash config file is as follows:
[root@kafka3 logstash]# cat conf.d/test01.conf
input {
  kafka {
    type => "nginx_log"
    codec => "json"
    topics => ["nginx"]
    decorate_events => true
    bootstrap_servers => "10.170.0.8:9092, 10.170.0.9:9092, 10.170.0.10:9092"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout {}
  if [type] == "nginx_log" {    # must match the type set in the kafka input above
    elasticsearch {
      index => "nginx-%{+YYYY.MM.dd}"
      codec => "json"
      hosts => ["es1:9200","kafka-es2:9200","kafka-es3:9200"]
    }
  }
}
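Before relying on the pipeline, the config can be syntax-checked and Logstash started as a service; a sketch assuming the standard RPM layout (binary under /usr/share/logstash, configs under /etc/logstash/conf.d):
# Syntax-check the pipeline definition
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test01.conf --config.test_and_exit
# Run as a service once the check passes
systemctl enable --now logstash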
2.2 Use Filebeat to ship logs to Kafka
Install Filebeat on kafka3; since the Elasticsearch package repository was already added earlier, dnf can install it directly.
dnf install filebeat -y
# Enable the nginx module
filebeat modules enable nginx
# Enable the access and error logs (change false to true)
[root@kafka3 logstash]# sed '/^[[:blank:]]*#/d;/^$/d' /etc/filebeat/modules.d/nginx.yml
- module: nginx
  access:
    enabled: true
  error:
    enabled: true
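By default the module reads /var/log/nginx/access.log* and /var/log/nginx/error.log*; if nginx logs somewhere else, the paths can be overridden with var.paths (a sketch only, not needed for a default nginx install):
- module: nginx
  access:
    enabled: true
    var.paths: ["/var/log/nginx/access.log*"]
  error:
    enabled: true
    var.paths: ["/var/log/nginx/error.log*"]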
# Edit the Filebeat config file: comment out the default output.elasticsearch section (only one output may be enabled) and add the Kafka output
vim /etc/filebeat/filebeat.yml
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka-es2:9092", "kafka-es3:9092", "kafka-logstash:9092"]
  topic: 'nginx'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
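Filebeat still has to be started; the config and the Kafka output connectivity can also be tested first:
# Check the config and the Kafka output
filebeat test config
filebeat test output
# Start the service
systemctl enable --now filebeat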
# On the Kafka cluster, confirm that the nginx topic has been created
[root@kafka3 logstash]# sh /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 10.170.0.10:9092 --list
__consumer_offsets
nginx
testtopic
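To see what Filebeat is actually publishing, one JSON event can be pulled from the topic with the console consumer:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.170.0.10:9092 --topic nginx --from-beginning --max-messages 1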
# Check the index on the ES cluster
[root@kafka2 ~]# curl -X GET "es1:9200/_cat/indices"
green open nginx-2024.04.28 M1YQrcaZRO2cZX0J6ncbBA 1 1 59 0 393kb 236.2kb 236.2kb
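A single document can also be pulled straight from Elasticsearch to confirm the grok-parsed nginx fields are present:
curl -s "es1:9200/nginx-*/_search?size=1&pretty"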
2.3 Install Kibana and view the data
Kibana is likewise installed on kafka3:
dnf install -y kibana
# Config file as follows
[root@kafka3 ~]# grep -Ev "^$|[#;]" /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
server.name: "kibana"
elasticsearch.hosts: ["http://es1:9200","http://kafka-es2:9200","http://kafka-es3:9200"]
logging:
  appenders:
    file:
      type: file
      fileName: /var/log/kibana/kibana.log
      layout:
        type: json
  root:
    appenders:
      - default
      - file
pid.file: /run/kibana/kibana.pid
i18n.locale: "zh-CN"
# Enable and start the service
systemctl enable kibana
systemctl restart kibana
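Before opening the browser, Kibana's status endpoint gives a quick health check:
# Confirm Kibana is listening and reports its status
ss -nltp | grep 5601
curl -s http://localhost:5601/api/status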
Log in to the Kibana web UI (port 5601), create a data view matching nginx-*, and the nginx logs can then be browsed and visualized in Discover.