在Cloudera Hadoop中使用SparkR[译]
Spark
2020-01-15
570
0
部门新入职一位精通R语言的小伙伴,会用到SparkR,但是默认情况下Cloudera Hadoop 是不支持SparkR的。本来打算编译CDH官方的Spark源码然后替换掉自带版本,但总感觉会给自己挖坑,因此决定另外安装一个Spark环境连接Yarn执行SparkR程序,本文是对How to use SparkR in Cloudera Hadoop一文的翻译,并加入了一些自己的内容。
各工作节点安装R
yum install R -y
若要为R安装包,请在命令行输入R
进入R的交互式命令行,然后输入install.packages('magrittr')
就可以安装包
建立Gateway
需要有一台机器是Yarn的gateway节点,用来连接群集提交任务
下载Spark
下载一个Spark到你gateway的home目录,这个很简单,具体可以参考这篇文章。最爽一点就是我们没有必要下载与CDH相匹配Spark版本;在我们群集中CDH版本是5.8,对应的Spark为1.6.0,我打算下载Spark 2.0的Hadoop 2.6预编译版本,以下是我gateway的home目录
[root@chd-components ~]# ll
drwxrwxr-x 2 root root 4096 6月 10 2015 R
-rw-r--r-- 1 root root 5326536 5月 18 16:53 sp500hst.txt
drwxr-xr-x 12 zabbix zabbix 4096 7月 20 05:28 spark-2.0.0-bin-hadoop2.6
-rw-rw-r-- 1 root root 184354523 7月 27 11:48 spark-2.0.0-bin-hadoop2.6.tgz
drwxrwxr-x 3 root root 4096 3月 3 2016 sqljdbc_4.0
drwxrwxr-x 2 root root 4096 9月 9 18:42 sqoop
本地运行SparkR脚本
接着我们会用RStudio在本地运行SparkR脚本. 这里有一个简单的脚本,从HDFS上读取一个CSV文件,并且打印第一行元素(细节如下):
Sys.setenv(HADOOP_CONF_DIR='/etc/hadoop/conf.cloudera.hdfs')
Sys.setenv(YARN_CONF_DIR='/etc/hadoop/conf.cloudera.yarn')
library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)
sc <- sparkR.init(sparkHome = "/home/ctsats/spark-1.6.1-bin-hadoop2.6",
sparkPackages="com.databricks:spark-csv_2.10:1.4.0")
sqlContext <- sparkRSQL.init(sc)
df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true")
df %>% head %>% print
sparkR.stop()
我们来看一下每一行代码:
- 第一第二行设置Hadoop环境变量参数(具体可以问一下你们群集的管理员)
- 第四行,我们加载
SparkR
包,填写它在本机的路径 - 第七、八行为Spark context的初始配置,提供了sparkHome目录,第八行我们加载了Spark的扩展包
spark-csv
- 第十行,我们初始化
sqlContext
,我们将使用Spark dataframes - 第十二行,我们使用
spark-csv
包读取位于HDFS目录中的 interactions_clean.csv文件。结果的Spark dataframe 为df
. - 第十三行,我们用R的
magrittr
包把结果通过管道输出(第一行数据和表头) - 第十五行,程序结束。
interactions_clean.csv
我自己随便建了一个,内容如下:
user_id,item_id,interaction_type
1,7,1006839
2,9944146,3
3,91053485,1
4,132444782,1
5,23501722,1
6,23305844,1
在命令行中运行前面的R脚本,如果一些顺利会显示CSV文件的内容:
Rscript R/sparkr1.6.2-test.R
提交SparkR应用至YARN执行
现在我们要将SparkR应用放在YARN上执行,首先修改以下脚本,保存为parkr-submit_test.R
library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)
sc <- sparkR.init(appName = 'SparkR-submit-test',
sparkEnvir = list(spark.executor.cores='2', # STRINGS here for spark-submit!!!
spark.executor.instances='12'))
sqlContext <- sparkRSQL.init(sc)
df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true")
df %>% head %>% print
sparkR.stop()
我们脚本改变了以下几个地方:
- 我们没有在脚本中设置环境变量
HADOOP_CONF_DIR
和YARN_CONF_DIR
- 没有声明附加的Spark包,如 spark-csv
- 4-6行囊括了Spark的一些配置
让我们把脚本提交到YARN中吧:
1.设置必要的环境变量 2.将我们的工作目录切换到Spark命令所在的目录中,我们例子中为spark-1.6.1-bin-hadoop2.6/bin
3.当前目录下(./
)执行spark-submit 4.把spark-csv包放到命令行参数中。
export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
cd spark-1.6.1-bin-hadoop2.6/bin/
./spark-submit --master yarn-client --driver-memory 2g --packages com.databricks:spark-csv_2.10:1.4.0 ~/R/sparkr-submit_test.R
正如你所见的,我们不仅把Job提交到了YARN上,而且还用了与CDH自带版本不同的Spark 。
另外请注意,我们并不需要在所有节点上都安装SparkR
和magrittr
,只有R是需要在每个工作节点安装的。
还有一点,其实其他程序,诸如PySpark等也可以按照这个方式执行。