日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > 服務(wù)器 > Flume+Hadoop+Hive的離線分析系統(tǒng)基本架構(gòu)

Flume+Hadoop+Hive的離線分析系統(tǒng)基本架構(gòu)

來源:程序員人生   發(fā)布時(shí)間:2016-06-21 11:39:00 閱讀次數(shù):4183次
      PS:歷史緣由作者賬號(hào)名為:ymh198816,但事實(shí)上作者的生日其實(shí)不是1988年1月6日偷笑

      最近在學(xué)習(xí)大數(shù)據(jù)的離線分析技術(shù),所以在這里通過做1個(gè)簡(jiǎn)單的網(wǎng)站點(diǎn)擊流數(shù)據(jù)分析離線系統(tǒng)來和大家1起梳理1下離線分析系統(tǒng)的架構(gòu)模型。固然這個(gè)架構(gòu)模型只能是離線分析技術(shù)的1個(gè)簡(jiǎn)單的入門級(jí)架構(gòu),實(shí)際生產(chǎn)環(huán)境中的大數(shù)據(jù)離線分析技術(shù)還觸及到很多細(xì)節(jié)的處理和高可用的架構(gòu)。這篇文章的目的只是帶大家入個(gè)門,讓大家對(duì)離線分析技術(shù)有1個(gè)簡(jiǎn)單的認(rèn)識(shí),并和大家1起做學(xué)習(xí)交換。

離線分析系統(tǒng)的結(jié)構(gòu)圖
    

      全部離線分析的整體架構(gòu)就是使用FlumeFTP服務(wù)器上收集日志文件,并存儲(chǔ)在Hadoop HDFS文件系統(tǒng)上,再接著用Hadoopmapreduce清洗日志文件,最后使用HIVE構(gòu)建數(shù)據(jù)倉庫做離線分析。任務(wù)的調(diào)度使用Shell腳本完成,固然大家也能夠嘗試1些自動(dòng)化的任務(wù)調(diào)度工具,比如說AZKABANOOZIE等。
      分析所使用的點(diǎn)擊流日志文件主要來自Nginxaccess.log日志文件,需要注意的是在這里其實(shí)不是用Flume直接去生產(chǎn)環(huán)境上拉取nginx的日志文件,而是多設(shè)置了1層FTP服務(wù)器來緩沖所有的日志文件,然后再用Flume監(jiān)聽FTP服務(wù)器上指定的目錄并拉取目錄里的日志文件到HDFS服務(wù)器上(具體緣由下面分析)。從生產(chǎn)環(huán)境推送日志文件到FTP服務(wù)器的操作可以通過Shell腳本配合Crontab定時(shí)器來實(shí)現(xiàn)。

網(wǎng)站點(diǎn)擊流數(shù)據(jù)

       
         
         圖片來源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

      1般在WEB系統(tǒng)中,用戶對(duì)站點(diǎn)的頁面的訪問閱讀,點(diǎn)擊行動(dòng)等1系列的數(shù)據(jù)都會(huì)記錄在日志中,每條日志記錄就代表著上圖中的1個(gè)數(shù)據(jù)點(diǎn);而點(diǎn)擊流數(shù)據(jù)關(guān)注的就是所有這些點(diǎn)連起來后的1個(gè)完全的網(wǎng)站閱讀行動(dòng)記錄,可以認(rèn)為是1個(gè)用戶對(duì)網(wǎng)站的閱讀session。比如說用戶從哪個(gè)外站進(jìn)入到當(dāng)前的網(wǎng)站,用戶接下來閱讀了當(dāng)前網(wǎng)站的哪些頁面,點(diǎn)擊了哪些圖片鏈接按鈕等1系列的行動(dòng)記錄,這1個(gè)整體的信息就稱為是該用戶的點(diǎn)擊流記錄。這篇文章中設(shè)計(jì)的離線分析系統(tǒng)就是搜集WEB系統(tǒng)中產(chǎn)生的這些數(shù)據(jù)日志,并清洗日志內(nèi)容存儲(chǔ)散布式的HDFS文件存儲(chǔ)系統(tǒng)上,接著使用離線分析工具HIVE去統(tǒng)計(jì)所有用戶的點(diǎn)擊流信息。

      本系統(tǒng)中我們采取Nginx的access.log來做點(diǎn)擊流分析的日志文件。access.log日志文件的格式以下:
      樣例數(shù)據(jù)格式:
      124.42.13.230 - - [18/Sep/2013:06:57:50 +0000] "GET /shoppingMall?ver=1.2.1 HTTP/1.1" 200 7200 "http://www.baidu.com.cn" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"
        格式分析: 
        1、 訪客ip地址:124.42.13.230
        2、訪客用戶信息: - -
          3、要求時(shí)間:[18/Sep/2013:06:57:50 +0000]
        4、要求方式:GET
          5、要求的url/shoppingMall?ver=1.10.2
        6、要求所用協(xié)議:HTTP/1.1
          7、響應(yīng)碼:200
        8、返回的數(shù)據(jù)流量:7200
        9、訪客的來源urlhttp://www.baidu.com.cn
        10、訪客所用閱讀器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

 搜集用戶數(shù)據(jù)
      網(wǎng)站會(huì)通過前端JS代碼或服務(wù)器真?zhèn)€后臺(tái)代碼搜集用戶閱讀數(shù)據(jù)并存儲(chǔ)在網(wǎng)站服務(wù)器中。1般運(yùn)維人員會(huì)在離線分析系統(tǒng)和真實(shí)生產(chǎn)環(huán)境之間部署FTP服務(wù)器,并將生產(chǎn)環(huán)境上的用戶數(shù)據(jù)每天定時(shí)發(fā)送到FTP服務(wù)器上,離線分析系統(tǒng)就會(huì)從FTP服務(wù)上收集數(shù)據(jù)而不會(huì)影響到生產(chǎn)環(huán)境。
      
收集數(shù)據(jù)的方式有多種,1種是通過自己編寫shell腳本或Java編程收集數(shù)據(jù),但是工作量大,不方便保護(hù),另外一種就是直接使用第3方框架去進(jìn)行日志的收集,1般第3方框架的硬朗性,容錯(cuò)性和易用性都做得很好也易于保護(hù)。本文彩用第3方框架Flume進(jìn)行日志收集,Flume是1個(gè)散布式的高效的日志收集系統(tǒng),它能把散布在不同服務(wù)器上的海量日志文件數(shù)據(jù)統(tǒng)1搜集到1個(gè)集中的存儲(chǔ)資源中,FlumeApache的1個(gè)頂級(jí)項(xiàng)目,與Hadoop也有很好的兼容性。不過需要注意的是Flume其實(shí)不是1個(gè)高可用的框架,這方面的優(yōu)化得用戶自己去保護(hù)。
        Flume
agent是運(yùn)行在JVM上的,所以各個(gè)服務(wù)器上的JVM環(huán)境必不可少。每個(gè)Flume agent部署在1臺(tái)服務(wù)器上,Flume會(huì)搜集web server 產(chǎn)生的日志數(shù)據(jù),并封裝成1個(gè)個(gè)的事件發(fā)送給Flume AgentSourceFlume Agent Source會(huì)消費(fèi)這些搜集來的數(shù)據(jù)事件并放在Flume Agent ChannelFlume Agent Sink會(huì)從Channel中搜集這些收集過來的數(shù)據(jù),要末存儲(chǔ)在本地的文件系統(tǒng)中要末作為1個(gè)消費(fèi)資源分發(fā)給下1個(gè)裝在散布式系統(tǒng)中其它服務(wù)器上的Flume進(jìn)行處理。Flume提供了點(diǎn)對(duì)點(diǎn)的高可用的保障,某個(gè)服務(wù)器上的Flume Agent Channel中的數(shù)據(jù)只有確保傳輸?shù)搅肆硗庖粋€(gè)服務(wù)器上的Flume Agent Channel里或正確保存到了本地的文件存儲(chǔ)系統(tǒng)中,才會(huì)被移除。
本系統(tǒng)中每個(gè)FTP服務(wù)器Hadoopname node服務(wù)器上都要部署1個(gè)Flume AgentFTPFlume Agent收集Web Server的日志并匯總到name node服務(wù)器上的Flume Agent,最后由hadoop name node服務(wù)器將所有的日志數(shù)據(jù)下沉到散布式的文件存儲(chǔ)系統(tǒng)HDFS上面。
      需要注意的是Flume的Source在本文的系統(tǒng)當(dāng)選擇的是Spooling Directory Source,而沒有選擇Exec Source,由于當(dāng)Flume服務(wù)down掉的時(shí)候Spooling Directory Source能記錄上1次讀取到的位置,而Exec Source則沒有,需要用戶自己去處理,當(dāng)重啟Flume服務(wù)器的時(shí)候如果處理不好就會(huì)有重復(fù)數(shù)據(jù)的問題。固然Spooling Directory Source也是有缺點(diǎn)的,會(huì)對(duì)讀取過的文件重命名,所以多架1層FTP服務(wù)器也是為了不Flume“污染”生產(chǎn)環(huán)境。Spooling Directory Source另外1個(gè)比較大的缺點(diǎn)就是沒法做到靈活監(jiān)聽某個(gè)文件夾底下所有子文件夾里的所有文件里新追加的內(nèi)容。關(guān)于這些問題的解決方案也有很多,比如選擇其它的日志收集工具,像logstash等。
       FTP
服務(wù)器上的Flume配置文件以下:   
agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = spooldir agent.sources.origin.spoolDir = /export/data/trivial/weblogs agent.sources.origin.channels = memorychannel agent.sources.origin.deserializer.maxLineLength = 2048 agent.sources.origin.interceptors = i2 agent.sources.origin.interceptors.i2.type = host agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 10000 agent.sinks.target.type = avro agent.sinks.target.channel = memorychannel agent.sinks.target.hostname = 172.16.124.130 agent.sinks.target.port = 4545
     這里有幾個(gè)參數(shù)需要說明,Flume Agent Source可以通過配置deserializer.maxLineLength這個(gè)屬性來指定每一個(gè)Event的大小,默許是每一個(gè)Event2048個(gè)byteFlume Agent Channel的大小默許等于于本地服務(wù)器JVM所獲得到的內(nèi)存的80%,用戶可以通過byteCapacityBufferPercentagebyteCapacity兩個(gè)參數(shù)去進(jìn)行優(yōu)化。
     
需要特別注意的是FTP上放入Flume監(jiān)聽的文件夾中的日志文件不能同名,不然Flume會(huì)報(bào)錯(cuò)并停止工作,最好的解決方案就是為每份日志文件拼上時(shí)間戳。

     Hadoop服務(wù)器上的配置文件以下:   
agent.sources = origin agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = avro agent.sources.origin.channels = memorychannel agent.sources.origin.bind = 0.0.0.0 agent.sources.origin.port = 4545 #agent.sources.origin.interceptors = i1 i2 #agent.sources.origin.interceptors.i1.type = timestamp #agent.sources.origin.interceptors.i2.type = host #agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 5000000 agent.channels.memorychannel.transactionCapacity = 1000000 agent.sinks.target.type = hdfs agent.sinks.target.channel = memorychannel agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S agent.sinks.target.hdfs.filePrefix = data-%{hostname} agent.sinks.target.hdfs.rollInterval = 60 agent.sinks.target.hdfs.rollSize = 1073741824 agent.sinks.target.hdfs.rollCount = 1000000 agent.sinks.target.hdfs.round = true agent.sinks.target.hdfs.roundValue = 10 agent.sinks.target.hdfs.roundUnit = minute agent.sinks.target.hdfs.useLocalTimeStamp = true agent.sinks.target.hdfs.minBlockReplicas=1 agent.sinks.target.hdfs.writeFormat=Text agent.sinks.target.hdfs.fileType=DataStream

round, roundValue,roundUnit3個(gè)參數(shù)是用來配置每10分鐘在hdfs里生成1個(gè)文件夾保存從FTP服務(wù)器上拉取下來的數(shù)據(jù)。

    Troubleshooting 
       使用Flume拉取文件到HDFS中會(huì)遇到將文件分散成多個(gè)1KB⑸KB的小文件的問題   
       需要注意的是如果遇到Flume會(huì)將拉取過來的文件分成很多份1KB⑸KB的小文件存儲(chǔ)到HDFS上,那末極可能是HDFS Sink的配置不正確,致使系統(tǒng)使用了默許配置。spooldir類型的source是將指定目錄中的文件的每行封裝成1個(gè)event放入到channel中,默許每行最大讀取1024個(gè)字符。在HDFS Sink端主要是通過rollInterval(默許30秒), rollSize(默許1KB), rollCount(默許10個(gè)event)3個(gè)屬性來決定寫進(jìn)HDFS的分片文件的大小。rollInterval表示經(jīng)過量少秒后就將當(dāng)前.tmp文件(寫入的是從channel中過來的events)下沉到HDFS文件系統(tǒng)中,rollSize表示1旦.tmp文件到達(dá)1定的size后,就下沉到HDFS文件系統(tǒng)中,rollCount表示.tmp文件1旦寫入了指定數(shù)量的events就下沉到HDFS文件系統(tǒng)中。

       使用Flume拉取到HDFS中的文件格式錯(cuò)亂
       這是由于HDFS Sink的配置中,hdfs.writeFormat屬性默許為“Writable”會(huì)將本來的文件的內(nèi)容序列化成HDFS的格式,應(yīng)當(dāng)手動(dòng)設(shè)置成hdfs.writeFormat=“text”; 并且hdfs.fileType默許是“SequenceFile”類型的,是將所有event拼成1行,應(yīng)當(dāng)該手動(dòng)設(shè)置成hdfs.fileType=“DataStream”,這樣就能夠是1行1個(gè)event,與原文件格式保持1致

使用Mapreduce清洗日志文件
當(dāng)把日志文件中的數(shù)據(jù)拉取到HDFS文件系統(tǒng)后,使用Mapreduce程序去進(jìn)行日志清洗
第1步,先用Mapreduce過濾掉無效的數(shù)據(jù)
package com.guludada.clickstream; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.dataparser.WebLogParser; public class logClean { public static class cleanMap extends Mapper<Object,Text,Text,NullWritable> { private NullWritable v = NullWritable.get(); private Text word = new Text(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //將1行內(nèi)容轉(zhuǎn)成string String line = value.toString(); String cleanContent = webLogParser.parser(line); if(cleanContent != "") { word.set(cleanContent); try { context.write(word,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類 job.setMapperClass(cleanMap.class); //指定mapper輸出數(shù)據(jù)的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //指定job的輸入原始文件所在目錄 Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*")); //指定job的輸出結(jié)果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/")); //將job中配置的相干參數(shù),和job所用的java類所在的jar包,提交給yarn去運(yùn)行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }

package com.guludada.dataparser; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.guludada.javabean.WebLogBean; /** * 用正則表達(dá)式匹配出合法的日志記錄 * * */ public class WebLogParser { public String parser(String weblog_origin) { WebLogBean weblogbean = new WebLogBean(); // 獲得IP地址 Pattern IPPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); Matcher IPMatcher = IPPattern.matcher(weblog_origin); if(IPMatcher.find()) { String IPAddr = IPMatcher.group(0); weblogbean.setIP_addr(IPAddr); } else { return "" } // 獲得時(shí)間信息 Pattern TimePattern = Pattern.compile("\\[(.+)\\]"); Matcher TimeMatcher = TimePattern.matcher(weblog_origin); if(TimeMatcher.find()) { String time = TimeMatcher.group(1); String[] cleanTime = time.split(" "); weblogbean.setTime(cleanTime[0]); } else { return ""; } //獲得其余要求信息 Pattern InfoPattern = Pattern.compile( "(\\\"[POST|GET].+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")"); Matcher InfoMatcher = InfoPattern.matcher(weblog_origin); if(InfoMatcher.find()) { String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim(); String[] requestInfoArry = requestInfo.split(" "); weblogbean.setMethod(requestInfoArry[0]); weblogbean.setRequest_URL(requestInfoArry[1]); weblogbean.setRequest_protocol(requestInfoArry[2]); String status_code = InfoMatcher.group(2); weblogbean.setRespond_code(status_code); String respond_data = InfoMatcher.group(3); weblogbean.setRespond_data(respond_data); String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim(); weblogbean.setRequst_come_from(request_come_from); String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim(); weblogbean.setBrowser(browserInfo); } else { return ""; } return weblogbean.toString(); } }
package com.guludada.javabean; public class WebLogBean { String IP_addr; String time; String method; String request_URL; String request_protocol; String respond_code; String respond_data; String requst_come_from; String browser; public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getRequest_protocol() { return request_protocol; } public void setRequest_protocol(String request_protocol) { this.request_protocol = request_protocol; } public String getRespond_code() { return respond_code; } public void setRespond_code(String respond_code) { this.respond_code = respond_code; } public String getRespond_data() { return respond_data; } public void setRespond_data(String respond_data) { this.respond_data = respond_data; } public String getRequst_come_from() { return requst_come_from; } public void setRequst_come_from(String requst_come_from) { this.requst_come_from = requst_come_from; } public String getBrowser() { return browser; } public void setBrowser(String browser) { this.browser = browser; } @Override public String toString() { return IP_addr + " " + time + " " + method + " " + request_URL + " " + request_protocol + " " + respond_code + " " + respond_data + " " + requst_come_from + " " + browser; } }

第1第二天記清洗后的記錄以下圖:
 


2步,根據(jù)訪問記錄生成相應(yīng)的Session信息記錄假定Session的過期時(shí)間是30分鐘

package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.WebLogSessionBean; public class logSession { public static class sessionMapper extends Mapper<Object,Text,Text,Text> { private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //將1行內(nèi)容轉(zhuǎn)成string String line = value.toString(); String[] weblogArry = line.split(" "); IPAddr.set(weblogArry[0]); content.set(line); try { context.write(IPAddr,content); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class sessionReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SessionParser sessionParser = new SessionParser(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Date sessionStartTime = null; String sessionID = UUID.randomUUID().toString(); //將IP地址所對(duì)應(yīng)的用戶的所有閱讀記錄按時(shí)間排序 ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>(); for(Text browseHistory : values) { WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString()); sessionBeanGroup.add(sessionBean); } Collections.sort(sessionBeanGroup,new Comparator<WebLogSessionBean>() { public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) { Date date1 = sessionBean1.getTimeWithDateFormat(); Date date2 = sessionBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); for(WebLogSessionBean sessionBean : sessionBeanGroup) { if(sessionStartTime == null) { //當(dāng)天日志中某用戶第1次訪問網(wǎng)站的時(shí)間 sessionStartTime = timeTransform(sessionBean.getTime()); content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { Date sessionEndTime = timeTransform(sessionBean.getTime()); long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime); if(sessionStayTime > 30 * 60 * 1000) { //將當(dāng)前閱讀記錄的時(shí)間設(shè)為下1個(gè)session的開始時(shí)間 sessionStartTime = timeTransform(sessionBean.getTime()); sessionID = UUID.randomUUID().toString(); continue; } content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private Date timeTransform(String time) { Date standard_time = null; try { standard_time = sdf.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return standard_time; } private long timeDiffer(Date start_time,Date end_time) { long diffTime = 0; diffTime = end_time.getTime() - start_time.getTime(); return diffTime; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類 job.setMapperClass(sessionMapper.class); job.setReducerClass(sessionReducer.class); //指定mapper輸出數(shù)據(jù)的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定終究輸出的數(shù)據(jù)的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*")); //指定job的輸出結(jié)果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/")); //將job中配置的相干參數(shù),和job所用的java類所在的jar包,提交給yarn去運(yùn)行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import com.guludada.javabean.WebLogSessionBean; public class SessionParser { SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH); SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String parser(WebLogSessionBean sessionBean,String sessionID) { sessionBean.setSession(sessionID); return sessionBean.toString(); } public WebLogSessionBean loadBean(String sessionContent) { WebLogSessionBean weblogSession = new WebLogSessionBean(); String[] contents = sessionContent.split(" "); weblogSession.setTime(timeTransform(contents[1])); weblogSession.setIP_addr(contents[0]); weblogSession.setRequest_URL(contents[3]); weblogSession.setReferal(contents[7]); return weblogSession; } private String timeTransform(String time) { Date standard_time = null; try { standard_time = sdf_origin.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return sdf_final.format(standard_time); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class WebLogSessionBean { String time; String IP_addr; String session; String request_URL; String referal; public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public Date getTimeWithDateFormat() { SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if(this.time != null && this.time != "") { try { return sdf_final.parse(this.time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } @Override public String toString() { return time + " " + IP_addr + " " + session + " " + request_URL + " " + referal; } }

第2次清算出來的Session信息結(jié)構(gòu)以下:
時(shí)間 IP SessionID 要求頁面URL Referal URL
2015-05⑶0 19:38:00 192.168.12.130 Session1 /blog/me www.baidu.com
2015-05⑶0 19:39:00 192.168.12.130 Session1 /blog/me/details www.mysite.com/blog/me
2015-05⑶0 19:38:00 192.168.12.40 Session2 /blog/me www.baidu.com



第3步,清洗第2步生成的Session信息,生成PageViews信息表
package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.clickstream.logSession.sessionMapper; import com.guludada.clickstream.logSession.sessionReducer; import com.guludada.dataparser.PageViewsParser; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViews { public static class pageMapper extends Mapper<Object,Text,Text,Text> { private Text word = new Text(); public void map(Object key,Text value,Context context) { String line = value.toString(); String[] webLogContents = line.split(" "); //根據(jù)session來分組 word.set(webLogContents[2]); try { context.write(word,value); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text session = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); PageViewsParser pageViewsParser = new PageViewsParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //上1條記錄的訪問信息 PageViewsBean lastStayPageBean = null; Date lastVisitTime = null; @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //將session所對(duì)應(yīng)的所有閱讀記錄按時(shí)間排序 ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>(); for(Text pageView : values) { PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString()); pageViewsBeanGroup.add(pageViewsBean); } Collections.sort(pageViewsBeanGroup,new Comparator<PageViewsBean>() { public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) { Date date1 = pageViewsBean1.getTimeWithDateFormat(); Date date2 = pageViewsBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); //計(jì)算每一個(gè)頁面的停留時(shí)間 int step = 0; for(PageViewsBean pageViewsBean : pageViewsBeanGroup) { Date curVisitTime = pageViewsBean.getTimeWithDateFormat(); if(lastStayPageBean != null) { //計(jì)算前后兩次訪問記錄像差的時(shí)間,單位是秒 Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000); //根據(jù)當(dāng)前記錄的訪問信息更新上1條訪問記錄中訪問的頁面的停留時(shí)間 lastStayPageBean.setStayTime(timeDiff.toString()); } //更新訪問記錄的步數(shù) step++; pageViewsBean.setStep(step+""); //更新上1條訪問記錄的停留時(shí)間后,將當(dāng)前訪問記錄設(shè)定為上1條訪問信息記錄 lastStayPageBean = pageViewsBean; lastVisitTime = curVisitTime; //輸出pageViews信息 content.set(pageViewsParser.parser(pageViewsBean)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(PageViews.class); //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類 job.setMapperClass(pageMapper.class); job.setReducerClass(pageReducer.class); //指定mapper輸出數(shù)據(jù)的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定終究輸出的數(shù)據(jù)的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*")); //指定job的輸出結(jié)果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/")); //將job中配置的相干參數(shù),和job所用的java類所在的jar包,提交給yarn去運(yùn)行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViewsParser { /** * 根據(jù)logSession的輸出數(shù)據(jù)加載PageViewsBean * * */ public PageViewsBean loadBean(String sessionContent) { PageViewsBean pageViewsBean = new PageViewsBean(); String[] contents = sessionContent.split(" "); pageViewsBean.setTime(contents[0] + " " + contents[1]); pageViewsBean.setIP_addr(contents[2]); pageViewsBean.setSession(contents[3]); pageViewsBean.setVisit_URL(contents[4]); pageViewsBean.setStayTime("0"); pageViewsBean.setStep("0"); return pageViewsBean; } public String parser(PageViewsBean pageBean) { return pageBean.toString(); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class PageViewsBean { String session; String IP_addr; String time; String visit_URL; String stayTime; String step; public String getSession() { return session; } public void setSession(String s
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 亚洲 欧美 综合 | 一区二区视屏 | 在线黄网| 亚洲视频免费在线观看 | 久久久婷| 男女爱爱免费视频 | 久一久久 | 在线午夜 | 福利在线看| 久久之久久 | 视频一区国产精品 | 亚洲成人av电影网站 | 国产美女无遮挡网站 | 羞羞视频在线观看免费 | 欧美伊人精品成人久久综合97 | 免费成人在线观看 | 欧美一区二区三区视频 | 成年人免费在线观看 | 国产精品免费观看视频 | 午夜久久久 | 日韩综合久久 | 精品视频在线观看一区二区三区 | 国产一区二区高清视频 | 99久久精品国产一区二区三区 | 色婷婷久久 | 精品2区| 欧美日韩电影一区二区 | 在线观看国产黄色 | 亚洲男人天堂2024 | av在线成人| 国产一区二区在线免费观看 | 精品国产91乱码一区二区三区 | 天堂在线中文 | 亚洲一区二区三区精品视频 | 亚洲自拍偷拍视频 | 国产伦精品一区二区三区四区免费 | 国av级一级理论片 | 国产三级电影在线播放 | 久久二区视频 | 人人九九精品 | 色一区二区三区四区 |