弹性分布式数据集RDD(代码实现)

云计算 waitig 454℃ 百度已收录 0评论

统计用户对每个学科的各个模块访问的次数,取top3
RDD(textFile) -> RDD(map) -> RDD(reduceByKey) -> RDD(groupBy) -> RDD(sortBy).reverse.task

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  *统计用户对每个学科的各个模块访问的次数,取top3
  */
object CountClass1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CountClass1").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //获取数据
    val file: RDD[String] = sc.textFile("C://Users/hositity/Desktop/access.txt")
    //切分数据,取出url生成元组
    val urlAndOne: RDD[(String, Int)] = file.map(line => {
      val fields = line.split("\t")
      val url: String = fields(1)

      (url, 1)
    })

    //相同的url聚合
    val sumUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_+_)
    //获取学科信息
    val project: RDD[(String, String, Int)] = sumUrl.map(x => {
      val url = x._1
      val count = x._2
      val project = new URL(url).getHost

      (project, url, count)
    })

    //以学科信息分组,整合后得到结果
    val result: RDD[(String, List[(String, String, Int)])] =
      project.groupBy(_._1).mapValues(_.toList.sortBy(_._3).reverse.take(2))

    println(result.collect().toBuffer)
    sc.stop()
  }
}

在IDEA上运行结果
运行结果

*自定义一个分区器(继承Partitioner重写方法)
按照每个学科的数据放到不同的分区里
并调用saveAsTextFile***

package myRPC.qf.itcast.RDD

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * 自定义一个分区器
  * 按照每个学科的数据放到不同的分区里
  */
object CountClass2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CountClass2").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //获取数据
    val file = sc.textFile("C://Users/hositity/Desktop/access.txt")
    //切分数据,取出url生成元组
    val urlAndOne: RDD[(String, Int)] = file.map(line => {
      val fields = line.split("\t")
      val url = fields(1)
      (url, 1)
    })
    //相同的url聚合
    val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_+_)
    //获取学科信息并缓存
    val cachedProject: RDD[(String, (String, Int))] = sumedUrl.map(x => {
      val url = x._1
      val count = x._2
      val project = new URL(url).getHost

      (project, (url, count))
    }).cache()

    //调用Spark默认的分区器此时会发生哈希碰撞,x % 3 的hash值会分到同一块分区
//    val partition: RDD[(String, (String, Int))] = cachedProject.partitionBy(new HashPartitioner(3))

    //得到所有学科
    val projects: Array[String] = cachedProject.keys.distinct().collect()
    //调用自定义分区并获取分区号
    val partitioner: ProjectPartitioner = new ProjectPartitioner(projects)
    //分区
    val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)
    //对每个分区的数据进行排序并取top3
    val result: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(2).iterator
    })
    result.saveAsTextFile("E://hadoop/log")

    sc.stop()
  }
}

//自定义分区器
class ProjectPartitioner(projects: Array[String]) extends Partitioner{
  //用来存储学科和分区号
  private val projectsAndPartNum = new mutable.HashMap[String,Int]()
  //计数器,用于生成分区号
  var n = 0
  for(p <- projects){
    projectsAndPartNum += (p -> n)
    n += 1
  }

  //获取分区数
  override def numPartitions = projects.length
  //获取分区号
  override def getPartition(key: Any):Int ={
    projectsAndPartNum.getOrElse(key.toString,0)
  }
}

结果:
运行结果
: 在各个分区的文件中的数据不会出现数据倾斜的情况


本文由【waitig】发表在等英博客
本文固定链接:弹性分布式数据集RDD(代码实现)
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)