Kafka 集群安装与使用
概念和基本架构
Kafka 介绍
Kafka 是最初由 Linkedin 公司开发,是⼀个分布式、分区的、多副本的、多⽣产者、多订阅者,基于 zookeeper 协调的分布式⽇志系统(也可以当做 MQ 系统),常⻅可以⽤于 web/nginx ⽇志、访问⽇志,消息服务等等,Linkedin 于 2010 年贡献给了 Apache 基⾦会并成为顶级开源项⽬。
主要应⽤场景是:⽇志收集系统和消息系统。
Kafka 主要设计⽬标如下:
- 以时间复杂度为 O (1) 的⽅式提供消息持久化能⼒,即使对 TB 级以上数据也能保证常数时间的访问性能。
- ⾼吞吐率。即使在⾮常廉价的商⽤机器上也能做到单机⽀持每秒 100K 条消息的传输。
- ⽀持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。同时⽀持离线数据处理和实时数据处理。
- ⽀持在线⽔平扩展
有两种主要的消息传递模式:点对点传递模式、发布 - 订阅模式。⼤部分的消息系统选⽤发布 - 订阅模式。Kafka 就是⼀种发布 - 订阅模式。
对于消息中间件,消息分推拉两种模式。Kafka 只有消息的拉取,没有推送,可以通过轮询实现消息的推送。
Kafka 在⼀个或多个可以跨越多个数据中⼼的服务器上作为集群运⾏。
Kafka 集群中按照主题分类管理,⼀个主题可以有多个分区,⼀个分区可以有多个副本分区。
每个记录由⼀个键,⼀个值和⼀个时间戳组成。
Kafka 具有四个核⼼ API:
Producer API:允许应⽤程序将记录流发布到⼀个或多个 Kafka 主题。
Consumer API:允许应⽤程序订阅⼀个或多个主题并处理为其⽣成的记录流。
Streams API:允许应⽤程序充当流处理器,使⽤⼀个或多个主题的输⼊流,并⽣成⼀个或多个输出主题的输出流,从⽽有效地将输⼊流转换为输出流。
Connector API:允许构建和运⾏将 Kafka 主题连接到现有应⽤程序或数据系统的可重⽤⽣产者或使⽤者。例如,关系数据库的连接器可能会捕获对表的所有更改。
Kafka 优势
⾼吞吐量:单机每秒处理⼏⼗上百万的消息量。即使存储了许多 TB 的消息,它也保持稳定的性能。
⾼性能:单节点⽀持上千个客户端,并保证零停机和零数据丢失。
持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及 replication 防⽌数据丢失。
零拷⻉
顺序读,顺序写
利⽤ Linux 的⻚缓存
分布式系统,易于向外扩展。所有的 Producer、Broker 和 Consumer 都会有多个,均为分布的。⽆需停机即可扩展机器。多个 Producer、Consumer 可能是不同的应⽤。
可靠性 - Kafka 是分布式,分区,复制和容错的。
客户端状态维护:消息被处理的状态是在 Consumer 端维护,⽽不是由 server 端维护。当失败时能⾃动平衡。
⽀持 online 和 offline 的场景。
⽀持多种客户端语⾔。Kafka ⽀持 Java、.NET、PHP、Python 等多种语⾔。
Kafka 应⽤场景
⽇志收集:⼀个公司可以⽤ Kafka 可以收集各种服务的 Log,通过 Kafka 以统⼀接⼝服务的⽅式开放给各种
Consumer;
消息系统:解耦⽣产者和消费者、缓存消息等;
⽤户活动跟踪:Kafka 经常被⽤来记录 Web ⽤户或者 App ⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到 Kafka 的 Topic 中,然后消费者通过订阅这些 Topic 来做实时的监控分析,亦可保存到数据库;
运营指标:Kafka 也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产各种操作的集中反馈,⽐如报警和报告;
流式处理:⽐如 Spark Streaming 和 Storm。
Kafka 单机版安装
需要安装好 JDK8 和 Zookeeper
安装 Kafka
解压 kafka
1 | tar -zxvf kafka_2.12-1.0.1.tgz -C /opt/servers |
- 设置环境变量
1 | vim /etc/profile |
- 生效
1 | source /etc/profile |
配置 Kafka
1 | cd $KAFKA_HOME/config |
- 配置 Kafka 元数据的位置
1 | - zookeeper.connect=localhost:2181 |
配置 kafka 持久化消息目录
创建目录
mkdir -p /var/kafka/kafka-logs
1 | - log.dirs=/tmp/kafka-logs |
- JVM 内存配置
如果机器内存较小,需要调整启动配置
vim $KAFKA_HOME/bin/kafka-server-start.sh
1
2- export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G
+ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
启动 Kafka
1 | 启动 zookeeper |