比較了解,這里我們用Apriori算法及其優(yōu)化算法實(shí)現(xiàn)。大家好,下面為大家分享的實(shí)戰(zhàn)案例是K-頻繁相機(jī)發(fā)掘并行化算法。相信從事數(shù)據(jù)發(fā)掘相干工作的同學(xué)對(duì)頻繁項(xiàng)集的相干算法
首先說(shuō)1下實(shí)驗(yàn)結(jié)果。對(duì)2G,1800W條記錄的數(shù)據(jù),我們用了18秒就算完了1⑻頻繁項(xiàng)集的發(fā)掘。應(yīng)當(dāng)還算不錯(cuò)。
給出題目:
本題的較第4題難度更大。我們?cè)趯?xiě)程序的時(shí)候1定要注意寫(xiě)出的程序是并行化的,而不是只在client上運(yùn)行的單機(jī)程序。否
則你的算法效力將讓你跌破眼鏡。另外還需要對(duì)算法做相干優(yōu)化。在這里主要和大家交換1下算法思路和相干優(yōu)化。
對(duì)Apriori算法的實(shí)現(xiàn)在這里不做過(guò)量贅述,百度1下大片大片。在Spark上實(shí)現(xiàn)這個(gè)算法的時(shí)候主要分為兩個(gè)階段第1階段
是1個(gè)整體的循環(huán)求出每一個(gè)項(xiàng)集的階段,第2階段主要是針對(duì)第i個(gè)項(xiàng)集求出第i+1項(xiàng)集的候選集的階段。
對(duì)這個(gè)算法可以做以下優(yōu)化:
- 視察!這點(diǎn)很重要,經(jīng)過(guò)視察可以發(fā)現(xiàn)有大量重復(fù)的數(shù)據(jù),所謂方向不對(duì)努力白費(fèi)也是這個(gè)道理,首先需要緊縮重復(fù)的數(shù)據(jù)。不然會(huì)做許多無(wú)用功。
- 設(shè)計(jì)算法的時(shí)候1定要注意是并行化的,大家可能很疑惑,Spark不就是并行化的么?可是你1不謹(jǐn)慎可能就寫(xiě)成只在client端運(yùn)行的算法了。
- 由于數(shù)據(jù)量比較大,切記多使用數(shù)據(jù)持久化和BroadCast廣播變量對(duì)中間數(shù)據(jù)進(jìn)行相應(yīng)處理。
- 數(shù)據(jù)結(jié)構(gòu)的優(yōu)化,BitSet是1種優(yōu)秀的數(shù)據(jù)結(jié)構(gòu)他只需1位就能夠存儲(chǔ)以個(gè)整形數(shù),對(duì)所給出的數(shù)據(jù)都是整數(shù)的情況特別適用。
下面給出算法實(shí)現(xiàn)源碼:
- import scala.util.control.Breaks._
- import scala.collection.mutable.ArrayBuffer
- import java.util.BitSet
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark._
- object FrequentItemset {
- def main(args: Array[String]) {
- if (args.length != 2) {
- println("USage:<Datapath> <Output>")
- }
- //initial SparkContext
- val sc = new SparkContext()
- val SUPPORT_NUM = 15278611 //Transactions total is num=17974836, SUPPORT_NUM = num*0.85
- val TRANSACITON_NUM = 17974836.0
- val K = 8
- //All transactions after removing transaction ID, and here we combine the same transactions.
- val transactions = sc.textFile(args(0)).map(line =>
- line.substring(line.indexOf(" ") + 1).trim).map((_, 1)).reduceByKey(_ + _).map(line => {
- val bitSet = new BitSet()
- val ss = line._1.split(" ")
- for (i <- 0 until ss.length) {
- bitSet.set(ss(i).toInt, true)
- }
- (bitSet, line._2)
- }).cache()
- //To get 1 frequent itemset, here, fi represents frequent itemset
- var fi = transactions.flatMap { line =>
- val tmp = new ArrayBuffer[(String, Int)]
- for (i <- 0 until line._1.size()) {
- if (line._1.get(i)) tmp += ((i.toString, line._2))
- }
- tmp
- }.reduceByKey(_ + _).filter(line1 => line1._2 >= SUPPORT_NUM).cache()
- val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
- result.saveAsTextFile(args(1) + "/result⑴")
- for (i <- 2 to K) {
- val candiateFI = getCandiateFI(fi.map(_._1).collect(), i)
- val bccFI = sc.broadcast(candiateFI)
- //To get the final frequent itemset
- fi = transactions.flatMap { line =>
- val tmp = new ArrayBuffer[(String, Int)]()
- //To check if each itemset of candiateFI in transactions
- bccFI.value.foreach { itemset =>
- val itemArray = itemset.split(",")
- var count = 0
- for (item <- itemArray) if (line._1.get(item.toInt)) count += 1
- if (count == itemArray.size) tmp += ((itemset, line._2))
- }
- tmp
- }.reduceByKey(_ + _).filter(_._2 >= SUPPORT_NUM).cache()
- val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
- result.saveAsTextFile(args(1) + "/result-" + i)
- bccFI.unpersist()
- }
- }
- //To get the candiate k frequent itemset from k⑴ frequent itemset
- def getCandiateFI(f: Array[String], tag: Int) = {
- val separator = ","
- val arrayBuffer = ArrayBuffer[String]()
- for(i <- 0 until f.length;j <- i + 1 until f.length){
- var tmp = ""
- if(2 == tag) tmp = (f(i) + "," + f(j)).split(",").sortWith((a,b) => a.toInt <= b.toInt).reduce(_+","+_)
- else {
- if (f(i).substring(0, f(i).lastIndexOf(',')).equals(f(j).substring(0, f(j).lastIndexOf(',')))) {
- tmp = (f(i) + f(j).substring(f(j).lastIndexOf(','))).split(",").sortWith((a, b) => a.toInt <= b.toInt).reduce(_ + "," + _)
- }
- }
- var hasInfrequentSubItem = false //To filter the item which has infrequent subitem
- if (!tmp.equals("")) {
- val arrayTmp = tmp.split(separator)
- breakable {
- for (i <- 0 until arrayTmp.size) {
- var subItem = ""
- for (j <- 0 until arrayTmp.size) {
- if (j != i) subItem += arrayTmp(j) + separator
- }
- //To remove the separator "," in the end of the item
- subItem = subItem.substring(0, subItem.lastIndexOf(separator))
- if (!f.contains(subItem)) {
- hasInfrequentSubItem = true
- break
- }
- }
- } //breakable
- }
- else hasInfrequentSubItem = true
- //If itemset has no sub inftequent itemset, then put it into candiateFI
- if (!hasInfrequentSubItem) arrayBuffer += (tmp)
- } //for
- arrayBuffer.toArray
- }
- }
先寫(xiě)到這里,歡迎大家提出相干的建議或意見(jiàn)。(by老楊,轉(zhuǎn)載請(qǐng)注明出處)