SparkSQL: Specifying the Schema Programmatically
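Spark SQL offers a programmatic way to attach a schema when the columns are not known until runtime: build an RDD of Rows, describe the columns with a StructType, and apply it with createDataFrame. The example below walks through those three steps against a comma-separated people.txt and then queries the result through a temporary view.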

   def test5(): Unit = {
        val ss: SparkSession = SparkSession.builder().appName("Spark SQL basic example")
          .config("spark.some.config.option", "some-value").getOrCreate()

        import ss.implicits._
        import org.apache.spark.sql.types._
        import org.apache.spark.sql.{DataFrame, Row}
        import org.apache.spark.rdd.RDD

        val personRDD: RDD[String] = ss.sparkContext.textFile("c:/users/os/desktop/people.txt")

        // Build the schema from a string of field names
        val schemaString = "name age"
        val fields: Array[StructField] = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema: StructType = StructType(fields)

        // Convert the records of the RDD (people) to Rows
        val rowRDD: RDD[Row] = personRDD.map(_.split(","))
          .map(attributes => Row(attributes(0), attributes(1).trim))

        // Apply the schema to the RDD of Rows
        val peopleDF: DataFrame = ss.createDataFrame(rowRDD, schema)

        // Register the DataFrame as a temporary view
        peopleDF.createOrReplaceTempView("person")

        // SQL against the view returns a DataFrame; Row columns are accessed by index,
        // and the implicit Encoder[String] from ss.implicits._ makes the map possible
        val result: DataFrame = ss.sql("select * from person")
        result.map(attribute => "Name: " + attribute(0)).show()
    }
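For reference, people.txt is assumed here to be the standard Spark example file, one comma-separated record per line:

    Michael, 29
    Andy, 30
    Justin, 19

With that input, the final show() prints one string column (named value, since the map produced a Dataset[String]):

    +-------------+
    |        value|
    +-------------+
    |Name: Michael|
    |   Name: Andy|
    | Name: Justin|
    +-------------+

Both fields are declared as StringType for simplicity; a numeric age column would need IntegerType in its StructField and a matching attributes(1).trim.toInt in the Row mapping.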
// Case class for the reflection-based alternative shown below; not used by test5 itself
case class Person(name: String, age: Long)
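The Person case class is what Spark's other route to a schema relies on: inferring the columns by reflection from the case class's fields instead of building a StructType by hand. A minimal sketch of that approach, assuming the same people.txt file (the method name test6 is made up for illustration):

    def test6(): Unit = {
      val ss: SparkSession = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()
      import ss.implicits._
      import org.apache.spark.sql.DataFrame

      // Spark infers the schema (name: String, age: Long) from Person's constructor
      val peopleDF: DataFrame = ss.sparkContext
        .textFile("c:/users/os/desktop/people.txt")
        .map(_.split(","))
        .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
        .toDF()

      peopleDF.createOrReplaceTempView("person")
      ss.sql("select name from person where age > 20").show()
    }

The trade-off: reflection is shorter and gives typed columns for free, but it only works when the columns are known at compile time; the StructType approach in test5 is what remains when they are not.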
