概念和基本架构

Kafka 介绍

Kafka是最初由Linkedin公司开发,是⼀个分布式、分区的、多副本的、多⽣产者、多订阅者,基于zookeeper协调的分布式⽇志系统(也可以当做MQ系统),常⻅可以⽤于web/nginx⽇志、访问⽇志,消息服务等等,Linkedin于2010年贡献给了Apache基⾦会并成为顶级开源项⽬。

主要应⽤场景是:⽇志收集系统和消息系统。

Kafka主要设计⽬标如下:

  • 以时间复杂度为O(1)的⽅式提供消息持久化能⼒,即使对TB级以上数据也能保证常数时间的访问性能。
  • ⾼吞吐率。即使在⾮常廉价的商⽤机器上也能做到单机⽀持每秒100K条消息的传输。
  • ⽀持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。同时⽀持离线数据处理和实时数据处理。
  • ⽀持在线⽔平扩展

img

有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。⼤部分的消息系统选⽤发布-订阅模式。Kafka就是⼀种发布-订阅模式

对于消息中间件,消息分推拉两种模式。Kafka只有消息的拉取,没有推送,可以通过轮询实现消息的推送。

  1. Kafka在⼀个或多个可以跨越多个数据中⼼的服务器上作为集群运⾏。

  2. Kafka集群中按照主题分类管理,⼀个主题可以有多个分区,⼀个分区可以有多个副本分区。

  3. 每个记录由⼀个键,⼀个值和⼀个时间戳组成。

Kafka具有四个核⼼API:

  1. Producer API:允许应⽤程序将记录流发布到⼀个或多个Kafka主题。

  2. Consumer API:允许应⽤程序订阅⼀个或多个主题并处理为其⽣成的记录流。

  3. Streams API:允许应⽤程序充当流处理器,使⽤⼀个或多个主题的输⼊流,并⽣成⼀个或多个输出主题的输出流,从⽽有效地将输⼊流转换为输出流。

  4. Connector API:允许构建和运⾏将Kafka主题连接到现有应⽤程序或数据系统的可重⽤⽣产者或使⽤者。例如,关系数据库的连接器可能会捕获对表的所有更改。

Kafka优势

  1. ⾼吞吐量:单机每秒处理⼏⼗上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能。

  2. ⾼性能:单节点⽀持上千个客户端,并保证零停机和零数据丢失。

  3. 持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防⽌数据丢失。

    1. 零拷⻉

    2. 顺序读,顺序写

    3. 利⽤Linux的⻚缓存

  4. 分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多个,均为分布的。⽆需停机即可扩展机器。多个Producer、Consumer可能是不同的应⽤。

  5. 可靠性 - Kafka是分布式,分区,复制和容错的。

  6. 客户端状态维护:消息被处理的状态是在Consumer端维护,⽽不是由server端维护。当失败时能⾃动平衡。

  7. ⽀持online和offline的场景。

  8. ⽀持多种客户端语⾔。Kafka⽀持Java、.NET、PHP、Python等多种语⾔。

Kafka应⽤场景

⽇志收集:⼀个公司可以⽤Kafka可以收集各种服务的Log,通过Kafka以统⼀接⼝服务的⽅式开放给各种

Consumer;

消息系统:解耦⽣产者和消费者、缓存消息等;

⽤户活动跟踪:Kafka经常被⽤来记录Web⽤户或者App⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后消费者通过订阅这些Topic来做实时的监控分析,亦可保存到数据库;

运营指标:Kafka也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产各种操作的集中反馈,⽐如报警和报告;

流式处理:⽐如Spark Streaming和Storm。

Kafka 单机版安装

需要安装好 JDK8 和 Zookeeper

安装 Kafka

  1. 下载 kafka_2.12-1.0.1.tgz

  2. 解压 kafka

1
tar -zxvf kafka_2.12-1.0.1.tgz -C /opt/servers
  1. 设置环境变量
1
2
3
4
5
vim /etc/profile

# Kafka
export KAFKA_HOME=/opt/servers/kafka_2.12-1.0.1
export PATH=$PATH:$KAFKA_HOME/bin
  1. 生效
1
source /etc/profile

配置 Kafka

1
2
3
cd $KAFKA_HOME/config
ls
vim server.properties
  1. 配置 Kafka 元数据的位置
1
2
- zookeeper.connect=localhost:2181
+ zookeeper.connect=localhost:2181/myKafak
  1. 配置 kafka 持久化消息目录

    创建目录 mkdir -p /var/kafka/kafka-logs

1
2
3
- log.dirs=/tmp/kafka-logs
# dir 不是 dirs
+ log.dir=/var/kafka/kafka-logs
  1. JVM内存配置

如果机器内存较小,需要调整启动配置

vim $KAFKA_HOME/bin/kafka-server-start.sh

1
2
- export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G
+ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

启动 Kafka

1
2
3
4
5
6
7
8
9
# 启动 zookeeper 
zkServer.sh start

cd $KAFKA_HOME/bin

# 前台运行
kafka-server-start.sh ../config/server.properties
# 后台运行
kafka-server-start.sh -daemon ../config/server.properties