前言
因为在做网站案例的时候,想学习如何处理海量数据,所以想接触大数据相关的知识,之前对于大数据的了解,仅仅是停留在知道Hadoop,Hive,HBase,Spark的层面上,所以如何学习确实对我造成了困扰,所幸我所在公司部门存在大数据开发,于是不断的请教大佬,大佬说第一步是学一点Linux和Java,然后Hadoop,再然后......。再然后就先不说了,对于Linux和Java我这里可以直接跨过了,然后就是学Hadoop。这周利用散碎的时间,学会了Hadoop的安装使用,使用Java写一个Hadoop任务。安装这里我就不说了,大家可以去网上搜索,或者来我的网站文章模块看我如何安装(Mac): 网址:www.study-java.cn来看一下(需要打开微信小程序:每天学Java进行扫码登录)
引入依赖
代码语言:javascript复制 <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
我在网上看很多人都引用了hadoop-core,但是由于我引用报错,所以我去除了,大家根据自己需要决定是否补上这个依赖
操作文件
引入依赖之后,我们使用Java可以简单的操作Hadoop的文件系统了。比如增加,删除某些文件/文件夹。操作Hadoop文件系统是我们去了解Hadoop必不可少的一步,下面的代码替代了我们在命令行敲写Hadoop命令的过程,但是安装Hadoop后,我们还是需要去数据Hadoop的常用命令,不然下面代码的作用可能有点不好理解。
代码语言:javascript复制 public static void main(String[] args) {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(new URI("hdfs://localhost:9000/"), new Configuration(), "chenlong");
//是否存在
if (fileSystem.exists(new Path("/test2"))) {
//强制删除
fileSystem.delete(new Path("/test2"), true);
}
//创建
fileSystem.mkdirs(new Path("/test2"));
//将本地文件放入到工作目录下,是否删除本地,是否覆盖已经存在的数据,本地路径,hadoop路径
fileSystem.copyFromLocalFile(false, false, new Path("/Users/chenlong/Documents/xcx/dream/web-project/email-project/src/main/resources/view/log/log.json"), new Path("/test2"));
//fileSystem.copyFromLocalFile();
fileSystem.close();
} catch (Exception e) {
e.printStackTrace();
//Not implemented by the DistributedFileSystem FileSystem implementation -- 引入了hadoop-core 的问题
}
}
}
编写MapReduce任务
对于Hadoop文件系统比较熟悉后,那么就来写一个任务去提交,我这里编写的任务是为了统计Json文件中不同URL的访问数量,Json格式如下:
代码语言:javascript复制 {
"url":"/view/screen",
"desc":"请求项目案例->接口数据",
"name": "每天学Java",
"email":"4****6@qq.com",
"date":"1573721558826"
}
编写Hadoop任务第一步是编写自己的Mapper和Reduce。
代码语言:javascript复制//Mapper
public static class ApiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
}
//Reduce
public static class ApiReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
}
上面<LongWritable, Text, Text, IntWritable>含义是<输入键、输入值、输出键、输出值> LongWritable可以理解为Long,Text可以理解为String。什么叫输入键和输入值呢?笔者这样理解的,我们在提交文件到Hadoop任务的时候,Hadoop会读取文件(默认一行一行的读),然后将文件读取为K-V的形式,然后调用mapper的map方法(每读取一行,就分配到一个Mapper上,由于Hadoop大多数都是集群,所有一个文件会有很有mapper参与进来) 而输出键和输出值就好理解了,这个操作是我们将输入值和输出值经过一些逻辑衍生而成的。
需要注意的时候Reduce的输入键和输入值需要是Mapper的输出键、输出值。
完整Mapper和Reduce。这里我重写了run方法,因为默认按行读取,这样会将我的Json打散,所以我一次读取然后分发到map方法中,其实这里可以优化,就是判断是否组装成json,如果是就分发,不是继续组装,这样就不用读取完文件后在调用map方法。
代码语言:javascript复制 // 输入键、输入值、输出键、输出值
public static class ApiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
//重写Run方法,修改每读取一行就调用Mapper的格式
@Override
public void run(Context context) throws IOException, InterruptedException {
super.setup(context);
String json = "";
try {
while (context.nextKeyValue()) {
json = context.getCurrentValue();
}
Text text = new Text();
text.set(json);
map(context.getCurrentKey(), text, context);
} finally {
this.cleanup(context);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
JSONArray jsonArray = JSON.parseArray(value.toString());
System.out.println(jsonArray.size());
for (Object o : jsonArray) {
JSONObject jsonObject = (JSONObject) o;
word.set(jsonObject.getString("url"));
context.write(word, one);
}
}
}
//使用Mapper的输出作为输入,所以键要取mappere的输出
public static class ApiReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum = val.get();
}
result.set(sum);
context.write(key, result);
}
}
写完Mapper和Reduce就是写启动函数了:
代码语言:javascript复制 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//删除输出目录
FileSystem.get(new URI("hdfs://localhost:9000/"), new Configuration(), "chenlong")
.delete(new Path("/output"), true);
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "Api Request Num");
job.setJarByClass(ApiCount.class);
job.setMapperClass(ApiMapper.class);
job.setCombinerClass(ApiReduce.class);
job.setReducerClass(ApiReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//输入文件new Path(args[0])
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/test2/log.json"));
//输出文件new Path(args[1])
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
boolean temp = job.waitForCompletion(true);
if (temp) {
System.out.println("处理完成");
} else {
System.out.println("处理失败");
}
}
如此便完成了json日志分析,统计出不同的URL请求数量。