RHadoop is a collection of R packages that let R analyze and process big data on Hadoop. Traditional statistics focuses mainly on analyzing samples (small data sets), which can overlook outcomes that occur with tiny probability yet introduce uncertainty into the results. When the data grow too large for a single machine, the only options are a supercomputer or a scalable solution such as Hadoop. Hadoop is the most popular open-source, scalable big-data infrastructure; it stores and processes data in parallel across a cluster. RHadoop consists of five main packages:
- rmr: the interface between R and MapReduce; we only need to write the map and reduce functions (see the short sketch after this list).
- rhdfs: the interface between R and HDFS, for accessing data stored in HDFS.
- rhbase: the interface between R and HBase, for manipulating tables stored in HBase.
- plyrmr: a higher-level abstraction over MapReduce that supports plyr-style syntax for common data operations.
- ravro: reads and writes Avro files and exchanges data with HDFS.
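As a taste of the rmr interface described above, here is a minimal sketch (my own toy example, not from the book) that runs entirely under rmr2's local backend, so no Hadoop cluster is needed yet: it sums the even and the odd numbers from 1 to 10 using nothing but a map and a reduce function.
library(rmr2)
rmr.options(backend = 'local')        # run in-process; no cluster required
ints <- to.dfs(1:10)                  # put the input into the (local) file system abstraction
out  <- mapreduce(input = ints,
                  map    = function(k, v) keyval(v %% 2, v),      # key 0 = even, key 1 = odd
                  reduce = function(k, vv) keyval(k, sum(vv)))    # sum the values for each key
from.dfs(out)                         # keys 0 and 1 with sums 30 and 25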
Preparing the RHadoop environment
I used a ready-made virtual machine for this. The vendor the book recommends seems to have stopped providing its VM image, so I found the MapR sandbox mentioned in the book instead: https://package.mapr.com/releases/v6.1.0/sandbox/MapR-Sandbox-For-Hadoop-6.1.0.ova. Download VMware, import the VM, and the trial version is enough. The username and password are both mapr (see "Installing the Sandbox on VirtualBox" on hpe.com). The installation caused me a lot of pain; after several evenings of fiddling I finally found that installing through conda is by far the least troublesome way. This family of packages has not been updated for about six years and has poor compatibility with newer Python versions. Has Hadoop declined, or do industrial environments simply want something stable that works and never needs updating?
# The VM already has port forwarding configured, so connect directly
ssh mapr@localhost -p 2222
su
mapr                      # root password
yum install R-core -y
R                         # check that R starts, then quit with q() before continuing in the shell
# Installing the R packages took many detours; conda turned out to be by far the simplest way and saved a lot of time
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh    # run the installer (accept the default /root/miniconda3 prefix)
source /root/miniconda3/bin/activate
conda install r-rmr2 r-devtools
conda install -c mndrake r-rhdfs -y
# create a small test file for the HDFS examples (in vi: press i, type the content, then :wq to save)
vi /root/mapr/word.txt
i
123
:wq
# Fixing the conflict between the two Java environments (rJava fails to load):
Error: package or namespace load failed for ‘rJava’:
.onLoad failed in loadNamespace() for 'rJava', details:
call: dyn.load(file, DLLpath = DLLpath, ...)
error: unable to load shared object '/usr/lib64/R/library/rJava/libs/rJava.so':
libjvm.so: cannot open shared object file: No such file or directory
# point JAVA_HOME at the JVM bundled with conda (in vi: press i, add the export line, then :wq)
vi /root/.bashrc
i
export JAVA_HOME=/root/miniconda3
:wq
source /root/.bashrc
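A quick optional check (not from the book): in a fresh R session, rJava should now be able to locate libjvm.so and start the JVM.
R
library(rJava)
.jinit()                                                        # initializes the JVM; an error here means libjvm.so is still not found
.jcall("java/lang/System", "S", "getProperty", "java.version")  # print the Java version rJava picked up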
12.5 Operating HDFS with rhdfs
source /root/miniconda3/bin/activate
R
# The following two variables can go into the .Rprofile file so they do not have to be set every time (see the note after this session).
Sys.setenv(HADOOP_CMD='/usr/bin/hadoop')
Sys.setenv(HADOOP_STREAMING='/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0-mapr-1808.jar')
library(rhdfs)
hdfs.init()
# Copy a file to HDFS
hdfs.put('word.txt', './')
[1] TRUE
# List the directory
hdfs.ls('./')
permission owner group size modtime file
1 drwxr-xr-x mapr mapr 1 2018-10-23 17:55 /user/mapr/tmp
2 -rwxr-xr-x mapr mapr 4 2021-12-15 10:01 /user/mapr/word.txt
hdfs.copy('word.txt', 'wordcnt.txt')
[1] TRUE
hdfs.move('wordcnt.txt','./data/wordcnt.txt')
[1] TRUE
hdfs.delete('./data')
Deleted maprfs:///user/mapr/data
[1] TRUE
hdfs.rm('wordcnt.txt')
Deleted maprfs:///user/mapr/wordcnt.txt
[1] TRUE
# Download from HDFS
hdfs.get('word.txt', 'test.txt')
[1] TRUE
hdfs.rename('word.txt','2.txt')
[1] TRUE
hdfs.chmod('2.txt', permissions='777')
hdfs.file.info('./')
perms isDir block replication owner group size modtime
1 rwxr-xr-x TRUE 268435456 1 mapr mapr 2 53926-08-07 05:11:00
path
1 ./
# Write to HDFS
f <- hdfs.file('iris.txt', 'w')
data(iris)
hdfs.write(iris,f)
hdfs.close(f)
[1] TRUE
# Read from HDFS
f <- hdfs.file('iris.txt', 'r')
dfserialized <- hdfs.read(f)
df <- unserialize(dfserialized)
head(df)
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1 5.1 3.5 1.4 0.2 setosa
2 4.9 3.0 1.4 0.2 setosa
3 4.7 3.2 1.3 0.2 setosa
4 4.6 3.1 1.5 0.2 setosa
5 5.0 3.6 1.4 0.2 setosa
6 5.4 3.9 1.7 0.4 setosa
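As noted in the comment at the top of this session, the two HADOOP_* variables can go into ~/.Rprofile so that every new R session picks them up automatically; a minimal sketch using the paths from this sandbox:
# ~/.Rprofile
Sys.setenv(HADOOP_CMD = '/usr/bin/hadoop')
Sys.setenv(HADOOP_STREAMING = '/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0-mapr-1808.jar')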
The streaming jar can be located with the following command:
locate streaming | grep jar | more
/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0-mapr-1808.jar
/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/sources/hadoop-streaming-2.7.0-mapr-1808-sources.jar
/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/sources/hadoop-streaming-2.7.0-mapr-1808-test-sources.jar
/opt/mapr/hive/hive-2.3/hcatalog/share/hcatalog/hive-hcatalog-streaming-2.3.3-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/lib/oozie-sharelib-streaming-4.3.0-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/oozie-server/webapps/oozie/WEB-INF/lib/oozie-sharelib-streaming-4.3.0-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/share/lib/mapreduce-streaming/commons-io-2.4.jar
/opt/mapr/oozie/oozie-4.3.0/share/lib/mapreduce-streaming/hadoop-streaming-2.7.0-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/share/lib/mapreduce-streaming/oozie-sharelib-streaming-4.3.0-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/share/lib/spark/spark-streaming-kafka-0-9_2.11-2.3.1-mapr-1808.jar
/opt/mapr/oozie/oozie-4.3.0/share/lib/spark/spark-streaming_2.11-2.3.1-mapr-1808.jar
/opt/mapr/pig/pig-0.16/test/e2e/pig/lib/hadoop-0.23.0-streaming.jar
/opt/mapr/pig/pig-0.16/test/e2e/pig/lib/hadoop-streaming.jar
/opt/mapr/spark/spark-2.3.1/jars/spark-streaming-kafka-0-9_2.11-2.3.1-mapr-1808.jar
/opt/mapr/spark/spark-2.3.1/jars/spark-streaming_2.11-2.3.1-mapr-1808.jar
# Alternatively, for a more convenient workflow, install RStudio Server (the VM then needs the corresponding port forwarded)
wget https://download2.rstudio.org/server/centos8/x86_64/rstudio-server-rhel-2021.09.1-372-x86_64.rpm
sudo yum install rstudio-server-rhel-2021.09.1-372-x86_64.rpm
12.6 Solving the word count problem in RHadoop
# Prepare the data: clone the example repository (run this in the shell before starting R)
git clone https://gitee.com/zd200572/ml_R_cookbook.git
library(rmr2)
library(rhdfs)    # provides the hdfs.* functions used below
hdfs.init()
21/12/15 10:27:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
# Create a directory and upload the input file
hdfs.mkdir("/user/mapr/wordcount/data")
[1] TRUE
hdfs.put("ml_R_cookbook/CH12/wc_input.txt", '/user/mapr/wordcount/data')
[1] TRUE
# The map function: split each line into words and emit a (word, 1) pair per word
map <- function(., lines) {
  keyval(
    unlist(strsplit(lines, split = " ")),
    1)
}
# The reduce function: sum the counts for each word
reduce <- function(word, counts) {
keyval(word, sum(counts))
}
hdfs.root <- 'wordcount'
hdfs.data <- file.path(hdfs.root, 'data')
hdfs.out <- file.path(hdfs.root, 'out2')
wordcounts <- function(input, output=NULL){
mapreduce(input=input, output=output, input.format="text",
map=map, reduce=reduce)
}
out <- wordcounts(hdfs.data, hdfs.out)
# note: the output directory must not already exist; remove a previous run with hdfs.rm(hdfs.out) if needed
results <- from.dfs(out)
# the ten most frequent words
results$key[order(results$val, decreasing = TRUE)][1:10]
# Submitting the job calls map to do the word counting
# It still fails, and this error I could not get past; the VM's resources looked adequate, so perhaps the hardware requirements are simply higher
21/12/18 01:13:05 INFO mapreduce.Job: Counters: 19
Job Counters
Failed map tasks=7
Killed map tasks=1
Killed reduce tasks=1
Launched map tasks=8
Other local map tasks=6
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=209232
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=52308
Total time spent by all reduce tasks (ms)=0
Total vcore-seconds taken by all map tasks=52308
Total vcore-seconds taken by all reduce tasks=0
Total megabyte-seconds taken by all map tasks=53563392
Total megabyte-seconds taken by all reduce tasks=0
DISK_MILLIS_MAPS=26157
DISK_MILLIS_REDUCES=0
Map-Reduce Framework
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
21/12/18 01:13:05 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, :
hadoop streaming failed with error code 1
[Figure: Hadoop job monitoring console]
I came to appreciate how high the barrier to entry for big data still is, especially when some of the tooling is immature and unfriendly to beginners, and a lot of compute resources are needed on top of that. Since I could not get the job to run, let me at least walk through what it does. A MapReduce program is split into a map part and a reduce part. The map function first uses strsplit to break a line into words, then unlist turns the result into a character vector, and finally it returns the key-value pairs. The reduce function sums the counts from the individual subtasks and returns the total number of occurrences of each word. The mapreduce function submits the job and takes four inputs: the HDFS input path, the HDFS output path, the map function, and the reduce function.
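Even without a working cluster, the same map and reduce functions can be exercised with rmr2's local backend, which runs the whole job inside the current R session; a minimal sketch (the two sample lines are made up for illustration):
library(rmr2)
rmr.options(backend = 'local')                        # no Hadoop involved in this mode
lines <- to.dfs(c("hello world", "hello hadoop"))
out <- mapreduce(input = lines, map = map, reduce = reduce)
results <- from.dfs(out)
setNames(results$val, results$key)                    # expected: hadoop = 1, hello = 2, world = 1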
12.7 Comparing the performance of R MapReduce functions and standard R code
rmr.options(backend='local')
a.time <- proc.time()
small.ints2 <- 1:100000
result.normal <- sapply(small.ints2, function(x) x^2)
proc.time() -a.time
user system elapsed
0.163 0.007 0.176
b.time <- proc.time()
small.ints <- to.dfs(1:100000)
result <- mapreduce(input=small.ints, map=function(k,v) cbind(v,v^2))
proc.time()-b.time
user system elapsed
0.931 0.083 1.022
The times differ by about a factor of ten, and along the way we get a method for measuring run time. This was run in local mode, which is why it is still fast; in distributed mode it would take several minutes or more. Even for a small task, the MapReduce approach turns a job of a few dozen seconds into several minutes, because time is spent starting services, coordinating work between processes, and reading data from each node. So if the data fit entirely in memory, a standard R program is the right tool; only when the data are too large should we reach for MapReduce, otherwise it is like using a cannon to swat a fly.
12.8 Testing and debugging rmr2 programs
rmr.options(backend='local')
b.time <- proc.time()
small.ints <- to.dfs(1:100000)
result <- mapreduce(input=small.ints, map=function(k,v) cbind(v,v^2))
proc.time()-b.time
result <- mapreduce(to.dfs(1), map=function(k,v) rmr.str(v))
Dotted pair list of 14
$ : language mapreduce(to.dfs(1), map = function(k, v) rmr.str(v))
$ : language mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, in.folder = if (is.list(input)) { lapply| __truncated__ ...
$ : language c.keyval(do.call(c, lapply(in.folder, function(fname) { kv = get.data(fname) ...
$ : language do.call(c, lapply(in.folder, function(fname) { kv = get.data(fname) ...
$ : language lapply(in.folder, function(fname) { kv = get.data(fname) ...
$ : language FUN(X[[i]], ...)
$ : language unname(tapply(1:lkv, ceiling((1:lkv)/(lkv/(object.size(kv)/10^6))), function(r) { kvr = slice.keyval(kv, r) ...
$ : language tapply(1:lkv, ceiling((1:lkv)/(lkv/(object.size(kv)/10^6))), function(r) { kvr = slice.keyval(kv, r) ...
$ : language lapply(X = ans[index], FUN = FUN, ...)
$ : language FUN(X[[i]], ...)
$ : language as.keyval(map(keys(kvr), values(kvr)))
$ : language is.keyval(x)
$ : language map(keys(kvr), values(kvr))
$ : language rmr.str(v)
..- attr(*, "srcref")= 'srcref' int [1:8] 1 36 1 59 36 59 1 1
.. ..- attr(*, "srcfile")=Classes 'srcfilecopy', 'srcfile' <environment: 0x7fe5cde38970>
v
num 1
As before, this was run with the local backend, so it completes quickly; in distributed mode it would take minutes.
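The same trick helps with the word-count mapper from section 12.6: wrapping rmr.str() around the incoming value prints every chunk the mapper receives, which only makes sense under the local backend (a small illustrative sketch, not from the book):
rmr.options(backend = 'local')
debug.map <- function(., lines) {
  rmr.str(lines)                                      # print the structure of the values handed to this map call
  keyval(unlist(strsplit(lines, split = " ")), 1)
}
out <- mapreduce(input = to.dfs(c("hello world")), map = debug.map, reduce = reduce)
from.dfs(out)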
12.10 Manipulating data with plyrmr
Writing MapReduce programs with the rmr2 package is already much simpler than writing them natively, but it is still quite hard for a non-programmer; the plyrmr package provides a higher-level abstraction over MapReduce.
yum install libxml2-devel curl-devel -y
conda install -c r r-pryr
install.packages(c("RCurl","httr"), dependencies=TRUE)
install.packages(c("R.methodsS3","hydroPSO"), dependencies=TRUE)
This package turned out to be extremely hard to install; nothing I tried succeeded, so this is as far as I got.
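Since the installation never succeeded here, the following is only an untested sketch of what plyrmr code is supposed to look like, pieced together from the package's documentation; the verbs shown (input, output, where) come from that documentation and could not be verified on this sandbox:
library(plyrmr)
# the plyr-style verb works on an in-memory data frame ...
where(mtcars, mpg > 30)
# ... and on data stored in HDFS, where the same operation runs as a MapReduce job
output(input(mtcars), "/tmp/mtcars")                     # copy the data frame to HDFS (path is illustrative)
as.data.frame(where(input("/tmp/mtcars"), mpg > 30))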
12.11 Implementing machine learning in RHadoop
library(MASS)
data(cats)
X <- matrix(cats$Bwt)
Y <- matrix(cats$Hwt)
model <- lm(Y ~ X)
summary(model)
Call:
lm(formula = Y ~ X)
Residuals:
Min 1Q Median 3Q Max
-3.5694 -0.9634 -0.0921 1.0426 5.1238
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) -0.3567 0.6923 -0.515 0.607
X 4.0341 0.2503 16.119 <2e-16 ***
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
Residual standard error: 1.452 on 142 degrees of freedom
Multiple R-squared: 0.6466, Adjusted R-squared: 0.6441
F-statistic: 259.8 on 1 and 142 DF, p-value: < 2.2e-16
library(MASS)
data(cats)
library(rmr2)
rmr.options(backend='local')
# store the predictor matrix in the DFS together with a row index, so that each map task
# can look up the matching rows of Y
X <- matrix(cats$Bwt)
X.index <- to.dfs(cbind(1:nrow(X), X))
Y <- as.matrix(cats$Hwt)
Sum <- function(., YY) keyval(1, list(Reduce('+', YY)))   # add up the partial matrices from the mappers
XtX <- values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = function(., Xi) {
        Xi <- Xi[, -1]
        keyval(1, list(t(Xi) %*% Xi))},
      reduce = Sum, combine = TRUE)))[[1]]
XtY <- values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = function(., Xi) {
        Yi <- Y[Xi[, 1], ]
        Xi <- Xi[, -1]
        keyval(1, list(t(Xi) %*% Yi))},
      reduce = Sum, combine = TRUE)))[[1]]
solve(XtX, XtY)
This again threw an error for me and produced no result. According to the book, though, the coefficient from the MapReduce approach is less accurate than the one from the lm model, and the gap is noticeable: 3.907113 vs 4.0341 (part of the gap is simply that the MapReduce code fits the model without an intercept term, while lm(Y ~ X) includes one; see the check below). Still, when the data set is too large to fit in memory, there is no other choice. I will skip the remaining recipes, since I have no use for the AWS cloud for now.
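As an in-memory cross-check (a sketch added for comparison, not part of the book's code): the two MapReduce passes above merely assemble X'X and X'Y piece by piece, so solving the normal equations on the full matrices should reproduce the MapReduce coefficient, and it should also match an lm fit without an intercept.
library(MASS)
data(cats)
X <- matrix(cats$Bwt)
Y <- matrix(cats$Hwt)
solve(t(X) %*% X, t(X) %*% Y)    # the quantity the MapReduce job computes; about 3.91
coef(lm(Y ~ X - 1))              # no-intercept fit, agrees with the line above
coef(lm(Y ~ X))                  # with an intercept, the slope becomes the 4.03 reported by summary(model)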