温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
本文将是Fayson最近写的R系列的最后一篇文章了,大家且看且珍惜吧。
无需额外花费过多的学习成本,sparklyr(https://spark.rstudio.com)可以让R用户很方便的利用Apache Spark的分布式计算能力。之前Fayson介绍了什么是sparklyr,大家知道R用户可以编写几乎相同的代码运行在Spark之上实现本地或者分布式计算。
spark_apply的架构 (来自 https://github.com/rstudio/sparklyr/pull/728)
从sparklyr0.6(https://blog.rstudio.com/2017/07/31/sparklyr-0-6/)开始,你就可以通过spark_apply()运行R代码在Spark集群之上。换句话说,你可以用R写UDF。这样可以让你用你最喜欢的R包来访问Spark里的数据,比如仅在R中实现的特定的统计分析方法,或者像NLP的高级分析,等等。
因为目前spark_apply()的实现需要在工作节点上也安装R环境,在这篇文章里,我们将介绍如何在CDH集群中运行spark_apply()。我们会介绍两种方法:1.使用Parcel。2)使用conda环境。
选项1:使用Parcel安装R环境
创建和分发R的Parcel
Parcel(https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html)是一种二进制的分发格式,Cloudera Manager可以使用Parcel来分发CDH,Spark2,Kafka和需要运行在集群上的服务。Parcel很像.deb或者.rpm包。它可以让你通过Cloudera Manager的界面很容易的在CDH集群上安装特定的服务。使用这种方式的前提是CDH集群是使用Parcel方式安装的。
你可以到https://dl.bintray.com/chezou/Parcels/下载预先创建好的Parcel包,它包含了https://github.com/conda/conda-recipes/blob/master/r-packages/r-essentials/meta.yaml里的一些基础组件。注意你不能将Bintray直接设置为远程的Parcel仓库地址,所以你需要提前将你的Parcel上传到HTTP服务器。
如果你想使用其他的R包,可以使用R的Parcel构建工具。你可以用Docker创建你自己的Parcels,通过修改Dockerfile。
https://github.com/chezou/cloudera-parcel
将这些Parcels放到HTTP服务器或者指定的S3 bucket。然后你就可以在Cloudera Manager中添加Parcel的仓库地址。更多细节请参考:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html#cmug_topic_7_11_5
在Spark的工作节点上运行R代码
当分发完R的Parcel包以后,就可以在工作节点上运行R代码。
注意:因为存在环境变量配置的问题:https://github.com/rstudio/sparklyr/issues/915,所以目前只能使用sparklyr的upstreamversion。最新的sparklyr 0.6.1没有这个功能。
以下是一个分布式执行R代码的例子
https://github.com/chezou/sparklyr-distribute
devtools::install_github("rstudio/sparklyr") library(sparklyr) config <- spark_config() config[["spark.r.command"]]<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/bin/Rscript" config$sparklyr.apply.env.R_HOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R" config$sparklyr.apply.env.RHOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R" config$sparklyr.apply.env.R_SHARE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/share" config$sparklyr.apply.env.R_INCLUDE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/include" sc <- spark_connect(master = "yarn-client",config = config) sdf_len(sc,5, repartition= 1) %>% spark_apply(function(e) I(e))r ## # Source: table<sparklyr_tmp_1cc757d61b8>[?? x 1] ## # Database: spark_connection ## id ## <dbl> ## 1 1 ## 2 2 ## 3 3 ## 4 4 ## 5 5
如果想要在分布式函数中使用R的包,sparklyr将这些包打包放在了本地的.libPaths(),然后使用SparkContext.addFile()函数将这些包分发到工作节点。如果是在spark_apply()中使用这些包则依赖于本地的代码,当然也可以按照下一个章节要介绍的使用Conda来分发他们。
更多官方文档资料: https://spark.rstudio.com/articles/guides-distributed-r.html#distributing-packages
选项2:使用conda环境
使用R创建conda环境
http://blog.cloudera.com/blog/2017/04/use-your-favorite-python-library-on-pyspark-cluster-with-cloudera-data-science-workbench/说明过conda虚拟环境可以打包R环境。
创建一个R的conda环境,使用zip压缩。
代码语言:javascript复制$ conda create -p ~/r_env --copy -y -q r-essentials -c r
# [Option] If you need additional package you can install as follows:
# $ source activate r_env
# $ Rscript -e 'install.packages(c("awesome-package"), lib = /home/cdsw/r_env/lib/R/library, dependencies = TRUE, repos="https://cran.r-project.org")'
# $ source deactivate
与使用Parcel的差异是环境变量的设置,需要将r_env.zip设置为环境变量。虽然这种方式很灵活,但是需要每次创建Spark连接时都分发zip文件。全部代码请参考: https://github.com/chezou/sparklyr-distribute/blob/master/dist_sparklyr_conda.r
代码语言:javascript复制config <- spark_config()
config[["spark.r.command"]] <- "./r_env.zip/r_env/bin/Rscript"
config[["spark.yarn.dist.archives"]] <- "r_env.zip"
config$sparklyr.apply.env.R_HOME <- "./r_env.zip/r_env/lib/R"
config$sparklyr.apply.env.RHOME <- "./r_env.zip/r_env"
config$sparklyr.apply.env.R_SHARE_DIR <- "./r_env.zip/r_env/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./r_env.zip/r_env/lib/R/include"
然后你就可以在Spark的工作节点上运行R代码。
复杂的例子:使用spacyr做文本分析
注意:本版本目前不支持在spark_apply()中使用本地代码的R包。
在这个例子中,我们使用spacyr package(https://github.com/kbenoit/spacyr),这个包R绑定了spaCy(https://spacy.io),一个新的Python的NLP库。我们提取named-entity(https://en.wikipedia.org/wiki/Named-entity_recognition),比如person, place, organization等等,来自于Jane Austen’sbooks(https://www.rdocumentation.org/packages/janeaustenr/versions/0.1.5/topics/austen_books)。所有代码请参考:
https://github.com/chezou/spacyr-sparklyr
为spacyr准备conda环境
因为spacyr需要Python运行环境,在运行下面例子之前你需要安装Anaconda的Parcel。
使用https://github.com/chezou/spacyr-sparklyr/blob/master/install_spacy.sh这个脚本安装conda环境
用spark_apply()抽取named entities
Spark DataFrame有text的column,我们可以用下面的UDF抽取named entities
代码语言:javascript复制entities <- austen_tbl %>%
select(text) %>%
spark_apply(
function(e)
{
lapply(e, function(k) {
spacyr::spacy_initialize(python_executable="/opt/cloudera/parcels/Anaconda/bin/python")
parsedtxt <- spacyr::spacy_parse(as.character(k), lemma = FALSE)
spacyr::entity_extract(parsedtxt)
}
)
},
names = c("doc_id", "sentence_id", "entity", "entity_type"),
packages = FALSE)
entities %>% head(10) %>% collect()
## # A tibble: 10 x 4
## doc_id sentence_id entity entity_type
## <chr> <int> <chr> <chr>
## 1 text1 1 Jane Austen PERSON
## 2 text1 1 Dashwood ORG
## 3 text1 1 Sussex GPE
## 4 text1 2 Norland Park GPE
## 5 text1 4 Henry Dashwood PERSON
## 6 text1 4 Norland GPE
## 7 text1 5 Gentleman NORP
## 8 text1 7 Henry Dashwood PERSON
## 9 text1 8 Henry Dashwood PERSON
## 10 text1 11 Norland GPE
划分entity类型的数量
代码语言:javascript复制library(ggplot2)
p <- entities %>%
collect() %>%
ggplot(aes(x=factor(entity_type)))
p <- p scale_y_log10()
p geom_bar()
前十名看每本书的人
代码语言:javascript复制persons <- entities %>%
filter(entity_type == "PERSON") %>%
group_by(doc_id, entity) %>%
select(doc_id, entity) %>%
count() %>%
arrange(doc_id, desc(n))
persons %>%
filter(doc_id == "text1") %>%
head(10) %>%
collect()
## # A tibble: 10 x 3
## # Groups: doc_id, entity [10]
## doc_id entity n
## <chr> <chr> <dbl>
## 1 text1 Elinor 662
## 2 text1 Marianne 538
## 3 text1 Edward 235
## 4 text1 Jennings 233
## 5 text1 Willoughby 200
## 6 text1 Lucy 172
## 7 text1 Dashwood 159
## 8 text1 Ferrars 104
## 9 text1 Lady Middleton 80
## 10 text1 Palmer 74
我应该使用哪个选项?
一般来说,建议选择选项1,因为你不需要每次分发R的环境,而且构建包含所有包的Parcel节约了很多时间,而不用纠结于某一个包。目前,RStudio有OS(https://spark.rstudio.com/articles/guides-distributed-r.html#requirements)的限制,但你可以设置packages= FALSE来绕过因为OS不同的问题。
另外,选项2可以灵活的后面再安装包,但用起来会麻烦一些。如果只是仅仅使用R包,这个方法也行,但当你还想使用rJava等源生的扩展包的时候,则比较难准备好环境。
总结
本文主要是介绍了如何使用sparklyr在Spark工作节点上运行和分发R代码。因为spark_apply()方法需要在工作节点上安装R,我们介绍了两种方法可以让你在CDH集群和CDSW上运行spark_apply()。你可以根据你想要的进行选择。如果需要稳定,可以选择选项1:Parcel的方法。如果需要灵活,则可以选择选项2:conda环境。
不仅只是执行dplyr,同时你可以分发你本地的R代码到Spark集群。这样可以让你将你的R技能充分应用到分布式计算框架上。
参考
https://blog.cloudera.com/blog/2017/09/how-to-distribute-your-r-code-with-sparklyr-and-cdsw/
醉酒鞭名马,少年多浮夸! 岭南浣溪沙,呕吐酒肆下!挚友不肯放,数据玩的花! 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操