Spark SQL: working with query result sets

Cloud computing · waitig

Raw data (people.txt):

zhang san,15
li si,15
wang wu,20
zhao liu,22
zhang san,42
li wu,22
li si,20
hello world,18
hello world,18

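// Imports required by this test (top of the test file)
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.junit.Test

    /**
      * Create an RDD of Person objects from a text file and convert it to a DataFrame
      */
    @Test
    def test4(): Unit = {
        val ss: SparkSession = SparkSession.builder()
          .appName("Spark SQL basic example")
          .master("local[*]") // run locally inside the IDE/JUnit; omit when the master is supplied by spark-submit
          .config("spark.some.config.option", "some-value")
          .getOrCreate()

        // Brings in toDF() and the encoders needed by the map calls below
        import ss.implicits._
        // Read people.txt, split each line on ",", build Person objects, then convert the RDD to a DataFrame
        val personDF: DataFrame = ss.sparkContext.textFile("c:/users/os/desktop/people.txt")
          .map(_.split(","))
          .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
          .toDF()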
        personDF.createOrReplaceTempView("person")

        val selectDF = ss.sql("select name,age from person where age between 30 and 50 ")
        selectDF.show()
        // Columns of each result Row can be accessed by index
        selectDF.map(person => "Name: " + person(0)).show()
        // ...or by field name
        selectDF.map(person => "Name: " + person.getAs[String]("name")).show()

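        // There is no predefined encoder for Map[String, Any], so declare a Kryo-based one explicitly
        implicit val mapEncoder: Encoder[Map[String, Any]] = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
        // Row.getValuesMap[T] fetches several columns at once as a Map[String, T]
        val maps: Array[Map[String, Any]] = selectDF.map(person => person.getValuesMap[Any](List("name", "age"))).collect()
        println(maps.toList) // List(Map(name -> zhang san, age -> 42))
        ss.stop()
    }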
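// Case class describing the format of one record; keep it at top level (outside the method) so Spark can derive its encoder
case class Person(name: String, age: Long)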
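The result of the query is easy to check by hand. As a hedged illustration, the sketch below reproduces the steps of test4 (split each line, build a Person, keep ages BETWEEN 30 AND 50, then project name and age into a map the way Row.getValuesMap does) on plain Scala collections with no Spark dependency. The object name ResultSetSketch and the helpers parse, between and asMaps are hypothetical and are not part of the Spark API. SQL BETWEEN is inclusive on both bounds, and only "zhang san,42" falls in that range, which is why the final println in the test shows a single map.

```scala
// Hypothetical plain-Scala sketch (no Spark) of what test4 computes.
object ResultSetSketch {
  final case class Person(name: String, age: Long)

  // Same raw data as people.txt
  val lines: List[String] = List(
    "zhang san,15", "li si,15", "wang wu,20", "zhao liu,22",
    "zhang san,42", "li wu,22", "li si,20", "hello world,18", "hello world,18"
  )

  // Mirrors .map(_.split(",")).map(attributes => Person(...)) in the test
  def parse(line: String): Person = {
    val attributes = line.split(",")
    Person(attributes(0), attributes(1).trim.toLong)
  }

  // Mirrors "where age between 30 and 50": both bounds are inclusive
  def between(p: Person, lo: Long, hi: Long): Boolean = p.age >= lo && p.age <= hi

  // Rows that survive the WHERE clause
  def selected: List[Person] = lines.map(parse).filter(between(_, 30L, 50L))

  // Mirrors getValuesMap[Any](List("name", "age")) applied to each selected row
  def asMaps: List[Map[String, Any]] =
    selected.map(p => Map[String, Any]("name" -> p.name, "age" -> p.age))

  def main(args: Array[String]): Unit = {
    selected.foreach(p => println("Name: " + p.name)) // prints "Name: zhang san"
    println(asMaps)                                   // prints List(Map(name -> zhang san, age -> 42))
  }
}
```

Filtering in plain Scala like this is only a way to reason about the expected output; in Spark the same predicate runs inside the SQL engine on the distributed DataFrame.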

This article was published by waitig on 等英博客 (Dengying blog).