案例:將mysql中新增的數(shù)據(jù)實(shí)時(shí)同步到Hive中。
以上案例需要用到的處理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
首先通過(guò)“CaptureChangeMySQL”讀取MySQL中數(shù)據(jù)的變化(需要開(kāi)啟MySQL binlog日志),將Binlog中變化的數(shù)據(jù)同步到“RouteOnAttribute”處理器,通過(guò)此處理器獲取上游數(shù)據(jù)屬性,獲取對(duì)應(yīng)binlog操作類型,再將想要處理的數(shù)據(jù)路由到“EvaluateJsonPath”處理器,該處理器可以將json格式的binlog數(shù)據(jù)解析,通過(guò)自定義json 表達(dá)式獲取json數(shù)據(jù)中的屬性放入FlowFile屬性,將FlowFile通過(guò)“ReplaceText”處理器獲取上游FowFile屬性,動(dòng)態(tài)拼接sql替換所有的FlowFile內(nèi)容,將拼接好的sql組成FlowFile路由到“PutHiveQL”將數(shù)據(jù)寫(xiě)入到Hive表。
(資料圖片)
mysql-binlog是MySQL數(shù)據(jù)庫(kù)的二進(jìn)制日志,記錄了所有的DDL和DML(除了數(shù)據(jù)查詢語(yǔ)句)語(yǔ)句信息。一般來(lái)說(shuō)開(kāi)啟二進(jìn)制日志大概會(huì)有1%的性能損耗。這里需要開(kāi)啟MySQL的binlog日志方便后期使用“CaptureChangeMySQL”處理器來(lái)獲取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。
[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";
在/etc/my.cnf文件中[mysqld]下寫(xiě)入以下內(nèi)容:
[mysqld]#隨機(jī)指定一個(gè)不能和其他集群中機(jī)器重名的字符串server-id=123#配置binlog日志目錄,配置后會(huì)自動(dòng)開(kāi)啟binlog日志,并寫(xiě)入該目錄log-bin=/var/lib/mysql/mysql-bin
[root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";
“CaptureChangeMySQL”主要是從MySQL數(shù)據(jù)庫(kù)捕獲CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作發(fā)生時(shí)的順序輸出為單獨(dú)的FlowFile文件。
關(guān)于“CaptureChangeMySQL”處理器的“Properties”主要配置的說(shuō)明如下:
配置項(xiàng) | 默認(rèn)值 | 允許值 | 描述 |
---|---|---|---|
MySQL Hosts(MySQL 節(jié)點(diǎn)) | MySQL集群節(jié)點(diǎn)相對(duì)應(yīng)的主機(jī)名/端口項(xiàng)的列表。多個(gè)節(jié)點(diǎn)使用逗號(hào)分隔,格式為:host1:port、host2:port…,處理器將嘗試按順序連接到列表中的主機(jī)。如果一個(gè)節(jié)點(diǎn)關(guān)閉,并且群集啟用了故障轉(zhuǎn)移,那么處理器將連接到活動(dòng)節(jié)點(diǎn)。 | ||
MySQL Driver Class Name(MySQL驅(qū)動(dòng)名稱) | com.mysql.jdbc.Driver | MySQL數(shù)據(jù)庫(kù)驅(qū)動(dòng)程序類的類名。 | |
MySQL Driver Location(s)(MySQL驅(qū)動(dòng)的位置) | 包含MySQL驅(qū)動(dòng)程序包及其依賴項(xiàng)的文件/文件夾和/或url的逗號(hào)分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。 | ||
Username(用戶名) | 訪問(wèn)MySQL集群的用戶名。 | ||
Password(密碼) | 訪問(wèn)MySQL集群的密碼。 | ||
Database/Schema Name Pattern(匹配數(shù)據(jù)庫(kù)/Schema) | 用于根據(jù)CDC事件列表匹配數(shù)據(jù)庫(kù)(或模式,具體取決于RDBMS類型)的正則表達(dá)式。正則表達(dá)式必須與存儲(chǔ)在RDBMS中的數(shù)據(jù)庫(kù)名稱匹配。如果未設(shè)置屬性,則數(shù)據(jù)庫(kù)名稱將不會(huì)用于篩選CDC事件。 | ||
Table Name Pattern(匹配表) | 用于匹配影響匹配表的CDC事件的正則表達(dá)式(regex)。regex必須與存儲(chǔ)在數(shù)據(jù)庫(kù)中的表名匹配。如果未設(shè)置屬性,則不會(huì)根據(jù)表名篩選任何事件。 | ||
Max Wait Time(最大連接等待時(shí)長(zhǎng)) | 30 seconds | 允許建立連接的最長(zhǎng)時(shí)間,零表示實(shí)際上沒(méi)有限制。 | |
Distributed Map Cache Client(分布式緩存客戶端) | 指定用于保存處理器所需的各種表、列等信息的分布式映射緩存客戶端控制器服務(wù)。如果未指定,則生成的事件將不包括列類型或名稱等信息。 | ||
Retrieve All Records(檢索所有記錄) | true | ?true?false | 指定是否獲取所有可用的CDC事件,而不考慮當(dāng)前的binlog文件名或位置。如果處理器狀態(tài)中存在binlog文件名和位置值,則忽略此屬性的值。這允許4種不同的配置:1).如果處理器State中存在binlog數(shù)據(jù),則State用來(lái)確定開(kāi)始位置,并忽略Retrieve All Records的值。(目前NiFi版本測(cè)試有問(wèn)題)2).如果處理器State中不存在binlog數(shù)據(jù),此值設(shè)置為true意味著從頭開(kāi)始讀取Binlog 數(shù)據(jù)。3).如果處理器State中不存在binlog數(shù)據(jù),并且沒(méi)有指定binlog文件名和位置,此值設(shè)置為false意味著從binlog尾部開(kāi)始讀取數(shù)據(jù)。4).如果處理器State中不存在binlog數(shù)據(jù),并指定binlog文件名和位置,此值設(shè)置為false意味著從指定binlog尾部開(kāi)始讀取數(shù)據(jù)。 |
Include Begin/Commit Events(包含開(kāi)始/提交事件) | false | ?true?false | 指定是否發(fā)出與二進(jìn)制日志中的開(kāi)始或提交事件相對(duì)應(yīng)的事件。如果下游流中需要開(kāi)始/提交事件,則設(shè)置為true,否則設(shè)置為false,這將抑制這些事件的生成并可以提高流性能。 |
Include DDL Events(標(biāo)準(zhǔn)表/列名) | false | ?true?false | 指定是否發(fā)出與數(shù)據(jù)定義語(yǔ)言(DDL)事件對(duì)應(yīng)的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,則設(shè)置為true,否則設(shè)置為false。為false時(shí)這將抑制這些事件的生成,并可以提高流性能。 |
配置步驟如下:
監(jiān)控mysql變化需要設(shè)置“DistributedMapCacheClient”控制服務(wù),其對(duì)應(yīng)的Server中存儲(chǔ)處理器所需的各種表、列等信息,所以這里需要首先配置“DistributeMapCacheServer”控制服務(wù)。
?
?
由于這里使用“CaptureChangeMySQL”處理器監(jiān)控“MySQL”中的數(shù)據(jù),所以設(shè)置調(diào)度訪問(wèn)周期為“10s”,防止一直監(jiān)聽(tīng)MySQL binlog數(shù)據(jù),帶來(lái)性能消耗。
?
在“CaptureChangeMySQL”處理器中配置“PROPERTIES”,配置如下:
MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar注意:這里需要在每臺(tái)NiFi節(jié)點(diǎn)上創(chuàng)建對(duì)應(yīng)目錄,上傳mysql驅(qū)動(dòng)包。
“PROPERTIES”配置如下:
此外,在“PROPERTIES”中還需要配置“Distributed Map Cache Client”控制服務(wù),來(lái)讀取“DistributeMapCacheServer”控制服務(wù)中的緩存數(shù)據(jù):
?
另外,這里我們只是監(jiān)控表“test2”對(duì)應(yīng)的CDC事件,這里設(shè)置匹配表名為“test2”,最終“PROPERTIES”的配置如下:
注意:以上“Table Name Pattern”這里配置對(duì)應(yīng)的Value值為:test2,也可以不配置,不配置會(huì)監(jiān)控所有MySQL表的變化對(duì)應(yīng)的binlog事件。當(dāng)后面向Hive表中插入新增和更新數(shù)據(jù)時(shí),對(duì)應(yīng)MySQL中的元數(shù)據(jù)表也會(huì)變化,也會(huì)監(jiān)控到對(duì)應(yīng)的binlog事件。為了避免后期出現(xiàn)監(jiān)控到其他表的binlog日志,這里建議配置上“test2”。
登錄mysql ,使用“mynifi”庫(kù),創(chuàng)建表“test2”。暫時(shí)設(shè)置“CaptureChangeMySQL”處理器“success”事件自動(dòng)終止并啟動(dòng),向表中插入對(duì)應(yīng)的數(shù)據(jù)查看“CaptureChangeMySQL”處理器能否正常監(jiān)控事件。
在mysql中創(chuàng)建對(duì)應(yīng)的表:
use mynifi;create table test2 (id int,name varchar(255),age int);
啟動(dòng)“CaptureChangeMySQL”處理器:
向表“test2”中插入以下數(shù)據(jù):
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;
可以在“CaptureChangeMySQL”處理器中右鍵“View data provenance”查看捕獲到的“insert”、“update”、“delete”事件:
注意問(wèn)題:在配置好“CaptureChangeMySQL”處理器啟動(dòng)后,當(dāng)MySQL中有數(shù)據(jù)插入、修改、刪除時(shí)當(dāng)前處理器會(huì)讀取MySql binlog日志,并在當(dāng)前處理器中記錄讀取binlog的位置狀態(tài)。正常來(lái)說(shuō)這里關(guān)閉“CaptureChangeMySQL”處理器后再次啟動(dòng),會(huì)接著保存的binlog位置繼續(xù)讀取(可以參照“PROPERTIES”屬性中“Retrieve All Records”配置說(shuō)明),但是經(jīng)過(guò)測(cè)試,此NiFi版本出現(xiàn)以下錯(cuò)誤(無(wú)效的binlog位置,目測(cè)是一個(gè)版本bug錯(cuò)誤):
所以在之后的測(cè)試中,我們可以將“CaptureChangeMysql”處理器讀取binlog的狀態(tài)清空,然后再次啟動(dòng)即可,這里會(huì)重復(fù)讀取MySQL之前已經(jīng)檢測(cè)到的新增、修改、刪除數(shù)據(jù)。
清空“CaptureChangeMysql”讀取binlog狀態(tài):
“RouteOnAttribute”是根據(jù)FlowFile的屬性使用屬性表達(dá)式進(jìn)行數(shù)據(jù)路由。
關(guān)于“RouteOnAttribute”處理器的“Properties”主要配置的說(shuō)明如下:
配置項(xiàng) | 默認(rèn)值 | 描述 |
---|---|---|
Routing Strategy(路由策略) | Route to Property name | 指定在計(jì)算表達(dá)式語(yǔ)言時(shí)如何使用哪個(gè)關(guān)系。有如下幾個(gè)關(guān)系可選擇:?Route to Property nameFlowFile的副本將被路由到對(duì)應(yīng)的表達(dá)式計(jì)算結(jié)果為"true"的每個(gè)關(guān)系。?Route to "matched" if all match要求所有用戶定義的表達(dá)式求值都為"true",才認(rèn)為FlowFile是匹配的。?Route to "matched" if any matches至少有一個(gè)用戶定義的表達(dá)式求值為"true",才能認(rèn)為FlowFile是匹配的。 |
注意:該處理器允許用戶自定義屬性并指定該屬性的匹配表達(dá)式。屬性與動(dòng)態(tài)屬性指定的屬性表達(dá)式相匹配的FileFlow,映射到動(dòng)態(tài)屬性上。
配置如下:
注意:以上自定義的屬性中update、insert、delete對(duì)應(yīng)的json 表達(dá)式寫(xiě)法為:${cdc.event.type:equals("delete")},代表匹配對(duì)應(yīng)類型的FlowFile,“cdc.event.type”是上游FlowFile中的屬性,“equales”是對(duì)應(yīng)的方法,“delete”使用單引號(hào)引起,表示匹配的CDC事件。
“EvaluatejsonPath”處理器將根據(jù)上游“RouteOnAttribute”匹配的事件將內(nèi)容映射成FlowFile屬性,方便后期拼接SQL獲取數(shù)據(jù),上游匹配到的FlowFile中的數(shù)據(jù)格式為:
EvaluatejsonPath”處理器配置如下:
連接關(guān)系中,我們這里只關(guān)注“insert”和“update”的數(shù)據(jù),后期獲取對(duì)應(yīng)的屬性將插入和更新的數(shù)據(jù)插入到Hive表中,對(duì)于“delete”的數(shù)據(jù)可以路由到其他關(guān)系中,例如需要將刪除數(shù)據(jù)插入到另外的Hive表中,可以再設(shè)置個(gè)分支處理。這里我們將“delete”和“failure”的數(shù)據(jù)設(shè)置自動(dòng)終止關(guān)系。
設(shè)置“RouteOnAttribute”處理器其他匹配路由關(guān)系為自動(dòng)終止:
“ReplaceText”處理器可以獲取“EvaluatejsonPath”轉(zhuǎn)換后FlowFile中的屬性來(lái)替換原有數(shù)據(jù)組成一個(gè)“insert into ... values (... ...)”語(yǔ)句,方便后續(xù)將數(shù)據(jù)插入到Hive中?!癛eplaceText”處理器的配置如下:
在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”
注意:
以上獲取的tablename名稱為“test2”,后面這個(gè)sql是要將數(shù)據(jù)插入到Hive中的,所以這里在Hive中也應(yīng)該創(chuàng)建“test2”的表名稱,或者將表名稱寫(xiě)成固定表,后期在Hive中創(chuàng)建對(duì)應(yīng)的表即可。
另外,需要注意${name}在插入Hive中時(shí)對(duì)應(yīng)的列為字符串,這里需要加上單引號(hào)。
配置“EvaluatjsonPath”處理器“failure”和“unmatch”路由關(guān)系為自動(dòng)終止。
訪問(wèn)Hive有兩種方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置環(huán)境。HiveServer2使得連接Hive的Client從Yarn和HDFS集群中獨(dú)立出來(lái),不需要每個(gè)幾點(diǎn)都配置Hive和Hadoop的jar包和一系列環(huán)境。
NiFi連接Hive就是使用了HiveServer2方式連接,所以這里需要配置HiveServer2。
配置HiveServer2步驟如下:
#在Hive 服務(wù)端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000 hive.server2.thrift.bind.host 192.168.179.4
hadoop.proxyuser.root.hosts * hadoop.proxyuser.root.groups *
nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &
[root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 沒(méi)有密碼直接跳過(guò)即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+| tab_name |+------------------------------------+| personinfo || test2 |+------------------------------------+
以上配置完成后,還需要將配置好的core-site.xml文件發(fā)送到各個(gè)NiFi節(jié)點(diǎn)對(duì)應(yīng)的路徑/root/test下替換原有的core-site.xml文件。之后重啟NiFi集群,各個(gè)NiFi節(jié)點(diǎn)上執(zhí)行命令:
service nifi restart
“PutHiveQL”主要執(zhí)行HiveQL的DDL/DML命令,傳入給該處理器的FlowFile內(nèi)容是要執(zhí)行的HiveQL命令。HiveQL命令可以使用“?”來(lái)指定參數(shù),這種情況下,參數(shù)必須存在于FlowFile的屬性中,命名約定為hiveql.args.N.type和hiveql.args.N.value,其中N為正整數(shù)。
關(guān)于“PutHiveQL”處理器的“Properties”主要配置的說(shuō)明如下:
配置項(xiàng) | 默認(rèn)值 | 允許值 | 描述 |
---|---|---|---|
Hive Database Connection Pooling Servic(Hive數(shù)據(jù)庫(kù)連接池服務(wù)) | Hive Controller服務(wù),用于獲取與Hive數(shù)據(jù)庫(kù)的連接。 | ||
Batch Size(批次大?。?/p> | 100 | 一批次讀取FlowFile的個(gè)數(shù)。 | |
Character Set(編碼) | UTF-8 | 指定數(shù)據(jù)的編碼格式。 | |
Statement Delimiter(語(yǔ)句分隔符) | ; | 語(yǔ)句分隔符,用于分隔多個(gè)語(yǔ)句腳本中的SQL語(yǔ)句。 | |
Rollback On Failure(失敗時(shí)回滾) | false | ?true?false | 指定如何處理錯(cuò)誤。默認(rèn)false指的是如果在處理FlowFile時(shí)發(fā)生錯(cuò)誤,則FlowFile將根據(jù)錯(cuò)誤類型路由到“failure”或“retry”關(guān)系,處理器繼續(xù)處理下一個(gè)FlowFile。相反,可以設(shè)置為true回滾當(dāng)前已處理的FlowFile,并立即停止進(jìn)一步的處理。如果設(shè)置為true啟用,失敗的FlowFiles將停留在輸入關(guān)系中并會(huì)反復(fù)處理,直到成功處理或通過(guò)其他方式將其刪除為止。可以設(shè)置足夠大的“Yield Duration”避免重試次數(shù)過(guò)多。 |
“PutHiveQL”處理器的配置如下:
?
?
點(diǎn)擊之后,配置“HiveConnectionPool”控制服務(wù):
注意以上需要配置:
“Database Connection URL” :這里是Hive的HiveServer2啟動(dòng)的節(jié)點(diǎn),也就是服務(wù)端節(jié)點(diǎn)?!癹dbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,這里需要將以上各個(gè)文件在NiFi集群各個(gè)節(jié)點(diǎn)對(duì)應(yīng)位置準(zhǔn)備好?!癉atabase User”:root,這里防止操作Hive對(duì)應(yīng)的HDFS時(shí)權(quán)限問(wèn)題。配置完成后,需要啟用對(duì)應(yīng)的“HiveConnectionPool”控制服務(wù):
最終配置“PROPERTIES”為:
?
設(shè)置“ReplaceText”處理器“failure”路由關(guān)系為自動(dòng)終止:
設(shè)置“PutHiveQL”處理器路由關(guān)系為自動(dòng)終止:
?
動(dòng)HDFS,啟動(dòng)Hive服務(wù)端和客戶端,創(chuàng)建表“test2”
create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";
首先清空“CaptureChangeMySQL”處理器的狀態(tài),單獨(dú)啟動(dòng)“CaptureChangeMySQL”處理器,清空重新消費(fèi)的數(shù)據(jù)(以上主要就是避免此版本NiFi bug問(wèn)題),啟動(dòng)當(dāng)前案例中其他NiFi處理器。
然后向MySQL中插入以下數(shù)據(jù):
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;
NiFi頁(yè)面:
Hive表test2中的結(jié)果:
在實(shí)際法律問(wèn)題情景中,個(gè)案情況都有所差異,為了高效解決您的問(wèn)題,保障合法權(quán)益,建議您直接向?qū)I(yè)律師說(shuō)明情況,解決您的實(shí)際問(wèn)題。 立即在線咨詢 >
公眾服務(wù)
法制網(wǎng)公眾號(hào)
快速找律師 / 免費(fèi)咨詢
查法律知識(shí) / 查看解答 / 隨時(shí)追問(wèn)
律師服務(wù)(工作日8:30-18:00 ,非工作日請(qǐng)QQ留言)
律師營(yíng)銷診斷
營(yíng)銷分析 / 回復(fù)咨詢
案件接洽 / 合作加盟
法律包,中國(guó)知名的 法律咨詢網(wǎng)站,能夠?yàn)閺V大用戶提供在線 免費(fèi)法律咨詢服務(wù)。
CopyRight@2003-2022 fazhi.net ALL Rights Reservrd 版權(quán)所有
皖I(lǐng)CP備2022009963號(hào)-41
違法和不良信息聯(lián)系郵箱:39 60 29 14 2 @qq.com