久久久久亚洲av色欲av,国偷自产一区二区免费视频,久久99er精品国产首页,精品国产_亚洲人成在线,久久男人av资源网站无码软件

  • 鄭州
您的位置: 法律包 > 綜合 > 詳情

環(huán)球消息!大數(shù)據(jù)NiFi(二十):實(shí)時(shí)同步MySQL數(shù)據(jù)到Hive

來(lái)源: 騰訊云 2023-02-27 15:20:21

?實(shí)時(shí)同步MySQL數(shù)據(jù)到Hive

案例:將mysql中新增的數(shù)據(jù)實(shí)時(shí)同步到Hive中。

以上案例需要用到的處理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。

首先通過(guò)“CaptureChangeMySQL”讀取MySQL中數(shù)據(jù)的變化(需要開(kāi)啟MySQL binlog日志),將Binlog中變化的數(shù)據(jù)同步到“RouteOnAttribute”處理器,通過(guò)此處理器獲取上游數(shù)據(jù)屬性,獲取對(duì)應(yīng)binlog操作類型,再將想要處理的數(shù)據(jù)路由到“EvaluateJsonPath”處理器,該處理器可以將json格式的binlog數(shù)據(jù)解析,通過(guò)自定義json 表達(dá)式獲取json數(shù)據(jù)中的屬性放入FlowFile屬性,將FlowFile通過(guò)“ReplaceText”處理器獲取上游FowFile屬性,動(dòng)態(tài)拼接sql替換所有的FlowFile內(nèi)容,將拼接好的sql組成FlowFile路由到“PutHiveQL”將數(shù)據(jù)寫(xiě)入到Hive表。


(資料圖片)

一、開(kāi)啟MySQL的binlog日志

mysql-binlog是MySQL數(shù)據(jù)庫(kù)的二進(jìn)制日志,記錄了所有的DDL和DML(除了數(shù)據(jù)查詢語(yǔ)句)語(yǔ)句信息。一般來(lái)說(shuō)開(kāi)啟二進(jìn)制日志大概會(huì)有1%的性能損耗。這里需要開(kāi)啟MySQL的binlog日志方便后期使用“CaptureChangeMySQL”處理器來(lái)獲取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。

1、登錄mysql查看MySQL是否開(kāi)啟binlog日志

[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

2 、開(kāi)啟mysql binlog日志

在/etc/my.cnf文件中[mysqld]下寫(xiě)入以下內(nèi)容:

[mysqld]#隨機(jī)指定一個(gè)不能和其他集群中機(jī)器重名的字符串server-id=123#配置binlog日志目錄,配置后會(huì)自動(dòng)開(kāi)啟binlog日志,并寫(xiě)入該目錄log-bin=/var/lib/mysql/mysql-bin

3、重啟mysql 服務(wù),重新查看binlog日志情況

[root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

二、???????配置“CaptureChangeMySQL”處理器

“CaptureChangeMySQL”主要是從MySQL數(shù)據(jù)庫(kù)捕獲CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作發(fā)生時(shí)的順序輸出為單獨(dú)的FlowFile文件。

關(guān)于“CaptureChangeMySQL”處理器的“Properties”主要配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

MySQL Hosts(MySQL 節(jié)點(diǎn))

MySQL集群節(jié)點(diǎn)相對(duì)應(yīng)的主機(jī)名/端口項(xiàng)的列表。多個(gè)節(jié)點(diǎn)使用逗號(hào)分隔,格式為:host1:port、host2:port…,處理器將嘗試按順序連接到列表中的主機(jī)。如果一個(gè)節(jié)點(diǎn)關(guān)閉,并且群集啟用了故障轉(zhuǎn)移,那么處理器將連接到活動(dòng)節(jié)點(diǎn)。

MySQL Driver Class Name(MySQL驅(qū)動(dòng)名稱)

com.mysql.jdbc.Driver

MySQL數(shù)據(jù)庫(kù)驅(qū)動(dòng)程序類的類名。

MySQL Driver Location(s)(MySQL驅(qū)動(dòng)的位置)

包含MySQL驅(qū)動(dòng)程序包及其依賴項(xiàng)的文件/文件夾和/或url的逗號(hào)分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。

Username(用戶名)

訪問(wèn)MySQL集群的用戶名。

Password(密碼)

訪問(wèn)MySQL集群的密碼。

Database/Schema Name Pattern(匹配數(shù)據(jù)庫(kù)/Schema)

用于根據(jù)CDC事件列表匹配數(shù)據(jù)庫(kù)(或模式,具體取決于RDBMS類型)的正則表達(dá)式。正則表達(dá)式必須與存儲(chǔ)在RDBMS中的數(shù)據(jù)庫(kù)名稱匹配。如果未設(shè)置屬性,則數(shù)據(jù)庫(kù)名稱將不會(huì)用于篩選CDC事件。

Table Name Pattern(匹配表)

用于匹配影響匹配表的CDC事件的正則表達(dá)式(regex)。regex必須與存儲(chǔ)在數(shù)據(jù)庫(kù)中的表名匹配。如果未設(shè)置屬性,則不會(huì)根據(jù)表名篩選任何事件。

Max Wait Time(最大連接等待時(shí)長(zhǎng))

30 seconds

允許建立連接的最長(zhǎng)時(shí)間,零表示實(shí)際上沒(méi)有限制。

Distributed Map Cache Client(分布式緩存客戶端)

指定用于保存處理器所需的各種表、列等信息的分布式映射緩存客戶端控制器服務(wù)。如果未指定,則生成的事件將不包括列類型或名稱等信息。

Retrieve All Records(檢索所有記錄)

true

?true?false

指定是否獲取所有可用的CDC事件,而不考慮當(dāng)前的binlog文件名或位置。如果處理器狀態(tài)中存在binlog文件名和位置值,則忽略此屬性的值。這允許4種不同的配置:1).如果處理器State中存在binlog數(shù)據(jù),則State用來(lái)確定開(kāi)始位置,并忽略Retrieve All Records的值。(目前NiFi版本測(cè)試有問(wèn)題)2).如果處理器State中不存在binlog數(shù)據(jù),此值設(shè)置為true意味著從頭開(kāi)始讀取Binlog 數(shù)據(jù)。3).如果處理器State中不存在binlog數(shù)據(jù),并且沒(méi)有指定binlog文件名和位置,此值設(shè)置為false意味著從binlog尾部開(kāi)始讀取數(shù)據(jù)。4).如果處理器State中不存在binlog數(shù)據(jù),并指定binlog文件名和位置,此值設(shè)置為false意味著從指定binlog尾部開(kāi)始讀取數(shù)據(jù)。

Include Begin/Commit Events(包含開(kāi)始/提交事件)

false

?true?false

指定是否發(fā)出與二進(jìn)制日志中的開(kāi)始或提交事件相對(duì)應(yīng)的事件。如果下游流中需要開(kāi)始/提交事件,則設(shè)置為true,否則設(shè)置為false,這將抑制這些事件的生成并可以提高流性能。

Include DDL Events(標(biāo)準(zhǔn)表/列名)

false

?true?false

指定是否發(fā)出與數(shù)據(jù)定義語(yǔ)言(DDL)事件對(duì)應(yīng)的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,則設(shè)置為true,否則設(shè)置為false。為false時(shí)這將抑制這些事件的生成,并可以提高流性能。

配置步驟如下:

1、創(chuàng)建“CaptureChangeMySQL”處理器

2、配置“DistributeMapCacheServer”控制服務(wù)

監(jiān)控mysql變化需要設(shè)置“DistributedMapCacheClient”控制服務(wù),其對(duì)應(yīng)的Server中存儲(chǔ)處理器所需的各種表、列等信息,所以這里需要首先配置“DistributeMapCacheServer”控制服務(wù)。

?

?

3、配置“SCHEDULING”

由于這里使用“CaptureChangeMySQL”處理器監(jiān)控“MySQL”中的數(shù)據(jù),所以設(shè)置調(diào)度訪問(wèn)周期為“10s”,防止一直監(jiān)聽(tīng)MySQL binlog數(shù)據(jù),帶來(lái)性能消耗。

?

4、配置“PROPERTIES”

在“CaptureChangeMySQL”處理器中配置“PROPERTIES”,配置如下:

MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar

注意:這里需要在每臺(tái)NiFi節(jié)點(diǎn)上創(chuàng)建對(duì)應(yīng)目錄,上傳mysql驅(qū)動(dòng)包。

“PROPERTIES”配置如下:

此外,在“PROPERTIES”中還需要配置“Distributed Map Cache Client”控制服務(wù),來(lái)讀取“DistributeMapCacheServer”控制服務(wù)中的緩存數(shù)據(jù):

?

另外,這里我們只是監(jiān)控表“test2”對(duì)應(yīng)的CDC事件,這里設(shè)置匹配表名為“test2”,最終“PROPERTIES”的配置如下:

注意:以上“Table Name Pattern”這里配置對(duì)應(yīng)的Value值為:test2,也可以不配置,不配置會(huì)監(jiān)控所有MySQL表的變化對(duì)應(yīng)的binlog事件。當(dāng)后面向Hive表中插入新增和更新數(shù)據(jù)時(shí),對(duì)應(yīng)MySQL中的元數(shù)據(jù)表也會(huì)變化,也會(huì)監(jiān)控到對(duì)應(yīng)的binlog事件。為了避免后期出現(xiàn)監(jiān)控到其他表的binlog日志,這里建議配置上“test2”。

5、啟動(dòng)MySQL,創(chuàng)建表“test2”測(cè)試“CaptureChangeMySQL”處理器

登錄mysql ,使用“mynifi”庫(kù),創(chuàng)建表“test2”。暫時(shí)設(shè)置“CaptureChangeMySQL”處理器“success”事件自動(dòng)終止并啟動(dòng),向表中插入對(duì)應(yīng)的數(shù)據(jù)查看“CaptureChangeMySQL”處理器能否正常監(jiān)控事件。

在mysql中創(chuàng)建對(duì)應(yīng)的表:

use mynifi;create table test2 (id int,name varchar(255),age int);

啟動(dòng)“CaptureChangeMySQL”處理器:

向表“test2”中插入以下數(shù)據(jù):

insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

可以在“CaptureChangeMySQL”處理器中右鍵“View data provenance”查看捕獲到的“insert”、“update”、“delete”事件:

注意問(wèn)題:在配置好“CaptureChangeMySQL”處理器啟動(dòng)后,當(dāng)MySQL中有數(shù)據(jù)插入、修改、刪除時(shí)當(dāng)前處理器會(huì)讀取MySql binlog日志,并在當(dāng)前處理器中記錄讀取binlog的位置狀態(tài)。正常來(lái)說(shuō)這里關(guān)閉“CaptureChangeMySQL”處理器后再次啟動(dòng),會(huì)接著保存的binlog位置繼續(xù)讀取(可以參照“PROPERTIES”屬性中“Retrieve All Records”配置說(shuō)明),但是經(jīng)過(guò)測(cè)試,此NiFi版本出現(xiàn)以下錯(cuò)誤(無(wú)效的binlog位置,目測(cè)是一個(gè)版本bug錯(cuò)誤):

所以在之后的測(cè)試中,我們可以將“CaptureChangeMysql”處理器讀取binlog的狀態(tài)清空,然后再次啟動(dòng)即可,這里會(huì)重復(fù)讀取MySQL之前已經(jīng)檢測(cè)到的新增、修改、刪除數(shù)據(jù)。

清空“CaptureChangeMysql”讀取binlog狀態(tài):

三、??????????????配置“RouteOnAttribute”處理器

“RouteOnAttribute”是根據(jù)FlowFile的屬性使用屬性表達(dá)式進(jìn)行數(shù)據(jù)路由。

關(guān)于“RouteOnAttribute”處理器的“Properties”主要配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

描述

Routing Strategy(路由策略)

Route to Property name

指定在計(jì)算表達(dá)式語(yǔ)言時(shí)如何使用哪個(gè)關(guān)系。有如下幾個(gè)關(guān)系可選擇:?Route to Property nameFlowFile的副本將被路由到對(duì)應(yīng)的表達(dá)式計(jì)算結(jié)果為"true"的每個(gè)關(guān)系。?Route to "matched" if all match要求所有用戶定義的表達(dá)式求值都為"true",才認(rèn)為FlowFile是匹配的。?Route to "matched" if any matches至少有一個(gè)用戶定義的表達(dá)式求值為"true",才能認(rèn)為FlowFile是匹配的。

注意:該處理器允許用戶自定義屬性并指定該屬性的匹配表達(dá)式。屬性與動(dòng)態(tài)屬性指定的屬性表達(dá)式相匹配的FileFlow,映射到動(dòng)態(tài)屬性上。

配置如下:

1、創(chuàng)建“RouteOnAttribute”處理器

2、配置“PROPERTIES”自定義屬性

注意:以上自定義的屬性中update、insert、delete對(duì)應(yīng)的json 表達(dá)式寫(xiě)法為:${cdc.event.type:equals("delete")},代表匹配對(duì)應(yīng)類型的FlowFile,“cdc.event.type”是上游FlowFile中的屬性,“equales”是對(duì)應(yīng)的方法,“delete”使用單引號(hào)引起,表示匹配的CDC事件。

3、連接“CaptureChangeMySQL”處理器與“RouteOnAttribute”處理器

四、配置“EvaluatejsonPath”處理器

“EvaluatejsonPath”處理器將根據(jù)上游“RouteOnAttribute”匹配的事件將內(nèi)容映射成FlowFile屬性,方便后期拼接SQL獲取數(shù)據(jù),上游匹配到的FlowFile中的數(shù)據(jù)格式為:

EvaluatejsonPath”處理器配置如下:

1、配置“EvaluatejsonPath”的“PROPERTIES”屬性

2、連接“RouteOnAttribute”處理器和“EvaluatejsonPath”處理器

連接關(guān)系中,我們這里只關(guān)注“insert”和“update”的數(shù)據(jù),后期獲取對(duì)應(yīng)的屬性將插入和更新的數(shù)據(jù)插入到Hive表中,對(duì)于“delete”的數(shù)據(jù)可以路由到其他關(guān)系中,例如需要將刪除數(shù)據(jù)插入到另外的Hive表中,可以再設(shè)置個(gè)分支處理。這里我們將“delete”和“failure”的數(shù)據(jù)設(shè)置自動(dòng)終止關(guān)系。

設(shè)置“RouteOnAttribute”處理器其他匹配路由關(guān)系為自動(dòng)終止:

五、??????????????配置“ReplaceText”處理器

“ReplaceText”處理器可以獲取“EvaluatejsonPath”轉(zhuǎn)換后FlowFile中的屬性來(lái)替換原有數(shù)據(jù)組成一個(gè)“insert into ... values (... ...)”語(yǔ)句,方便后續(xù)將數(shù)據(jù)插入到Hive中?!癛eplaceText”處理器的配置如下:

1、配置“RelaceText”處理器“PROPERTIES”屬性

在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”

注意:

以上獲取的tablename名稱為“test2”,后面這個(gè)sql是要將數(shù)據(jù)插入到Hive中的,所以這里在Hive中也應(yīng)該創(chuàng)建“test2”的表名稱,或者將表名稱寫(xiě)成固定表,后期在Hive中創(chuàng)建對(duì)應(yīng)的表即可。

另外,需要注意${name}在插入Hive中時(shí)對(duì)應(yīng)的列為字符串,這里需要加上單引號(hào)。

2、連接“EvaluatejsonPath”處理器與“ReplaceText”處理器

配置“EvaluatjsonPath”處理器“failure”和“unmatch”路由關(guān)系為自動(dòng)終止。

六、??????????????配置Hive 支持HiveServer2

訪問(wèn)Hive有兩種方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置環(huán)境。HiveServer2使得連接Hive的Client從Yarn和HDFS集群中獨(dú)立出來(lái),不需要每個(gè)幾點(diǎn)都配置Hive和Hadoop的jar包和一系列環(huán)境。

NiFi連接Hive就是使用了HiveServer2方式連接,所以這里需要配置HiveServer2。

配置HiveServer2步驟如下:

1、在Hive服務(wù)端配置hive-site.xml

#在Hive 服務(wù)端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000hive.server2.thrift.bind.host192.168.179.4

2、在每臺(tái)Hadoop 節(jié)點(diǎn)配置core-site.xml

     hadoop.proxyuser.root.hosts     *       hadoop.proxyuser.root.groups        * 

3、重啟HDFS ,Hive ,在Hive服務(wù)端啟動(dòng)Metastore和HiveServer2服務(wù)

nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &

4、在客戶端通過(guò)beeline連接Hive

[root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 沒(méi)有密碼直接跳過(guò)即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+|              tab_name              |+------------------------------------+| personinfo                         || test2                              |+------------------------------------+

以上配置完成后,還需要將配置好的core-site.xml文件發(fā)送到各個(gè)NiFi節(jié)點(diǎn)對(duì)應(yīng)的路徑/root/test下替換原有的core-site.xml文件。之后重啟NiFi集群,各個(gè)NiFi節(jié)點(diǎn)上執(zhí)行命令:

service nifi restart

七、配置“PutHiveQL”處理器

“PutHiveQL”主要執(zhí)行HiveQL的DDL/DML命令,傳入給該處理器的FlowFile內(nèi)容是要執(zhí)行的HiveQL命令。HiveQL命令可以使用“?”來(lái)指定參數(shù),這種情況下,參數(shù)必須存在于FlowFile的屬性中,命名約定為hiveql.args.N.type和hiveql.args.N.value,其中N為正整數(shù)。

關(guān)于“PutHiveQL”處理器的“Properties”主要配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

Hive Database Connection Pooling Servic(Hive數(shù)據(jù)庫(kù)連接池服務(wù))

Hive Controller服務(wù),用于獲取與Hive數(shù)據(jù)庫(kù)的連接。

Batch Size(批次大?。?/p>

100

一批次讀取FlowFile的個(gè)數(shù)。

Character Set(編碼)

UTF-8

指定數(shù)據(jù)的編碼格式。

Statement Delimiter(語(yǔ)句分隔符)

;

語(yǔ)句分隔符,用于分隔多個(gè)語(yǔ)句腳本中的SQL語(yǔ)句。

Rollback On Failure(失敗時(shí)回滾)

false

?true?false

指定如何處理錯(cuò)誤。默認(rèn)false指的是如果在處理FlowFile時(shí)發(fā)生錯(cuò)誤,則FlowFile將根據(jù)錯(cuò)誤類型路由到“failure”或“retry”關(guān)系,處理器繼續(xù)處理下一個(gè)FlowFile。相反,可以設(shè)置為true回滾當(dāng)前已處理的FlowFile,并立即停止進(jìn)一步的處理。如果設(shè)置為true啟用,失敗的FlowFiles將停留在輸入關(guān)系中并會(huì)反復(fù)處理,直到成功處理或通過(guò)其他方式將其刪除為止。可以設(shè)置足夠大的“Yield Duration”避免重試次數(shù)過(guò)多。

“PutHiveQL”處理器的配置如下:

1、創(chuàng)建“PutHiveQL”處理器

?

2、 配置“PROPERTIES”

?

點(diǎn)擊之后,配置“HiveConnectionPool”控制服務(wù):

注意以上需要配置:

“Database Connection URL” :這里是Hive的HiveServer2啟動(dòng)的節(jié)點(diǎn),也就是服務(wù)端節(jié)點(diǎn)?!癹dbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,這里需要將以上各個(gè)文件在NiFi集群各個(gè)節(jié)點(diǎn)對(duì)應(yīng)位置準(zhǔn)備好?!癉atabase User”:root,這里防止操作Hive對(duì)應(yīng)的HDFS時(shí)權(quán)限問(wèn)題。

配置完成后,需要啟用對(duì)應(yīng)的“HiveConnectionPool”控制服務(wù):

最終配置“PROPERTIES”為:

3、連接“ReplaceText”處理器與“PutHiveQL”處理器并設(shè)置關(guān)系

?

設(shè)置“ReplaceText”處理器“failure”路由關(guān)系為自動(dòng)終止:

設(shè)置“PutHiveQL”處理器路由關(guān)系為自動(dòng)終止:

?

八、??????????????運(yùn)行測(cè)試

1、在Hive中創(chuàng)建表“test2”

動(dòng)HDFS,啟動(dòng)Hive服務(wù)端和客戶端,創(chuàng)建表“test2”

create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";

2、啟動(dòng)NiFi處理數(shù)據(jù)流程,向MySQL中寫(xiě)入數(shù)據(jù),查看Hive中表數(shù)據(jù)

首先清空“CaptureChangeMySQL”處理器的狀態(tài),單獨(dú)啟動(dòng)“CaptureChangeMySQL”處理器,清空重新消費(fèi)的數(shù)據(jù)(以上主要就是避免此版本NiFi bug問(wèn)題),啟動(dòng)當(dāng)前案例中其他NiFi處理器。

然后向MySQL中插入以下數(shù)據(jù):

insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

NiFi頁(yè)面:

Hive表test2中的結(jié)果:

標(biāo)簽: Hive 云數(shù)據(jù)庫(kù) Server Java
溫馨提示:

在實(shí)際法律問(wèn)題情景中,個(gè)案情況都有所差異,為了高效解決您的問(wèn)題,保障合法權(quán)益,建議您直接向?qū)I(yè)律師說(shuō)明情況,解決您的實(shí)際問(wèn)題。 立即在線咨詢 >

相關(guān)知識(shí)推薦
操作
分享
15037178970

公眾服務(wù)

法制網(wǎng)公眾號(hào)

快速找律師 / 免費(fèi)咨詢

查法律知識(shí) / 查看解答 / 隨時(shí)追問(wèn)

律師服務(wù)(工作日8:30-18:00 ,非工作日請(qǐng)QQ留言)

律師加盟

律師營(yíng)銷服務(wù)

在線客服:

加盟熱線:

律師營(yíng)銷診斷

營(yíng)銷分析 / 回復(fù)咨詢

案件接洽 / 合作加盟

法律包,中國(guó)知名的 法律咨詢網(wǎng)站,能夠?yàn)閺V大用戶提供在線 免費(fèi)法律咨詢服務(wù)。
CopyRight@2003-2022 fazhi.net ALL Rights Reservrd 版權(quán)所有
皖I(lǐng)CP備2022009963號(hào)-41
違法和不良信息聯(lián)系郵箱:39 60 29 14 2 @qq.com