Alex's Hadoop Beginner Tutorial, Lesson 20: The Oozie Workflow Engine
This article is based on CentOS 6.x + CDH 5.x.
What Is Oozie
Put simply, Oozie is a workflow engine, one built on top of Hadoop. In practice, whenever you need to run a whole chain of operations over your data, it saves you from writing glue code yourself: you define each action, string them together into a workflow, and the whole thing runs automatically. It is very useful for big-data analysis work.
Installing Oozie
Oozie is split into a server and a client. I'll use host1 as the server and host2 as the client.
So on host1, run:
yum install oozie
And on host2, run:
yum install oozie-client
Configuring Oozie
Configure the MapReduce version Oozie uses. There are two: MRv1 and YARN. Since we chose YARN, and to keep things easy to start with I'm skipping SSL for now, switch to the non-SSL, YARN-based configuration:
alternatives --set oozie-tomcat-conf /etc/oozie/tomcat-conf.http
Next, create a MySQL database and user for Oozie's metadata:
$ mysql -u root -p
Enter password: ******
mysql> create database oozie;
Query OK, 1 row affected (0.03 sec)
mysql> grant all privileges on oozie.* to 'oozie'@'localhost' identified by 'oozie';
Query OK, 0 rows affected (0.03 sec)
mysql> grant all privileges on oozie.* to 'oozie'@'%' identified by 'oozie';
Query OK, 0 rows affected (0.03 sec)
Edit oozie-site.xml and configure the MySQL connection properties:
<property>
    <name>oozie.service.JPAService.jdbc.driver</name>
    <value>com.mysql.jdbc.Driver</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.url</name>
    <value>jdbc:mysql://localhost:3306/oozie</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.username</name>
    <value>oozie</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.password</name>
    <value>oozie</value>
</property>
Symlink the MySQL JDBC driver into /var/lib/oozie/:
$ sudo yum install mysql-connector-java
$ ln -s /usr/share/java/mysql-connector-java.jar /var/lib/oozie/mysql-connector-java.jar
If you already have mysql-connector-java installed, you can skip the first command.
Create the tables Oozie needs:
$ sudo -u oozie /usr/lib/oozie/bin/ooziedb.sh create -run
Enabling the Web Console
Step 1
The Oozie web console uses ExtJS, so first download ext-2.2: http://archive.cloudera.com/gplextras/misc/ext-2.2.zip
Step 2
Unzip ext-2.2.zip and move it into /var/lib/oozie:
# unzip ext-2.2.zip
# mv ext-2.2 /var/lib/oozie/
Installing the Oozie Shared Library on HDFS
Give oozie the HDFS permissions it needs: edit /etc/hadoop/conf/core-site.xml on every machine and add the following:
<property>
    <name>hadoop.proxyuser.oozie.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.oozie.groups</name>
    <value>*</value>
</property>
Then restart the Hadoop services (restarting the namenode and datanodes is enough).
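On a CDH package install, that might look like this (a sketch; the hadoop-hdfs-namenode and hadoop-hdfs-datanode service names assume CDH 5 packaging, so adjust to your own deployment):
# on the namenode host
sudo service hadoop-hdfs-namenode restart
# on each datanode host
sudo service hadoop-hdfs-datanode restart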
Copy the Oozie JARs to HDFS so that DistCp, Pig, Hive, and Sqoop can call them:
$ sudo -u hdfs hadoop fs -mkdir /user/oozie
$ sudo -u hdfs hadoop fs -chown oozie:oozie /user/oozie
$ sudo oozie-setup sharelib create -fs hdfs://mycluster/user/oozie -locallib /usr/lib/oozie/oozie-sharelib-yarn.tar.gz
Replace mycluster here with your own cluster ID.
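If you're not sure what your cluster ID is, it is the authority part of fs.defaultFS in core-site.xml; a quick way to check (assuming the standard CDH config path and one-tag-per-line formatting):
grep -A1 'fs.defaultFS' /etc/hadoop/conf/core-site.xml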
Starting Oozie
$ sudo service oozie start
Using Oozie
Ways to Connect to Oozie
There are three ways to connect to Oozie.
Using the client
Since my client is installed on host2, run this on host2:
$ oozie admin -oozie http://host1:11000/oozie -status
System mode: NORMAL
To avoid typing the Oozie server address every time, we can set an environment variable:
$ export OOZIE_URL=http://host1:11000/oozie
$ oozie admin -version
Oozie server build version: 4.0.0-cdh5.0.0
Using a browser
Open http://host1:11000/oozie in a browser.
Using HUE
Last lesson covered HUE; now we can configure HUE with Oozie's address and drive Oozie from there.
Edit /etc/hue/conf/hue.ini, find the oozie_url property, and set it to the real address:
[liboozie]
# The URL where the Oozie service runs on. This is required in order for
# users to submit jobs. Empty value disables the config check.
oozie_url=http://host1:11000/oozie
Restart the hue service.
Visit the oozie module in hue.
Click Workflow to see the workflow interface.
Oozie's Three Core Concepts
Oozie has three main concepts:
- workflow: a workflow
- coordinator: several workflows can be combined into a coordinator, so that the output of earlier workflows feeds later ones; you can also define trigger conditions for workflows, for example to run them on a schedule (see the sketch after this list)
- bundle: an abstraction over a collection of coordinators
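To make the coordinator idea concrete, here is a minimal sketch of a coordinator-app that triggers one workflow every day for a month (the name, dates, and path are made up for illustration):
<coordinator-app name="daily-wordcount" frequency="${coord:days(1)}"
                 start="2015-03-01T00:00Z" end="2015-04-01T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <!-- HDFS directory containing the workflow.xml to trigger -->
            <app-path>${nameNode}/user/root/examples/apps/map-reduce</app-path>
        </workflow>
    </action>
</coordinator-app>
You would point oozie.coord.application.path at the HDFS directory holding this file in a job.properties, then submit it the same way as a workflow job.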
(Figure: how the Oozie components relate to one another.)
hPDL
Oozie uses an XML dialect called hPDL to define workflows.
Here is an hPDL example for a wordcount workflow:
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.1">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Something went wrong: ${wf:errorCode('wordcount')}</message>
    </kill>
    <end name='end'/>
</workflow-app>
(Figure: the control flow of this example workflow.)
The Anatomy of an Oozie Job
An Oozie job is generally made up of the following files:
- job.properties, which records the job's properties
- workflow.xml, which uses hPDL to define the flow and branching of the tasks
- class files, which carry out the actual work
The launch command generally looks like this:
$ oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
As you can see, a job is started by invoking the oozie job command with the Oozie server address and the path to job.properties; job.properties is the entry point for running a job.
A MapReduce Example
Here we use the officially provided example.
Step 1
Download the Oozie tarball on host1:
wget http://apache.fayea.com/oozie/4.1.0/oozie-4.1.0.tar.gz
Unpack it; inside is an examples folder. Copy that folder somewhere else and rename it oozie-examples. Enter the folder, then edit pom.xml and add a plugin to the plugins section:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.5</version>
    <configuration>
        <skipTests>false</skipTests>
        <testFailureIgnore>true</testFailureIgnore>
        <forkMode>once</forkMode>
    </configuration>
</plugin>
Then run mvn package; you should see oozie-examples-4.1.0.jar under the target folder.
Step 2
Edit oozie-examples/src/main/apps/map-reduce/job.properties.
Set nameNode to the HDFS namenode address; since our cluster is set up in HA mode, that is hdfs://mycluster. Set jobTracker to the address where the resourcemanager runs, here host1:8032.
The finished job.properties looks like this:
nameNode=hdfs://mycluster
jobTracker=host1:8032
queueName=default
examplesRoot=examples
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
outputDir=map-reduce
Here user.name is the Linux user you run oozie as. I'm using root, so the final path becomes hdfs://mycluster/user/root/examples/apps/map-reduce.
Step 3
Following the paths configured above, first create the /user/root/examples/apps/map-reduce/ directory on HDFS:
hdfs dfs -mkdir -p /user/root/examples/apps/map-reduce
Then upload src/main/apps/map-reduce/workflow.xml into /user/root/examples/apps/map-reduce:
hdfs dfs -put oozie-examples/src/main/apps/map-reduce/workflow.xml /user/root/examples/apps/map-reduce/
Create a lib folder under /user/root/examples/apps/map-reduce/ and upload the packaged oozie-examples-4.1.0.jar into it:
hdfs dfs -mkdir /user/root/examples/apps/map-reduce/lib
hdfs dfs -put oozie-examples/target/oozie-examples-4.1.0.jar /user/root/examples/apps/map-reduce/lib
Create an /examples folder on HDFS:
sudo -u hdfs hdfs dfs -mkdir /examples
Upload the src/main/apps folder from the examples directory into it:
hdfs dfs -put oozie-examples/src/main/apps /examples
Create the input and output folders and upload the test data:
hdfs dfs -mkdir -p /user/root/examples/input-data/text
hdfs dfs -mkdir -p /user/root/examples/output-data
hdfs dfs -put oozie-examples/src/main/data/data.txt /user/root/examples/input-data/text
Step 4
Run the job:
oozie job -oozie http://host1:11000/oozie -config oozie-examples/src/main/apps/map-reduce/job.properties -run
If the job is created successfully, it returns a job ID, for example job: 0000017-150302164219871-oozie-oozi-W
You can then check the job's status with any of the three connection methods described earlier. Here I use HUE: click Workflow -> Dashboard -> Workflows at the top.
You will see one job running.
Click it to watch the job status in real time; when it finishes, the status changes to SUCCEEDED.
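If you would rather stay on the command line, the client can query the same information (substituting the job ID returned above):
oozie job -oozie http://host1:11000/oozie -info 0000017-150302164219871-oozie-oozi-W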
Now take a look at the result in /user/root/examples/output-data/map-reduce/part-00000:
0 To be or not to be, that is the question;
42 Whether 'tis nobler in the mind to suffer
84 The slings and arrows of outrageous fortune,
129 Or to take arms against a sea of troubles,
172 And by opposing, end them. To die, to sleep;
217 No more; and by a sleep to say we end
255 The heart-ache and the thousand natural shocks
302 That flesh is heir to ? 'tis a consummation
346 Devoutly to be wish'd. To die, to sleep;
387 To sleep, perchance to dream. Ay, there's the rub,
438 For in that sleep of death what dreams may come,
487 When we have shuffled off this mortal coil,
531 Must give us pause. There's the respect
571 That makes calamity of so long life,
608 For who would bear the whips and scorns of time,
657 Th'oppressor's wrong, the proud man's contumely,
706 The pangs of despised love, the law's delay,
751 The insolence of office, and the spurns
791 That patient merit of th'unworthy takes,
832 When he himself might his quietus make
871 With a bare bodkin? who would fardels bear,
915 To grunt and sweat under a weary life,
954 But that the dread of something after death,
999 The undiscovered country from whose bourn
1041 No traveller returns, puzzles the will,
1081 And makes us rather bear those ills we have
1125 Than fly to others that we know not of?
1165 Thus conscience does make cowards of us all,
1210 And thus the native hue of resolution
1248 Is sicklied o'er with the pale cast of thought,
1296 And enterprises of great pitch and moment
1338 With this regard their currents turn awry,
1381 And lose the name of action.
Dissecting workflow.xml
Let's open the workflow.xml from the example we just ran:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
The most important part is the action node.
An action can be one of several supported types:
- Map-Reduce Action
- Pig Action
- Fs(HDFS) Action
- Java Action
- Email Action
- Shell Action
- Hive Action
- Sqoop Action
- Ssh Action
- DistCp Action
- Custom actions
- sub-workflow (which nests the path to another workflow.xml file)
For details, see http://oozie.apache.org/docs/4.1.0/WorkflowFunctionalSpec.html#a3.2_Workflow_Action_Nodes
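As a taste of another action type, here is a minimal sketch of an Fs (HDFS) action that deletes and recreates a directory before downstream steps run (the node name and paths are made up for illustration):
<action name="prepare-dirs">
    <fs>
        <!-- fs actions run directly against HDFS; no MapReduce job is launched -->
        <delete path="${nameNode}/user/${wf:user()}/examples/output-data/tmp"/>
        <mkdir path="${nameNode}/user/${wf:user()}/examples/output-data/tmp"/>
    </fs>
    <ok to="end"/>
    <error to="fail"/>
</action>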
This simple map-reduce example doesn't actually do anything useful: it just reads the text in line by line and writes it back out. Next, let's turn it into the WordCount example we already know.
A WordCount Example
Step 1
First, update our Mapper and Reducer code.
Change SampleMapper to:
package org.apache.oozie.example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
Then change the Reducer to:
package org.apache.oozie.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Once changed, package with mvn clean package and upload the jar to /user/root/examples/apps/map-reduce/lib again, overwriting the previous one.
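Concretely, that is something like the following (paths as set up earlier; the -f flag lets put overwrite the existing jar):
mvn clean package
hdfs dfs -put -f oozie-examples/target/oozie-examples-4.1.0.jar /user/root/examples/apps/map-reduce/lib/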
A side note here on the difference between MapReduce's old API and new API. It has nothing to do with this tutorial, so skip the next section if you're not interested.
Differences Between the MapReduce Old API and New API
MapReduce has an old API and a new API. The new API deprecates the Mapper and Reducer under the org.apache.hadoop.mapred package and introduces the org.apache.hadoop.mapreduce package. If you have MapReduce jobs written against the old API, you can convert them to the new style with the following changes (a sketch of the old style follows below for contrast):
- Change implements Mapper/Reducer to extends Mapper/Reducer, because in the new API Mapper and Reducer are classes rather than interfaces, and they move to org.apache.hadoop.mapreduce.Mapper and org.apache.hadoop.mapreduce.Reducer
- Change OutputCollector to Context
- Change the map method to map(LongWritable key, Text value, Context context) and the reduce method to reduce(Text key, Iterable<IntWritable> values, Context context)
For details, see Hadoop WordCount with new map reduce api (http://codesfusion.blogspot.com/2013/10/hadoop-wordcount-with-new-map-reduce-api.html).
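For contrast, a mapper written against the old API would look roughly like this (a sketch for comparison only; the new-API version is the SampleMapper shown earlier, and the class name here is made up):
package org.myorg;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old API: Mapper is an interface under org.apache.hadoop.mapred
public class OldApiWordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Old API: results go to an OutputCollector instead of a Context
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}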
Step 2
Modify the earlier src/main/apps/map-reduce/workflow.xml to look like this:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
I replaced the properties inside the action; a few of them deserve explanation:
- mapred.mapper.new-api and mapred.reducer.new-api control whether to use the new API; we set both to true
- mapred.output.key.class and mapred.output.value.class are the mapper's output key and value types
- mapreduce.map.class and mapreduce.reduce.class: note that even the property names changed here, which is easy to miss. They used to be mapred.mapper.class and mapred.reducer.class; if you change only the values, you will get an error saying the new API has no such properties
Then upload the workflow.xml to HDFS:
hdfs dfs -put -f oozie-examples/src/main/apps/map-reduce/workflow.xml /user/root/examples/apps/map-reduce/
Step 3
Prepare the input data, the same file0 and file1 we used for wordcount before:
$ echo "Hello World Bye World" > file0
$ echo "Hello Hadoop Goodbye Hadoop" > file1
$ hdfs dfs -put file* /user/root/examples/input-data/text
And delete the earlier data.txt while we're at it:
hdfs dfs -rm /user/root/examples/input-data/text/data.txt
Step 4
Run the job:
oozie job -oozie http://host1:11000/oozie -config oozie-examples/src/main/apps/map-reduce/job.properties -run
When it finishes, check the result at /user/root/examples/output-data/map-reduce/part-r-00000:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Done!
References
- http://www.infoq.com/cn/articles/introductionOozie
- http://www.infoq.com/cn/articles/oozieexample
- https://github.com/yahoo/oozie/wiki/Oozie-Coord-Use-Cases
- https://oozie.apache.org/docs/3.1.3-incubating/CoordinatorFunctionalSpec.html#a2._Definitions
- http://oozie.apache.org/docs/4.1.0/DG_Examples.html
- https://github.com/jrkinley/oozie-examples
- http://codesfusion.blogspot.com/2013/10/hadoop-wordcount-with-new-map-reduce-api.html
- https://support.pivotal.io/hc/en-us/articles/203355837-How-to-run-a-Map-Reduce-jar-using-Oozie-workflow