觀熱點(diǎn):大數(shù)據(jù)NiFi(二十一):監(jiān)控日志文件生產(chǎn)到Kafka
?監(jiān)控日志文件生產(chǎn)到Kafka
案例:監(jiān)控某個(gè)目錄下的文件內(nèi)容,將消息生產(chǎn)到Kafka中。
此案例使用到“TailFile”和“PublishKafka_1_0”處理器。
(資料圖片)
一、???????配置“TailFile”處理器
創(chuàng)建“TailFile”處理器并配置:
注意:以上需要在NiFi集群中的每個(gè)節(jié)點(diǎn)上創(chuàng)建“/root/test/logdata”文件,“l(fā)ogdata”是文件,而非目錄。
二、配置“PublishKafka_1_0”處理器
“PublishKafka_1_0”處理器作用是使用Kafka 1.0生產(chǎn)者API將FlowFile的內(nèi)容作為消息發(fā)送給Apache Kafka。發(fā)送的內(nèi)容可以是單獨(dú)的FlowFile,也可以通過用戶指定分隔符分割的FlowFile內(nèi)容。
關(guān)于“PublishKafka_1_0”處理器的“Properties”主要配置的說明如下:
配置項(xiàng) | 默認(rèn)值 | 允許值 | 描述 |
---|---|---|---|
Kafka Brokers(Kafka節(jié)點(diǎn)) | localhost:9092 | 逗號(hào)分割的Kafka集群Broker列表。格式:host:port | |
Topic Name(topic 名稱) | 將消息生產(chǎn)到的Topic 名稱。 | ||
Delivery Guarantee(數(shù)據(jù)傳遞保證) | 0 | 指定保證消息被發(fā)送到Kafka的要求。對(duì)應(yīng)Kafka的"acks"屬性??梢耘渲玫捻?xiàng)如下:Best Effort (盡力交付,相當(dāng)于ack=0):在向Kafka節(jié)點(diǎn)寫出消息后,F(xiàn)lowFile將被路由到成功,而不需要等待響應(yīng)。這提供了最好的性能,但可能會(huì)導(dǎo)致數(shù)據(jù)丟失。例如:消息寫出到Kafka節(jié)點(diǎn),但是對(duì)應(yīng)節(jié)點(diǎn)掛掉,這時(shí)將消息路由到成功。Guarantee Single Node Delivery(保證單節(jié)點(diǎn)交付,相當(dāng)于ack=1,Kafka中的默認(rèn)配置):KafkaProducer把消息發(fā)送出去,至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是并沒有等待所有follower是否成功寫入。該情況下,如果follower沒有成功備份數(shù)據(jù),而此時(shí)leader剛好又掛掉了,就會(huì)導(dǎo)致消息丟失。該選項(xiàng)就是如果消息被單個(gè)Kafka節(jié)點(diǎn)接收到,F(xiàn)lowFile將被路由到成功,無論它是否被復(fù)制,但如果Kafka節(jié)點(diǎn)崩潰,可能會(huì)導(dǎo)致數(shù)據(jù)丟失。Guarantee Replicated Delivery(保證復(fù)制交付,相當(dāng)于ack=-1):FlowFile數(shù)據(jù)寫出后,Kafka topic ISR列表離跟leader保持同步的那些follower都要把消息同步過去,該消息才會(huì)被認(rèn)為成功,否則路由到失敗。 | |
Use Transactions(使用事務(wù)) | true | ?true?false | 指定NiFi是否應(yīng)該在與Kafka通信時(shí)提供事務(wù)性保證。如果發(fā)送數(shù)據(jù)到Kafka有問題,并且這個(gè)屬性設(shè)置為false,那么已經(jīng)發(fā)送到Kafka的消息將繼續(xù)發(fā)送,并被傳遞給消費(fèi)者。如果這個(gè)設(shè)置為true,那么Kafka事務(wù)將被回滾,這樣這些消息對(duì)消費(fèi)者是不可用的。將此設(shè)置為true需要將 |
在向Kafka節(jié)點(diǎn)寫出消息后,F(xiàn)lowFile將被路由到成功,而不需要等待響應(yīng)。這提供了最好的性能,但可能會(huì)導(dǎo)致數(shù)據(jù)丟失。例如:消息寫出到Kafka節(jié)點(diǎn),但是對(duì)應(yīng)節(jié)點(diǎn)掛掉,這時(shí)將消息路由到成功。
Guarantee Single Node Delivery(保證單節(jié)點(diǎn)交付,相當(dāng)于ack=1,Kafka中的默認(rèn)配置):KafkaProducer把消息發(fā)送出去,至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是并沒有等待所有follower是否成功寫入。該情況下,如果follower沒有成功備份數(shù)據(jù),而此時(shí)leader剛好又掛掉了,就會(huì)導(dǎo)致消息丟失。該選項(xiàng)就是如果消息被單個(gè)Kafka節(jié)點(diǎn)接收到,F(xiàn)lowFile將被路由到成功,無論它是否被復(fù)制,但如果Kafka節(jié)點(diǎn)崩潰,可能會(huì)導(dǎo)致數(shù)據(jù)丟失。 Guarantee Replicated Delivery(保證復(fù)制交付,相當(dāng)于ack=-1): FlowFile數(shù)據(jù)寫出后,Kafka topic ISR列表離跟leader保持同步的那些follower都要把消息同步過去,該消息才會(huì)被認(rèn)為成功,否則路由到失敗。 Use Transactions(使用事務(wù))true true false 指定NiFi是否應(yīng)該在與Kafka通信時(shí)提供事務(wù)性保證。如果發(fā)送數(shù)據(jù)到Kafka有問題,并且這個(gè)屬性設(shè)置為false,那么已經(jīng)發(fā)送到Kafka的消息將繼續(xù)發(fā)送,并被傳遞給消費(fèi)者。如果這個(gè)設(shè)置為true,那么Kafka事務(wù)將被回滾,這樣這些消息對(duì)消費(fèi)者是不可用的。將此設(shè)置為true需要將
“PublishKafka_1_0”處理器配置如下:
1、創(chuàng)建“PublishKafka_1_0”處理器
2、配置“PROPERTIES”
注意:以上topic 可以在Kafka中創(chuàng)建好,也可以執(zhí)行時(shí)自動(dòng)創(chuàng)建。
3、連接“TailFile”處理器和“PublishKafka_1_0”處理器
連接“TailFile”處理器和“PublishKafka_1_0”處理器,并設(shè)置“PublishKafka_1_0”處理器“failure”和“success”路由關(guān)系為自動(dòng)終止。
三、運(yùn)行測(cè)試
1、啟動(dòng)Kafka集群,啟動(dòng)NiFi處理流程
2、向/root/test/logdata文件中寫入數(shù)據(jù)并保存
向NiFi集群中的其中一臺(tái)節(jié)點(diǎn)的“l(fā)ogdata”中寫入以下數(shù)據(jù)即可
[root@node1 test]# echo "hello world1" > /root/test/logdata[root@node1 test]# echo "hello world2" >> /root/test/logdata[root@node1 test]# echo "hello world3" >> /root/test/logdata
3、查看Kafka中自動(dòng)創(chuàng)建的“nifi_topic”中的數(shù)據(jù)
以上數(shù)據(jù)每寫入一行,有個(gè)空行,這是由于“TailFile”處理器監(jiān)控?cái)?shù)據(jù)導(dǎo)致的,實(shí)際就是寫入了3條數(shù)據(jù),可以通過后期業(yè)務(wù)處理時(shí),對(duì)數(shù)據(jù)進(jìn)行trim處理即可。
關(guān)鍵詞:
[責(zé)任編輯:xwzkw]
相關(guān)閱讀
- (2023-03-09)觀熱點(diǎn):大數(shù)據(jù)NiFi(二十一):監(jiān)控日志文件生產(chǎn)到Kafka
- (2023-03-09)釹鐵硼磁鐵斷裂的原因
- (2023-03-09)【速看料】防溺水知識(shí) 文字_防溺水知識(shí)資料文字
- (2023-03-09)最資訊丨怎么查電腦現(xiàn)在的功耗_查看自己電腦功耗方法是什么
- (2023-03-08)天府農(nóng)博島迎來沐春季
- (2023-03-08)世界熱議:東北大米的特點(diǎn)是什么_東北大米的特點(diǎn)
- (2023-03-08)當(dāng)前聚焦:上海車展上市 東風(fēng)標(biāo)致408X將于3月21日開啟預(yù)售
- (2023-03-08)【焦點(diǎn)熱聞】幸福觸手可及播出時(shí)間
- (2023-03-08)三八節(jié),7or9用一支舞表達(dá)“舒適基本式”
- (2023-03-08)天生“悅”野,奇瑞TJ-1操穩(wěn)測(cè)試露出,這把“穩(wěn)”了
- (2023-03-08)每日聚焦:謝爾比GT350H福特賽車野馬絕非小馬
- (2023-03-08)與廣大女性共同成長(zhǎng),短視頻情感欄目“曼曼來了”將進(jìn)行全新品牌升級(jí)
- (2023-03-08)愛巢測(cè)—AMH居家檢測(cè),女性健康不容忽視的檢測(cè)項(xiàng)目
- (2023-03-08)輪胎上找不到生產(chǎn)日期是什么原因_輪胎上找不到生產(chǎn)日期
- (2023-03-08)全球動(dòng)態(tài):姜子牙被稱為姜太公的原因
- (2023-03-08)全球觀點(diǎn):03月08日09時(shí)山西運(yùn)城疫情數(shù)據(jù) 陽了以后為什么會(huì)腰疼?應(yīng)該怎么辦?
- (2023-03-08)世界觀察:孤門一輝
- (2023-03-08)南京市江寧區(qū)人社局領(lǐng)導(dǎo)一行蒞臨云生集團(tuán)走訪交流
- (2023-03-08)職場(chǎng)內(nèi)外貌受到指責(zé)的女性比率達(dá)到23%
- (2023-03-08)全國(guó)政協(xié)委員、四川銀行董事長(zhǎng)林罡:數(shù)字化融入發(fā)展血脈
- (2023-03-08)夏雨主演的有什么好看的電影
- (2023-03-08)天天觀熱點(diǎn):網(wǎng)紅板塊3月7日跌2.21%,新華制藥領(lǐng)跌,主力資金凈流出13.42億元
- (2023-03-08)【播資訊】樊長(zhǎng)使
- (2023-03-08)全球今日?qǐng)?bào)丨文件夾無法刪除的解決方法是什么_文件夾無法刪除的解決方法
- (2023-03-07)環(huán)球時(shí)訊:吉林工商學(xué)院幾本
- (2023-03-07)老爺爺疑惑查血糖不戳手指,醫(yī)生詳解靜脈血和指尖血“門道”
- (2023-03-07)焦點(diǎn)快播:最多九套全新外觀 《星戰(zhàn)幸存者》豪華版內(nèi)容介紹
- (2023-03-07)大玉兒傳奇
- (2023-03-07)港珠澳大橋出入境人數(shù)已超71萬人次
- (2023-03-07)贏渠梁怎么死的