Spark Pair RDD 基本操作

云计算 waitig 460℃ 百度已收录 0评论
package com.guanyaqi

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat


import org.apache.spark.{SparkConf, SparkContext}

object PairRddTest {

  val conf=new SparkConf().setMaster("local[*]").setAppName("pairRdd test")
  val sc=new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    //mapTest()
    //lookUpTest
    //aggragateTest()
    //coGroupTest()
    //subtractTest()
    //joinTest()
    saveFile()

  }

  def mapTest()={
    val map=List("关亚奇"->21,"小李子"->41,"朱宇"->60)
    val rdd=sc.parallelize(map)

    //map 每个人的薪水涨2000
    val mapResult=rdd.mapValues(_+2000)
    mapResult.foreach(println)

    //flatMapValues,只对value进行展平
    //根据薪水对人打标签,大于5000高收入,大于7000土豪,小于5000低收入

    val flatMapResult=rdd.flatMapValues(x=>{
     val tag1= x match{
        case x  if x<5000=>"低收入"
        case _=>"高收入"
      }

      x match{
        case x if x>7000=>Array(tag1,"土豪")
        case _=>Array(tag1)
      }

    })
    flatMapResult.foreach(println)

    val keysResult=rdd.keys
    val valueResult=rdd.values
    keysResult.foreach(println)
    valueResult.foreach(println)
  }

  def lookUpTest()={
    val map=List("关亚奇"->("语文",88),"关亚奇"->("英语"->60),"朱宇"->("体育"->75))
    val rdd=sc.parallelize(map)

    //key是关亚奇的所有记录
    val sep=rdd.lookup("关亚奇")
    sep.foreach(println)

    //collectMap
    val cmap=rdd.collectAsMap()
    cmap.foreach(println)

    //countByKey 计算每一个key值对应的元素的数量
    val countByKey=rdd.countByKey()
    countByKey.foreach(println)
  }

    def aggragateTest()={
      //reduceByKey
      val score=List("亚瑟 语文 70","亚瑟 数学 85","亚瑟 英语 99"
        ,"妲己 语文 69","妲己 数学 79","妲己 英语 95"
        ,"甄姬 语文 94","甄姬 数学 89","甄姬 英语 77")
      val rdd=sc.parallelize(score)

      val reduceRdd=rdd.map(x=>{
        val regex="(.+)\\s(.+)\\s(.+)".r
        x match{
          case regex(studentName,className,score)=>(studentName,score.toInt)
        }
      })
      val reduceResult=reduceRdd.reduceByKey((v1,v2)=>v1+v2)
      reduceResult.foreach(println)

      //foldBykey
      val foldResult=reduceResult.foldByKey(0)(_+_)
      foldResult.foreach(println)

      //aggragateByKey
      val aggregateResult=reduceRdd.aggregateByKey(0)(
        seqOp=(c,v)=>{c+v}
        ,combOp = (c1,c2)=>c1+c2
      )
      aggregateResult.foreach(println)

    //groupByKey
      val groupByKey=reduceRdd.groupByKey()
      groupByKey.foreach(x=>{

        println(s"${x._1}总分数:${x._2.sum}")
      })

      combineByKey
      val combineResult=reduceRdd.combineByKey[Int](
        initValue=>initValue
        ,(c,v)=>c+v
        ,(c1,c2)=>c1+c2
      )
      combineResult.foreach(println)

  //列出每个人的每一个学科的成绩如:小张:语文:语文,数学:30,英语:10
  val combineRdd=rdd.map(
    x=>{
    val info=x.split("\\s")
    (info(0),s"${info(1)} ${info(2)}")
  })
  val result=combineRdd.combineByKey(
    (fv:String)=>fv.split("\\s").mkString(":")
    ,(c:String,v:String)=>s"$c,${v.split("\\s").mkString(":")}"
    ,(c1:String,c2:String)=>s"$c1,$c2"
  )
  result.foreach(println)

    }


  //coGroupTest
  def coGroupTest()={
    val list1=List("小张"->"语文 50","小张"->"数学 60","小王"->"语文 70","小王"->"英语 50","小李"->"数学 80")
    val list2=List("小张"->"迟到 10","小张"->"旷课 20","小王"->"迟到 20","小王"->"旷课 0")

    val rdd1=sc.parallelize(list1)
    val rdd2=sc.parallelize(list2)

    val cogroupRdd=rdd1.cogroup(rdd2)
    cogroupRdd.foreach(x=>{

      println(s"姓名:${x._1},成绩信息:${x._2._1},违纪信息:${x._2._2}")
    })
    println(s"结果数据集:${cogroupRdd.count}")
  }

    //集合操作
    //subtractTest  根据key值减去rdd中的kv对记录
  def subtractTest()={
    val list1=List("小张"->"语文 50","小张"->"数学 60","小王"->"语文 70","小王"->"英语 50","小李"->"数学 80")
    val list2=List("小张"->111,"小刘"->666)
    val rdd1=sc.parallelize(list1)
    val rdd2=sc.parallelize(list2)

    val subResult=rdd1.subtractByKey(rdd2)
    subResult.foreach(println)
  }
  //关联操作,value形成一个新的元组
  def joinTest()={
    val list1=List("小张"->"","小李"->"","小王"->"","小刘"->"")
    val list2=List("小张"->23,"小王"->18,"小刘"->19,"小赵"->22)
    val rdd1=sc.parallelize(list1)
    val rdd2=sc.parallelize(list2)
    //内关联,两个rdd操作只需要保证key值一致,value不需要要求
    val innerJoin=rdd1.join(rdd2)
    innerJoin.foreach(x=>{
     println(s"姓名:${x._1},性别:${x._2._1},年龄:${x._2._2}")
    })
    //左外关联
    val leftOutJoin=rdd1.leftOuterJoin(rdd2)
    leftOutJoin.foreach(x=>{
      val age=x._2._2 match{
        case None=>"不详"
        case Some(a)=>a
      }

      println(s"姓名:${x._1},性别:${x._2._1},年龄:${age}")
    })

    //右外关联
    val rightOutJoin=rdd1.rightOuterJoin(rdd2)
    rightOutJoin.foreach(x=>{
      val gender=x._2._1 match{
        case None=>"不详"
        case Some(a)=>a
      }
      println(s"姓名:${x._1},性别:${gender},年龄:${x._2._2}")
    })

    //全外连接
    val fullOutJoin=rdd1.fullOuterJoin(rdd2)
    fullOutJoin.foreach(x=>{
      val gender=x._2._1 match{
        case None=>"不详"
        case Some(a)=>a
      }
      val age=x._2._2 match{
        case None=>"不详"
        case Some(a)=>a
      }
      println(s"姓名:${x._1},性别:${gender},年龄:${age}")
    })
  }
  //kv saveAs 文件的方式保存数据
  def saveFile()={
    val list=List("banana","apple","pear","orange","watermalon")
    val rdd=sc.parallelize(list).map(x=>(x,x.length))
    //保存为sequence文件
    rdd.saveAsSequenceFile("/spark-sequence-file")
    rdd.saveAsHadoopFile("/spark-file",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
  }
}


本文由【waitig】发表在等英博客
本文固定链接:Spark Pair RDD 基本操作
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)