午夜激情网址,国产精品黄M.M视频,Aⅴ86干,天天狠狠综合爱

首頁 >頭條 > 正文

【熱聞】大數(shù)據(jù)Flink進(jìn)階(六):Flink入門案例

2023-03-22 05:16:24來源:騰訊云

Flink入門案例

需求:讀取本地數(shù)據(jù)文件,統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù)。

一、IDEA Project創(chuàng)建及配置

本案例編寫Flink代碼選擇語言為Java和Scala,所以這里我們通過IntelliJ IDEA創(chuàng)建一個目錄,其中包括Java項(xiàng)目模塊和Scala項(xiàng)目模塊,將Flink Java api和Flink Scala api分別在不同項(xiàng)目模塊中實(shí)現(xiàn)。步驟如下:


【資料圖】

1、打開IDEA,創(chuàng)建空項(xiàng)目

2、在IntelliJ IDEA 中安裝Scala插件

使用IntelliJ IDEA開發(fā)Flink,如果使用Scala api 那么還需在IntelliJ IDEA中安裝Scala的插件,如果已經(jīng)安裝可以忽略此步驟,下圖為以安裝Scala插件。

3、打開Structure,創(chuàng)建項(xiàng)目新模塊

創(chuàng)建Java模塊:

繼續(xù)點(diǎn)擊"+",創(chuàng)建Scala模塊:

創(chuàng)建好"FlinkScalaCode"模塊后,右鍵該模塊添加Scala框架支持,并修改該模塊中的"java"src源為"scala":

在"FlinkScalaCode"模塊Maven pom.xml中引入Scala依賴包,這里使用的Scala版本為2.12.10。

  org.scala-lang  scala-library  2.12.10  org.scala-lang  scala-compiler  2.12.10  org.scala-lang  scala-reflect  2.12.10

4、Log4j日志配置

為了方便查看項(xiàng)目運(yùn)行過程中的日志,需要在兩個項(xiàng)目模塊中配置log4j.properties配置文件,并放在各自項(xiàng)目src/main/resources資源目錄下,沒有resources資源目錄需要手動創(chuàng)建并設(shè)置成資源目錄。log4j.properties配置文件內(nèi)容如下:

log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

復(fù)制

并在兩個項(xiàng)目中的Maven pom.xml中添加對應(yīng)的log4j需要的依賴包,使代碼運(yùn)行時能正常打印結(jié)果:

  org.slf4j  slf4j-log4j12  1.7.36  org.apache.logging.log4j  log4j-to-slf4j  2.17.2

5、分別在兩個項(xiàng)目模塊中導(dǎo)入Flink Maven依賴

"FlinkJavaCode"模塊導(dǎo)入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.36  2.17.2        org.apache.flink    flink-clients    ${flink.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

"FlinkScalaCode"模塊導(dǎo)入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.31  2.17.1  2.12.10  2.12        org.apache.flink    flink-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-streaming-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-clients    ${flink.version}          org.scala-lang    scala-library    ${scala.version}        org.scala-lang    scala-compiler    ${scala.version}        org.scala-lang    scala-reflect    ${scala.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

注意:在后續(xù)實(shí)現(xiàn)WordCount需求時,F(xiàn)link Java Api只需要在Maven中導(dǎo)入"flink-clients"依賴包即可,而Flink Scala Api 需要導(dǎo)入以下三個依賴包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因?yàn)樵贔link1.15版本后,F(xiàn)link添加對opting-out(排除)Scala的支持,如果你只使用Flink的Java api,導(dǎo)入包不必包含scala后綴,如果使用Flink的Scala api,需要選擇匹配的Scala版本。

二、案例數(shù)據(jù)準(zhǔn)備

在項(xiàng)目"MyFlinkCode"中創(chuàng)建"data"目錄,在目錄中創(chuàng)建"words.txt"文件,向文件中寫入以下內(nèi)容,方便后續(xù)使用Flink編寫WordCount實(shí)現(xiàn)代碼。

hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink

三、案例實(shí)現(xiàn)

數(shù)據(jù)源分為有界和無界之分,有界數(shù)據(jù)源可以編寫批處理程序,無界數(shù)據(jù)源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。

批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示數(shù)據(jù)的特殊類,DataSet處理的數(shù)據(jù)是有界的,DataStream處理的數(shù)據(jù)是無界的,這兩個類都是不可變的,一旦創(chuàng)建出來就無法添加或者刪除數(shù)據(jù)元。

1、Flink 批數(shù)據(jù)處理案例

Java版本W(wǎng)ordCount

使用Flink Java Dataset api實(shí)現(xiàn)WordCount具體代碼如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.讀取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分單詞FlatMapOperator wordsDS =        linesDS.flatMap((String lines, Collector collector) -> {    String[] arr = lines.split(" ");    for (String word : arr) {        collector.collect(word);    }}).returns(Types.STRING);//3.將單詞轉(zhuǎn)換成Tuple2 KV 類型MapOperator> kvWordsDS =        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 進(jìn)行分組處理得到最后結(jié)果并打印kvWordsDS.groupBy(0).sum(1).print();

Scala版本W(wǎng)ordCount

使用Flink Scala Dataset api實(shí)現(xiàn)WordCount具體代碼如下:

//1.準(zhǔn)備環(huán)境,注意是Scala中對應(yīng)的Flink環(huán)境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.導(dǎo)入隱式轉(zhuǎn)換,使用Scala API 時需要隱式轉(zhuǎn)換來推斷函數(shù)操作后的類型import org.apache.flink.api.scala._//3.讀取數(shù)據(jù)文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.進(jìn)行 WordCount 統(tǒng)計并打印linesDS.flatMap(line => {  line.split(" ")})  .map((_, 1))  .groupBy(0)  .sum(1)  .print()

以上無論是Java api 或者是Scala api 輸出結(jié)果如下,顯示的最終結(jié)果是統(tǒng)計好的單詞個數(shù)。

(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)

2、Flink流式數(shù)據(jù)處理案例

Java版本W(wǎng)ordCount

使用Flink Java DataStream api實(shí)現(xiàn)WordCount具體代碼如下:

//1.創(chuàng)建流式處理環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.讀取文件數(shù)據(jù)DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分單詞,設(shè)置KV格式數(shù)據(jù)SingleOutputStreamOperator> kvWordsDS =        lines.flatMap((String line, Collector> collector) -> {    String[] words = line.split(" ");    for (String word : words) {        collector.collect(Tuple2.of(word, 1L));    }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分組統(tǒng)計獲取 WordCount 結(jié)果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式計算中需要最后執(zhí)行execute方法env.execute();
Scala版本W(wǎng)ordCount

使用Flink Scala DataStream api實(shí)現(xiàn)WordCount具體代碼如下:

//1.創(chuàng)建環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.導(dǎo)入隱式轉(zhuǎn)換,使用Scala API 時需要隱式轉(zhuǎn)換來推斷函數(shù)操作后的類型import org.apache.flink.streaming.api.scala._//3.讀取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.進(jìn)行wordCount統(tǒng)計ds.flatMap(line=>{line.split(" ")})  .map((_,1))  .keyBy(_._1)  .sum(1)  .print()//5.最后使用execute 方法觸發(fā)執(zhí)行env.execute()

以上輸出結(jié)果開頭展示的是處理當(dāng)前數(shù)據(jù)的線程,一個Flink應(yīng)用程序執(zhí)行時默認(rèn)的線程數(shù)與當(dāng)前節(jié)點(diǎn)cpu的總線程數(shù)有關(guān)。

3、DataStream BATCH模式

下面使用Java代碼使用DataStream API 的Batch 模式來處理批WordCount代碼,方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設(shè)置批運(yùn)行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() {    @Override    public void flatMap(String lines, Collector> out) throws Exception {        String[] words = lines.split(" ");        for (String word : words) {            out.collect(new Tuple2<>(word, 1L));        }    }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();

以上代碼運(yùn)行完成之后結(jié)果如下,可以看到結(jié)果與批處理結(jié)果類似,只是多了對應(yīng)的處理線程號。

3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)

此外,Stream API 中除了可以設(shè)置Batch批處理模式之外,還可以設(shè)置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式會根據(jù)數(shù)據(jù)是有界流/無界流自動決定采用BATCH/STREAMING模式來讀取數(shù)據(jù),設(shè)置方式如下:

//BATCH 設(shè)置批處理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 會根據(jù)有界流/無界流自動決定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 設(shè)置流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代碼中設(shè)置處理模式外,還可以在Flink配置文件(flink-conf.yaml)中設(shè)置execution.runtime-mode參數(shù)來指定對應(yīng)的模式,也可以在集群中提交Flink任務(wù)時指定execution.runtime-mode來指定,F(xiàn)link官方建議在提交Flink任務(wù)時指定執(zhí)行模式,這樣減少了代碼配置給Flink Application提供了更大的靈活性,提交任務(wù)指定參數(shù)如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar
責(zé)任編輯:

標(biāo)簽:

免責(zé)聲明

頭條新聞

推薦內(nèi)容

亚洲蜜臀| 色吧小说图片综合亚洲| XXXWWW在线播放| 色十八综合社区| av大全| 中文字幕视频久久| 国产素人情侣对白在线| 夜夜躁狠狠躁日日躁| 婷婷五情天欧美网| av无码主播| 精品人妻久久久| 一级二级三级免费影视| 999精品视频在线观看| 蜜臀午夜啪啪麻豆视频| 伊人久久大香线蕉av色| 看黄片.com| 日韩性爱日| 欧美色图50p| 本道无码AV| 日韩无码影院| 九九久欧美| 无码色AV一二区在线播放| 亚洲丁香五月天AV| 精品视频网站免费| 1级啊片| 亚洲第一色小说| 日本女同网站| 百度成人网| 一本a道新久| 欧美另类激情小说| 东京热 社区| 在线理论视频| 91人人动漫视频| 亚洲一二三四区| A级黄片兔费观看| 精品丰满熟女一区二区三区| 酒色1314| ww黄片| 黄色视频WWW麻豆| 免费a级毛片无码av| 精品久久网|