標(biāo)題就是Kafka的定義,它用夠替代傳統(tǒng)的消息隊(duì)列用于解耦合數(shù)據(jù)處理,緩存未處理消息等,同時(shí)具有更高的吞吐率,支持分區(qū)、多副本、冗余,因此被廣泛用于大規(guī)模消息數(shù)據(jù)處理應(yīng)用。

我主要使用它來作數(shù)據(jù)實(shí)時(shí)計(jì)算,統(tǒng)計(jì)各種報(bào)表,如:小時(shí)報(bào)表、周報(bào)表、月報(bào)表、年報(bào)表等,以及其它報(bào)表,如:復(fù)購率統(tǒng)計(jì),當(dāng)然還有其它用途,這里只是拋磚引玉。
Kafka使用場(chǎng)景
日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
運(yùn)營指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
流式處理:比如spark streaming和storm

Kafka特性
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作。
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/p>
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
基本概念介紹
Broker:Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這些服務(wù)器就是Broker
Topic:每條發(fā)布到Kafka集群的消息都必須有一個(gè)Topic
Partition:是物理概念上的分區(qū),為了提供系統(tǒng)吞吐率,在物理上每個(gè)Topic會(huì)分成一個(gè)或多個(gè)Partition,每個(gè)Partition對(duì)應(yīng)一個(gè)文件夾
Producer:消息產(chǎn)生者,負(fù)責(zé)生產(chǎn)消息并發(fā)送到Kafka Broker
Consumer:消息消費(fèi)者,向kafka broker讀取消息并處理的客戶端。
Consumer Group:每個(gè)Consumer屬于一個(gè)特定的組,組可以用來實(shí)現(xiàn)一條消息被組內(nèi)多個(gè)成員消費(fèi)等功能。
下面介紹Kafka 在Centos 7上的安裝和使用,包括功能驗(yàn)證和集群的簡(jiǎn)單配置。
1.安裝JDK
若已安裝jdk環(huán)境,跳過此步驟。Kafka 使用Zookeeper 來保存相關(guān)配置信息,Kafka及Zookeeper 依賴Java 運(yùn)行環(huán)境,從oracle網(wǎng)站下載JDK 安裝包,解壓安裝:
$ tar zxvf jdk-8u65-linux-x64.tar.gz
$ mv jdk1.8.0_65 java
設(shè)置Java 環(huán)境變量:
JAVA_HOME=/opt/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH
也可以選擇yum install安裝,相應(yīng)設(shè)置環(huán)境變量。
2. 安裝Kafka
從官網(wǎng)(
http://kafka.apache.org/downloads.html)下載Kafka 安裝包,解壓安裝:
tar zxvf kafka_2.11-0.8.2.2.tgz
mv kafka_2.11-0.8.2.2 /opt/zookeeper/kafka
cd /opt/zookeeper/kafka
3.相關(guān)配置
kafak的配置文件在/opt/kafka_2.11/config下叫server.propertie,釋義如下:
broker.id=0 #當(dāng)前機(jī)器在集群中的唯一標(biāo)識(shí),和zookeeper的myid性質(zhì)一樣,但是不管你怎么配,別配0就是,不然創(chuàng)建Topic的時(shí)候回報(bào)錯(cuò)。
port=19092 #當(dāng)前kafka對(duì)外提供服務(wù)的端口默認(rèn)是9092
host.name=192.168.7.100 #這個(gè)參數(shù)默認(rèn)是關(guān)閉的,在0.8.1有個(gè)bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù)
num.io.threads=8 #這個(gè)是borker進(jìn)行I/O處理的線程數(shù)
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個(gè)目錄可以配置為“,”逗號(hào)分割的表達(dá)式,上面的num.io.threads要大于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他把消息持久化的地方是,當(dāng)前以逗號(hào)分割的目錄中,那個(gè)分區(qū)數(shù)最少就放那一個(gè)
socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲(chǔ)到緩沖區(qū)了到達(dá)一定的大小后在發(fā)送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個(gè)參數(shù)是向kafka請(qǐng)求消息或者向kafka發(fā)送消息的請(qǐng)請(qǐng)求的最大數(shù),這個(gè)值不能超過java的堆棧大小
num.partitions=1 #默認(rèn)的分區(qū)數(shù),一個(gè)topic默認(rèn)1個(gè)分區(qū)數(shù)
log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)
replica.fetch.max.bytes=5242880 #取消息的最大直接數(shù)
log.segment.bytes=1073741824 #這個(gè)參數(shù)是:因?yàn)閗afka的消息是以追加的形式落地到文件,當(dāng)超過這個(gè)值的時(shí)候,kafka會(huì)新起一個(gè)文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設(shè)置zookeeper的連接端口
實(shí)際上需要修改的就幾個(gè):
broker.id=133 #每臺(tái)服務(wù)器的broker.id都不能相同
host.name=192-168-253-133 #主機(jī)名
listeners=
PLAINTEXT://PLAINTEXT://192.168.1.220:9092 #監(jiān)聽地址advertised.listeners=PLAINTEXT://192.168.1.220:9092
并開啟端口
#在log.retention.hours=168 下追加
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#設(shè)置zookeeper的連接端口
zookeeper.connect=192.168.1.220:2181
4.啟動(dòng)Zookeeper
使用安裝包中的腳本啟動(dòng)單節(jié)點(diǎn)Zookeeper 實(shí)例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2015-10-26 04:26:59,585] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)….
當(dāng)kafka啟動(dòng)成功后,在終端上輸入“jps”,會(huì)得到相應(yīng)如下:
7253 Jps
5850 ZooKeeperMain
6076 QuorumPeerMain
6093 Kafka
QuorumPeerMain是zookeeper的守護(hù)進(jìn)程,kafka是kafka的守護(hù)進(jìn)程。
5.啟動(dòng)Kafka 服務(wù)
使用kafka-server-start.sh 啟動(dòng)kafka 服務(wù):
bin/kafka-server-start.sh config/server.properties
[2015-10-26 04:28:56,115] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-10-26 04:28:56,141] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
6.創(chuàng)建topic
使用kafka-topics.sh 創(chuàng)建單分區(qū)單副本的topic test:
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
7.查看topic
bin/kafka-topics.sh –list –zookeeper localhost:2181
test
8.產(chǎn)生消息
使用kafka-console-producer.sh 發(fā)送消息:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
Hello world!
Hello Kafka!
9.消費(fèi)消息
使用kafka-console-consumer.sh 接收消息并在終端打印:
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 kafka:2181 –topic test –from-beginning
Hello world!
Hello Kafka!
10.刪除Topic
bin/kafka-topics.sh –delete –zookeeper localhost:2181 –topic test
11.查看描述 Topic 信息
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
12.關(guān)閉
bin/kafka-server-stop.sh
13.關(guān)不掉可以強(qiáng)制kill
kill -9 進(jìn)程id
14.查看消費(fèi)分組
bin/kafka-consumer-groups.sh –bootstrap-server 127.0.0.1:9092 –list
test2
test
console-consumer-95156
test1
15.查看特定consumer group 詳情,使用–group與–describe參數(shù)
bin/kafka-consumer-groups.sh –bootstrap-server 127.0.0.1:9092 –group test –describe
集群配置

1.單機(jī)多broker 集群配置
利用單節(jié)點(diǎn)部署多個(gè)broker。 不同的broker 設(shè)置不同的 id,監(jiān)聽端口及日志目錄。 例如:
cp config/server.properties config/server-1.properties
編輯配置:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
啟動(dòng)Kafka服務(wù):
bin/kafka-server-start.sh config/server-1.properties &
啟動(dòng)多個(gè)服務(wù),按上文類似方式產(chǎn)生和消費(fèi)消息。
2.多機(jī)多broker 集群配置
分別在多個(gè)節(jié)點(diǎn)按上述方式安裝Kafka,配置啟動(dòng)多個(gè)Zookeeper 實(shí)例。 例如: 在192.168.1.221,192.168.1.222,192.168.1.223 三臺(tái)機(jī)器部署,Zookeeper配置如下:
initLimit=5
syncLimit=2
server.1=192.168.1.221:2888:3888
server.2=192.168.1.222:2888:3888
server.3=192.168.1.223:2888:3888
分別配置多個(gè)機(jī)器上的Kafka服務(wù) 設(shè)置不同的broke id,zookeeper.connect設(shè)置如下:
zookeeper.connect=192.168.1.221:2181,192.168.1.222:2181,192.168.1.223:2181
啟動(dòng)Zookeeper與Kafka服務(wù),按上文方式產(chǎn)生和消費(fèi)消息,驗(yàn)證集群功能。
好了,Kafka已經(jīng)安裝完畢。它支持Java 及多種其它語言客戶端,可與Hadoop、Storm、Spark等其它大數(shù)據(jù)工具結(jié)合使用。
趕快試試吧!
版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻(xiàn),該文觀點(diǎn)僅代表作者本人。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請(qǐng)發(fā)送郵件至2705686032@qq.com 舉報(bào),一經(jīng)查實(shí),本站將立刻刪除。文章鏈接:http://m.z1146.cn/zx/yunwei/4299.html