This website requires JavaScript.

在Cloudera Hadoop中使用SparkR[译]

部门新入职一位精通R语言的小伙伴,会用到SparkR,但是默认情况下Cloudera Hadoop 是不支持SparkR的。本来打算编译CDH官方的Spark源码然后替换掉自带版本,但总感觉会给自己挖坑,因此决定另外安装一个Spark环境连接Yarn执行SparkR程序,本文是对How to use SparkR in Cloudera Hadoop一文的翻译,并加入了一些自己的内容。

各工作节点安装R

yum install R -y

若要为R安装包,请在命令行输入R进入R的交互式命令行,然后输入install.packages('magrittr') 就可以安装包

建立Gateway

需要有一台机器是Yarn的gateway节点,用来连接群集提交任务

下载Spark

下载一个Spark到你gateway的home目录,这个很简单,具体可以参考这篇文章。最爽一点就是我们没有必要下载与CDH相匹配Spark版本;在我们群集中CDH版本是5.8,对应的Spark为1.6.0,我打算下载Spark 2.0的Hadoop 2.6预编译版本,以下是我gateway的home目录

[root@chd-components ~]# ll
drwxrwxr-x   2 root   root        4096 6月  10 2015 R
-rw-r--r--   1 root   root     5326536 5月  18 16:53 sp500hst.txt
drwxr-xr-x  12 zabbix zabbix      4096 7月  20 05:28 spark-2.0.0-bin-hadoop2.6
-rw-rw-r--   1 root   root   184354523 7月  27 11:48 spark-2.0.0-bin-hadoop2.6.tgz
drwxrwxr-x   3 root   root        4096 3月   3 2016 sqljdbc_4.0
drwxrwxr-x   2 root   root        4096 9月   9 18:42 sqoop

本地运行SparkR脚本

接着我们会用RStudio在本地运行SparkR脚本. 这里有一个简单的脚本,从HDFS上读取一个CSV文件,并且打印第一行元素(细节如下):

Sys.setenv(HADOOP_CONF_DIR='/etc/hadoop/conf.cloudera.hdfs')
Sys.setenv(YARN_CONF_DIR='/etc/hadoop/conf.cloudera.yarn')
 
library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)
 
sc <- sparkR.init(sparkHome = "/home/ctsats/spark-1.6.1-bin-hadoop2.6",
                  sparkPackages="com.databricks:spark-csv_2.10:1.4.0")
 
sqlContext <- sparkRSQL.init(sc)
 
df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true") 
df %>% head %>% print
 
sparkR.stop()

我们来看一下每一行代码:

  • 第一第二行设置Hadoop环境变量参数(具体可以问一下你们群集的管理员)
  • 第四行,我们加载SparkR包,填写它在本机的路径
  • 第七、八行为Spark context的初始配置,提供了sparkHome目录,第八行我们加载了Spark的扩展包 spark-csv
  • 第十行,我们初始化sqlContext,我们将使用Spark dataframes
  • 第十二行,我们使用spark-csv包读取位于HDFS目录中的 interactions_clean.csv文件。结果的Spark dataframe 为df.
  • 第十三行,我们用R的magrittr包把结果通过管道输出(第一行数据和表头)
  • 第十五行,程序结束。

interactions_clean.csv我自己随便建了一个,内容如下:

user_id,item_id,interaction_type
1,7,1006839
2,9944146,3
3,91053485,1
4,132444782,1
5,23501722,1
6,23305844,1

在命令行中运行前面的R脚本,如果一些顺利会显示CSV文件的内容:

Rscript R/sparkr1.6.2-test.R

提交SparkR应用至YARN执行

现在我们要将SparkR应用放在YARN上执行,首先修改以下脚本,保存为parkr-submit_test.R

library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)
 
sc <- sparkR.init(appName = 'SparkR-submit-test',
                  sparkEnvir = list(spark.executor.cores='2',          # STRINGS here for spark-submit!!!
                                    spark.executor.instances='12'))
                 
sqlContext <- sparkRSQL.init(sc)
 
df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true")
df %>% head %>% print
 
sparkR.stop()

我们脚本改变了以下几个地方:

  • 我们没有在脚本中设置环境变量HADOOP_CONF_DIRYARN_CONF_DIR
  • 没有声明附加的Spark包,如 spark-csv
  • 4-6行囊括了Spark的一些配置

让我们把脚本提交到YARN中吧:

1.设置必要的环境变量 2.将我们的工作目录切换到Spark命令所在的目录中,我们例子中为spark-1.6.1-bin-hadoop2.6/bin 3.当前目录下(./)执行spark-submit 4.把spark-csv包放到命令行参数中。

export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
cd spark-1.6.1-bin-hadoop2.6/bin/
./spark-submit --master yarn-client --driver-memory 2g --packages com.databricks:spark-csv_2.10:1.4.0 ~/R/sparkr-submit_test.R

正如你所见的,我们不仅把Job提交到了YARN上,而且还用了与CDH自带版本不同的Spark 。

另外请注意,我们并不需要在所有节点上都安装SparkRmagrittr,只有R是需要在每个工作节点安装的。

还有一点,其实其他程序,诸如PySpark等也可以按照这个方式执行。

0条评论
avatar