日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國內(nèi)最全I(xiàn)T社區(qū)平臺 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php框架 > 框架設(shè)計(jì) > Flink流計(jì)算編程--Kafka+Flink整合demo

Flink流計(jì)算編程--Kafka+Flink整合demo

來源:程序員人生   發(fā)布時間:2016-06-30 08:24:47 閱讀次數(shù):10872次

1、簡介

1.1、Kafka Consumer提供了2種API:high level與low level(SimpleConsumer)。
(1)high level consumer的API較為簡單,不需要關(guān)心offset、partition、broker等信息,kafka會自動讀取zookeeper中該consumer group的last offset。
(2)low level consumer也叫SimpleConsumer,這個接口非常復(fù)雜,需要自己寫代碼去實(shí)現(xiàn)對offset、partition、broker和broker的切換,能不用就不用,那什么時候必須用?

1、Read a message multiple times
2、Consume only a subset of the partitions in a topic in a process
3、Manage transactions to make sure a message is processed once and only once

這里寫圖片描述

2、Flink的開發(fā)準(zhǔn)備

Flink提供了high level的API來消費(fèi)kafka的數(shù)據(jù):flink-connector-kafka-0.8_2.10。注意,這里的0.8代表的是kafka的版本,你可以通過maven來導(dǎo)入kafka的依賴,具體以下:
這里寫圖片描述

例如你的kafka安裝版本是“kafka_2.10-0.8.2.1”,即此版本是由scala2.10編寫,kafka的本身版本是0.8.2.1.那此時你需要添加以下的內(nèi)容到maven的pom.xml文件中:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>${flink.version}</version> </dependency>

注意:
$${flink.version}是個變量,自己調(diào)劑下代碼,例如可以直接寫1.0.0。我的項(xiàng)目里采取的是添加了properties來控制${flink.version}:

<properties> <project.build.sourceEncoding>UTF⑻</project.build.sourceEncoding> <flink.version>1.0.0</flink.version> </properties>

3、集群環(huán)境準(zhǔn)備

這里主要是介紹下Flink集群與kafka集群的搭建。
基礎(chǔ)的軟件安裝包括JDK、scala、hadoop、zookeeper、kafka和flink就不介紹了,直接看下flink的集群配置和kafka的集群配置。
zookeeper–3.4.6
hadoop–2.6.0
kafka–2.10-0.8.2.1
flink–1.0.3

3.1、Flink集群配置(standalone且沒有用zookeeper的HA)

3.1.1、環(huán)境變量
添加FLINK_HOME和path的內(nèi)容:

export FLINK_HOME=/usr/local/flink/flink-1.0.3 export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${FLINK_HOME}/bin:$PATH export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

3.1.2、修改conf/flink-conf.yaml
這里寫圖片描述

這幾近是最簡單的配置方式了,主要注意要修改jobmanager.rpc.address為集群中jobManager的IP或hostname。檢查點(diǎn)和HA的參數(shù)都沒有配置。

3.1.3、slaves文件
這里寫圖片描述

這個文件中寄存的信息是taskmanager的hostname。

3.1.4、復(fù)制flink目錄和.bashrc文件到集群中其他的機(jī)器,并使bashrc生效

root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker1:/usr/local/flink/ root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker2:/usr/local/flink/ root@master:/usr/local/flink# scp ~/.bashrc root@worker1:~/.bashrc root@master:/usr/local/flink# scp ~/.bashrc root@worker2:~/.bashrc root@worker1:~# source ~/.bashrc root@worker2:~# source ~/.bashrc

3.2、kafka集群配置

3.2.1、環(huán)境變量
省略

3.2.2、配置config/zookeeper.properties
由于kafka集群依賴于zookeeper集群,所以kafka提供了通過kafka去啟動zookeeper集群的功能,固然也能夠手動去啟動zookeeper的集群而不通過kafka去啟動zookeeper的集群。
這里寫圖片描述
注意這里的dataDir最好不要指定/tmp目錄下,由于機(jī)器重啟會刪除此目錄下的文件。且指定的新路徑必須存在。

3.2.3、配置config/server.properties
這個文件是啟動kafka集群需要指定的配置文件,注意2點(diǎn):

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The port the socket server listens on #port=9092 listeners=PLAINTEXT://:9092

broker.id在kafka集群的每臺機(jī)器上都不1樣,我這里3臺集群分別是0、1、2.

############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,worker1:2181,worker2:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000

zookeeper.connect要配置kafka集群所依賴的zookeeper集群的信息,hostname:port。

3.2.4、復(fù)制kafka路徑及環(huán)境變量到其他kafka集群的機(jī)器,并修改server.properties中的broker_id.
復(fù)制進(jìn)程省略

3.3、啟動kafka集群+Flink集群

3.3.1、首先啟動zookeeper集群(3臺zookeeper機(jī)器都要啟動):

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start

驗(yàn)證zookeeper集群:
進(jìn)程是不是啟動;zookeeper集群中是不是可以正常顯示leader和follower。

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# jps 3295 QuorumPeerMain root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: leader root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin#

3.3.2、啟動kafka集群(3臺都要啟動)

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker1:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker2:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

驗(yàn)證:
進(jìn)程;日志

3512 Kafka

3.3.3、啟動hdfs(master上啟動便可)

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# start-dfs.sh

驗(yàn)證:進(jìn)程及webUI

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3798 NameNode 4007 SecondaryNameNode root@worker1:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3843 DataNode root@worker2:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3802 DataNode

webUI:50070,默許可配置
這里寫圖片描述

3.3.4、啟動Flink集群(master便可)

root@master:/usr/local/flink/flink-1.0.3/bin# start-cluster.sh

驗(yàn)證:進(jìn)程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 4411 JobManager root@worker1:/usr/local/flink/flink-1.0.3/bin# jps 4151 TaskManager root@worker2:/usr/local/flink/flink-1.0.3/bin# jps 4110 TaskManager

WebUI:8081(默許,可配置)
這里寫圖片描述

4、編寫Flink程序,實(shí)現(xiàn)consume kafka的數(shù)據(jù)(demo)

4.1、代碼
這里就是簡單的實(shí)現(xiàn)接收kafka的數(shù)據(jù),要指定zookeeper和kafka的集群配置,并指定topic的名字。
最后將consume的數(shù)據(jù)直接打印出來。

import java.util.Properties import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ /** * 用Flink消費(fèi)kafka */ object ReadingFromKafka { private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181" private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092" private val TRANSACTION_GROUP = "transaction" def main(args : Array[String]){ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(1000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // configure Kafka consumer val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER) kafkaProps.setProperty("group.id", TRANSACTION_GROUP) //topicd的名字是new,schema默許使用SimpleStringSchema()便可 val transaction = env .addSource( new FlinkKafkaConsumer08[String]("new", new SimpleStringSchema(), kafkaProps) ) transaction.print() env.execute() } }

4.2、打包:

mvn clean package

這里寫圖片描述
看到成功標(biāo)志,否則會提示error的地方。

4.3、發(fā)布到集群

root@master:/usr/local/flink/flink-1.0.3/bin# flink run -c wikiedits.ReadingFromKafka /root/Documents/wiki-edits-0.1.jar

驗(yàn)證:進(jìn)程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 6080 CliFrontend

這里寫圖片描述

5、kafka produce數(shù)據(jù),驗(yàn)證flink是不是正常消費(fèi)

5.1、通過kafka console produce數(shù)據(jù)
之前已在kafka中創(chuàng)建了名字為new的topic,因此直接produce new的數(shù)據(jù):

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic new

生產(chǎn)數(shù)據(jù):
這里寫圖片描述

5.2、查看flink的標(biāo)準(zhǔn)輸出中,是不是已消費(fèi)了這部份數(shù)據(jù):

root@worker2:/usr/local/flink/flink-1.0.3/log# ls -l | grep out -rw-r--r-- 1 root root 254 629 09:37 flink-root-taskmanager-0-worker2.out root@worker2:/usr/local/flink/flink-1.0.3/log#

我們在worker2的log中發(fā)現(xiàn)已有了數(shù)據(jù),下面看看內(nèi)容:
這里寫圖片描述

OK,沒問題,flink正常消費(fèi)了數(shù)據(jù)。

6、總結(jié)

kafka作為1個消息系統(tǒng),本身具有高吞吐、低延時、持久化、散布式等特點(diǎn),其topic可以指定replication和partitions,使得可靠性和性能都可以很好的保證。
Kafka+Flink的架構(gòu),可使flink只需關(guān)注計(jì)算本身。

參考
http://www.tuicool.com/articles/fI7J3m
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html
http://kafka.apache.org/082/documentation.html
http://dataartisans.github.io/flink-training/exercises/toFromKafka.html
http://data-artisans.com/kafka-flink-a-practical-how-to/

生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 日韩在线视频免费观看 | 久久se精品一区精品二区 | 亚洲经典三级 | 国产一区二区三区网站 | 久久久久久国产精品免费免费 | 久久美女视频 | 久久网亚洲 | 欧美成人精品一区 | 免费国产一区二区 | 日韩成人美女视频 | 能看av的网站 | 91系列在线观看 | 激情久久av | 日韩精品一区二区三区四区视频 | 欧美日韩一区二区三区不卡 | 91国偷自产一区二区使用方法 | 黄色观看 | 偷拍自拍在线观看 | 国产日产欧美一区二区 | 国产嫩草一区二区三区在线观看 | 一区二区精品在线 | 日本一区2区 | 国产精品一区二区三区在线 | 91久久精品人人做人人爽综合 | 久久精品91| 国产一区二区三区高清 | 国内在线一区 | 国产区一区 | 91偷拍视频 | 国产激情视频 | 亚洲一区国产精品 | 国产高清视频 | 高清国产一区 | 欧美日本韩国一区二区三区 | 九九九久久国产免费 | 欧美成人精品一区二区 | 国产日韩欧美一区二区 | 久久激情av | 国产二区三区在线播放 | 久热中文 | 国产精一区 |