业务中需要引入消息队列,团队最终选型定下来是kafka。所以决定本地单机部署kafka用于学习使用。

概述

kafka 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。

基本概念

  • Broker:消息中间件所在的服务器,负责创建topic,记录消息处理的过程,将消息保存在内存中,然后持久化到磁盘(发布过的消息无论消费与否都会持久化到磁盘中)。
  • Topic:每条发布到Kafka集群的消息的类别。(物理上不同的topic的消息分开存储,逻辑上一个topic的消息保存于一个或者多个broker上,用户无需关心数据存储在哪,只需指定消息的topic即可消费或者生产数据)。
  • Partotion:partition是物理上的概念,体现于磁盘,每一个topic包含一个或多个partition。
  • Producer:消息和数据的生产者,负责将消息push到指定的broker的topic中。
  • Consumer:负责从broker读取消息并消费的消费者。
  • Consumer Group:每个consumer属于一个指定的group(每个consumer都可以指定独立的group,不指定则属于默认的group)。
  • Offset:偏移量,用来确定消息是否被消费的标识,内部实现是一个递增的数字。
  • ZooKeeper:zookeeper负责维护整个集群的状态,存储各个节点的信息及状态,实现集群的高可用。

优缺点

优点

  • 基于磁盘的数据存储
  • 高伸缩性
  • 高性能
  • 分布式

缺点

  • 运维难度大
  • 对zookeeper强依赖
  • 多副本模式对带宽要求高

单机部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
version: '3.5'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
volumes:
- /var/run/docker.sock:/var/run/docker.sock
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.215.21 ## 广播主机名称,一般用IP指定
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_LOG_RETENTION_HOURS: 120
KAFKA_MESSAGE_MAX_BYTES: 10000000
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
KAFKA_NUM_PARTITIONS: 3
KAFKA_DELETE_RETENTION_MS: 1000
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.215.21:9092
KAFKA_BROKER_ID: 1
kafka-manager:
image: sheepkiller/kafka-manager
container_name: kafka-manager
environment:
ZK_HOSTS: 192.168.215.21 ## 修改:宿主机IP
ports:
- "9009:9000"

参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 广播主机名称,一般用IP指定
KAFKA_ADVERTISED_HOST_NAME:
# Zookeeper连接地址,格式:zoo1:port1,zoo2:port2:/path
KAFKA_ZOOKEEPER_CONNECT:
# Kafka启动所使用的的协议及端口
KAFKA_LISTENERS:
# Kafka广播地址及端口,告诉客户端,使用什么地址和端口能连接到Kafka,不指定,宿主机以外的客户端将无法连接到Kafka
KAFKA_ADVERTISED_LISTENERS:
# 指定BrokerId,如果不指定,将会自己生成
KAFKA_BROKER_ID:
# topic的分区数
KAFKA_NUM_PARTITIONS: 3
# broker端的leader分区在想其他follower分区复制消息时候 ,允许的单条消息的最大值
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
# broker的topic分区leader接受数据的时候,允许的单条消息的最大值,默认为1M
KAFKA_MESSAGE_MAX_BYTES: 10000000
# 日志文件保存120个小时
KAFKA_LOG_RETENTION_HOURS: 120

部署命令

1
2
3
4
5
6
# 创建并启动容器,可以使用 -f 参数指定docker-compose.yml文件
sudo docker-compose up -d
# 查看状态
sudo docker-compose ps
# 停止并删除容器,可以使用 -f 参数指定docker-compose.yml文件
sudo docker-compose down