RDD转DataFrame的两种方法

介绍一下Spark将RDD转换成DataFrame的两种方式。

  1. 通过是使用case class的方式,不过在scala 2.10中最大支持22个字段的case class,这点需要注意
  2. 是通过spark内部的StructType方式,将普通的RDD转换成DataFrame 装换成DataFrame后,就可以使用SparkSQL来进行数据筛选过滤等操作

方法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

//需要提前知道列名及类型
case class People(var name: String,var age:Int)
object DataFrameReflection {
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DataFrameReflection")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._
val peopelRDD: RDD[People] = sc.textFile("people.txt")
.map(line => People(line.split(",")(0),line.split(",")(1).trim.toInt))

val df = peopelRDD.toDF()
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
}
}

方法二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameProgrammatically {
def main(args:Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DataFrameProgrammatically")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//读取文件
val rdd: RDD[String] = sc.textFile("people.txt")

/**
* 得到 rowRDD
*/
val rowRDD: RDD[Row] = rdd.map(line => {
val fields = line.split(",")
Row(fields(0), fields(1).trim.toInt)
})

/**
* 得到structType
*/
val structType = StructType(
StructField("name",StringType,true) ::
StructField("age",IntegerType,true) :: Nil
)
/**
* rowRDD:RDD[Row]
* schema: StructType
*/
val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
//
// /**
// * 官网schema实现方法
// */
// val schemaString = "name age"
// val fields = schemaString.split(" ")
// .map(fieldName => StructField(
// fieldName,StringType,nullable = true
// ))
// val schema = StructType(fields)

}
}
0%