Kafka 集群安装与使用

概念和基本架构

Kafka 介绍

Kafka 是最初由 Linkedin 公司开发,是⼀个分布式、分区的、多副本的、多⽣产者、多订阅者,基于 zookeeper 协调的分布式⽇志系统(也可以当做 MQ 系统),常⻅可以⽤于 web/nginx ⽇志、访问⽇志,消息服务等等,Linkedin 于 2010 年贡献给了 Apache 基⾦会并成为顶级开源项⽬。

主要应⽤场景是:⽇志收集系统和消息系统。

Kafka 主要设计⽬标如下:

  • 以时间复杂度为 O (1) 的⽅式提供消息持久化能⼒,即使对 TB 级以上数据也能保证常数时间的访问性能。
  • ⾼吞吐率。即使在⾮常廉价的商⽤机器上也能做到单机⽀持每秒 100K 条消息的传输。
  • ⽀持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。同时⽀持离线数据处理和实时数据处理。
  • ⽀持在线⽔平扩展

img

有两种主要的消息传递模式:点对点传递模式、发布 - 订阅模式。⼤部分的消息系统选⽤发布 - 订阅模式。Kafka 就是⼀种发布 - 订阅模式

对于消息中间件,消息分推拉两种模式。Kafka 只有消息的拉取,没有推送,可以通过轮询实现消息的推送。

  1. Kafka 在⼀个或多个可以跨越多个数据中⼼的服务器上作为集群运⾏。

  2. Kafka 集群中按照主题分类管理,⼀个主题可以有多个分区,⼀个分区可以有多个副本分区。

  3. 每个记录由⼀个键,⼀个值和⼀个时间戳组成。

Kafka 具有四个核⼼ API:

  1. Producer API:允许应⽤程序将记录流发布到⼀个或多个 Kafka 主题。

  2. Consumer API:允许应⽤程序订阅⼀个或多个主题并处理为其⽣成的记录流。

  3. Streams API:允许应⽤程序充当流处理器,使⽤⼀个或多个主题的输⼊流,并⽣成⼀个或多个输出主题的输出流,从⽽有效地将输⼊流转换为输出流。

  4. Connector API:允许构建和运⾏将 Kafka 主题连接到现有应⽤程序或数据系统的可重⽤⽣产者或使⽤者。例如,关系数据库的连接器可能会捕获对表的所有更改。

Kafka 优势

  1. ⾼吞吐量:单机每秒处理⼏⼗上百万的消息量。即使存储了许多 TB 的消息,它也保持稳定的性能。

  2. ⾼性能:单节点⽀持上千个客户端,并保证零停机和零数据丢失。

  3. 持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及 replication 防⽌数据丢失。

    1. 零拷⻉

    2. 顺序读,顺序写

    3. 利⽤ Linux 的⻚缓存

  4. 分布式系统,易于向外扩展。所有的 Producer、Broker 和 Consumer 都会有多个,均为分布的。⽆需停机即可扩展机器。多个 Producer、Consumer 可能是不同的应⽤。

  5. 可靠性 - Kafka 是分布式,分区,复制和容错的。

  6. 客户端状态维护:消息被处理的状态是在 Consumer 端维护,⽽不是由 server 端维护。当失败时能⾃动平衡。

  7. ⽀持 online 和 offline 的场景。

  8. ⽀持多种客户端语⾔。Kafka ⽀持 Java、.NET、PHP、Python 等多种语⾔。

Kafka 应⽤场景

⽇志收集:⼀个公司可以⽤ Kafka 可以收集各种服务的 Log,通过 Kafka 以统⼀接⼝服务的⽅式开放给各种

Consumer;

消息系统:解耦⽣产者和消费者、缓存消息等;

⽤户活动跟踪:Kafka 经常被⽤来记录 Web ⽤户或者 App ⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到 Kafka 的 Topic 中,然后消费者通过订阅这些 Topic 来做实时的监控分析,亦可保存到数据库;

运营指标:Kafka 也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产各种操作的集中反馈,⽐如报警和报告;

流式处理:⽐如 Spark Streaming 和 Storm。

Kafka 单机版安装

需要安装好 JDK8 和 Zookeeper

安装 Kafka

  1. 下载 kafka_2.12-1.0.1.tgz

  2. 解压 kafka

1
tar -zxvf kafka_2.12-1.0.1.tgz -C /opt/servers
  1. 设置环境变量
1
2
3
4
5
vim /etc/profile

# Kafka
export KAFKA_HOME=/opt/servers/kafka_2.12-1.0.1
export PATH=$PATH:$KAFKA_HOME/bin
  1. 生效
1
source /etc/profile

配置 Kafka

1
2
3
cd $KAFKA_HOME/config
ls
vim server.properties
  1. 配置 Kafka 元数据的位置
1
2
- zookeeper.connect=localhost:2181
+ zookeeper.connect=localhost:2181/myKafak
  1. 配置 kafka 持久化消息目录

    创建目录 mkdir -p /var/kafka/kafka-logs

1
2
3
- log.dirs=/tmp/kafka-logs
# dir 不是 dirs
+ log.dir=/var/kafka/kafka-logs
  1. JVM 内存配置

如果机器内存较小,需要调整启动配置

vim $KAFKA_HOME/bin/kafka-server-start.sh

1
2
- export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G
+ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

启动 Kafka

1
2
3
4
5
6
7
8
9
# 启动 zookeeper 
zkServer.sh start

cd $KAFKA_HOME/bin

# 前台运行
kafka-server-start.sh ../config/server.properties
# 后台运行
kafka-server-start.sh -daemon ../config/server.properties