This website requires JavaScript.

SparkSQL实践

通过本文我们会学些使用SparkSQL数据框架连接JDBC数据源,如果你不太熟悉SparkSQL可以查看这篇文章Analyzing Crime Data using SparkSQL。

我们在Spark 1.3里面介绍了DataFrame API 。 Spark RDD是一个计算单元,他并不包含结构化数据的schema信息。而Dataframe组合RDD和Schema,这回带来极大好处。

数据准备

你可以从这里下载示例数据。

在MySQL中创建表:

mysql> create table StockPrices(date varchar(20),open float,
-> high float,low float,
-> close float,volume bigint,
-> adjclose float);
导入数据:
mysql> load data local infile '/root/sp500hst.txt' 
-> into table StockPrices
-> fields terminated by ','
-> lines terminated by '\n';
运行pyspark时加载MySQL connector

可以点此下载

pyspark --driver-class-path /usr/share/java/mysql-connector-java.jar
** 创建连接字符串**
url ="jdbc:mysql://localhost/spark?user=root&password=xxx"
stock_data=sqlContext.read.format("jdbc").option("url",url).option("dbtable","StockPrices").load()
**查看stock_data类型**
type(stock_data)

数据查询

我们有两种方式查询dataframe

1. 使用DataFrame方法

查看Schema

stock_data.printSchema()
查询symbols字段
date=stock_data.select("date")
date.show()
**2.将DataFrame注册成表**
stock_data.registerTempTable(“Stock”)
查询1
result=sqlContext.sql(“select max(volume) as  max from Stock where symbol=’A’”)
result.show()
查询2
result=sqlContext.sql(“select max(volume) as max from Stock group by symbol”)
result.show()
原文地址:https://acadgild.com/blog/integrating-sparksql-with-mysql/
0条评论
avatar