Extracting Nested JSON Data with Spark
Spark
2020-01-15
JSON is a common format for storing data, but parsing it can be tedious. Below are some examples of working with nested JSON through Spark DataFrames (Spark version 1.6.0).
Original article: HOW TO EXTRACT NESTED JSON DATA IN SPARK
Sample file sample.json (pretty-printed here for readability; note that Spark 1.6's JSON reader expects each record on a single line):
{
  "user": "gT35Hhhre9m",
  "dates": ["2016-01-29", "2016-01-28"],
  "status": "OK",
  "reason": "some reason",
  "content": [
    { "foo": 123, "bar": "val1" },
    { "foo": 456, "bar": "val2" },
    { "foo": 789, "bar": "val3" },
    { "foo": 124, "bar": "val4" },
    { "foo": 126, "bar": "val5" }
  ]
}
Assuming you have already created a SQLContext object, the following examples show how to parse this nested JSON file.
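If you are working outside spark-shell (which predefines sqlContext), a minimal Spark 1.6 setup might look like the sketch below; the app name is an arbitrary placeholder:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
//a minimal Spark 1.6 setup; in spark-shell, sc and sqlContext already exist
val conf = new SparkConf().setAppName("nested-json-example")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)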
Load the JSON file into a Spark DataFrame
scala> val df = sqlContext.read.json("sample.json")
df: org.apache.spark.sql.DataFrame = [content: array<struct<bar:string,foo:bigint>>, dates: array<string>, reason: string, status: string, user: string]
//output
df.show
+--------------------+--------------------+-----------+------+-----------+
| content| dates| reason|status| user|
+--------------------+--------------------+-----------+------+-----------+
|[[val1,123], [val...|[2016-01-29, 2016...|some reason| OK|gT35Hhhre9m|
+--------------------+--------------------+-----------+------+-----------+
In the output above, the "content" field contains an array of structs and the "dates" field contains an array of strings. The first step is to extract this data into a new DataFrame using the explode function.
Extract "dates" into a new DataFrame
//explode dates field (explode lives in org.apache.spark.sql.functions and is not auto-imported in spark-shell)
scala> import org.apache.spark.sql.functions.explode
scala> val dfDates = df.select(explode(df("dates")))
//output
dfDates.show
+----------+
| col|
+----------+
|2016-01-29|
|2016-01-28|
+----------+
//rename "col" to "dates"
scala> val dfDates = df.select(explode(df("dates"))).toDF("dates")
//output
dfDates.show
+----------+
| dates|
+----------+
|2016-01-29|
|2016-01-28|
+----------+
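Note that explode returns only the exploded column, dropping the rest of the row. If you also want, say, the user value next to each date, select the two together; a minimal sketch (the "date" alias is my own naming):
//keep "user" alongside each exploded date; each output row pairs the user with one date
scala> val dfUserDates = df.select(df("user"), explode(df("dates")).as("date"))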
The "content" field contains an array of structs; to access the fields inside each struct we use the dot operator.
Extract the struct fields
//explode content field
scala> val dfContent = df.select(explode(df("content")))
dfContent: org.apache.spark.sql.DataFrame = [col: struct<bar:string,foo:bigint>]
//output
scala> dfContent.show
+----------+
| col|
+----------+
|[val1,123]|
|[val2,456]|
|[val3,789]|
|[val4,124]|
|[val5,126]|
+----------+
//rename "col" to "content"
scala> val dfContent = df.select(explode(df("content"))).toDF("content")
dfContent: org.apache.spark.sql.DataFrame = [content: struct<bar:string,foo:bigint>]
//output
scala> dfContent.show
+----------+
| content|
+----------+
|[val1,123]|
|[val2,456]|
|[val3,789]|
|[val4,124]|
|[val5,126]|
+----------+
//extracting fields in struct
scala> val dfFooBar = dfContent.select("content.foo", "content.bar")
dfFooBar: org.apache.spark.sql.DataFrame = [foo: bigint, bar: string]
//output
scala> dfFooBar.show
+---+----+
|foo| bar|
+---+----+
|123|val1|
|456|val2|
|789|val3|
|124|val4|
|126|val5|
+---+----+
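The two selects above can also be chained into one expression while carrying along the top-level columns. A minimal sketch that flattens "content" and keeps "user" in the result:
//flatten content in one pass, keeping the user column next to each foo/bar pair
scala> val dfFlat = df.select(df("user"), explode(df("content")).as("content")).select("user", "content.foo", "content.bar")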