对给定的数据利用 MapReduce 编程实现数据的清洗和预处理,编程实现将数据存储到 HBase 数据库,并实现数据的增删改查操作接口,同时对 MapReduce 处理好的数据利用 Hive 实现数据的基本统计。
设计要求:
- 根据数据特征,设计一个任务场景,利用MapReduce编程实现数据的清洗和预处理。(10分)
- 利用 HDFS 的 Java API 编写程序,将原始数据和预处理后的数据上传到分布式文件系统。
数据集: 链接:https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A
代码语言:Java
package com.company.HDFS;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class step0 {
final static String INPUT_PATH="hdfs://192.168.88.100/data";
final static String OUTPUT_PATH="hdfs://192.168.88.100/output";
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration configuration = new Configuration();
FileSystem fileSystem =FileSystem.get(new URI(INPUT_PATH),configuration);
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
Job job = new Job(configuration,"step0");
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
job.setJarByClass(step0.class);
job.setMapperClass(ReMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(ReReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
public static class ReReducer extends Reducer<Text,NullWritable, Text,NullWritable> {
private IntWritable result = new IntWritable();
public ReReducer() {
}
protected void reduce(Text key2, Iterable<NullWritable> value2, Reducer<Text,NullWritable, Text,NullWritable>.Context context) throws IOException, InterruptedException {
context.write(key2,NullWritable.get());
}
}
public static class ReMapper extends Mapper<LongWritable, Text, Text,NullWritable> {
private static final int FAIL_DATA=9999;
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
}
代码语言:Java
package com.company.HDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class step1 {
/**
* 查看 所有文件
*/
@Test
public void demo_03() {
try {
//1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
// 2 获取文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus status = listFiles.next();
// 输出详情
// 文件名称
System.out.println(status.getPath().getName());
// 长度
System.out.println(status.getLen());
// 权限
System.out.println(status.getPermission());
// 分组
System.out.println(status.getGroup());
// 获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("-----------分割线----------");
}
// 3 关闭资源
fs.close();
} catch (Exception ex) {
}
}
/**
* 上传
*/
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "2");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
// 2 上传文件
fs.copyFromLocalFile(new Path("J:\the_efforts_paid_offf\HDFS_HBase_HiveApi\src\main\java\com\company\datas\iris.data"), new Path("hdfs://192.168.88.100/input"));
// 3 关闭资源
fs.close();
System.out.println("over");
}
}
代码语言:Java
package com.company.HDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import java.io.IOException;
/**
* @author ChinaManor
* #Description hbase的javaAPI
* #Date: 2021/12/19 18:10
*/
/**
 * HBase Java API examples operating on the {@code demo} table: create the
 * table, insert a cell, scan a column and delete a row.
 */
public class step2 {
    /** ZooKeeper quorum through which the HBase cluster is located. */
    private static final String ZOOKEEPER_QUORUM = "node1:2181";
    /** Table all examples operate on. */
    private static final TableName TABLE_NAME = TableName.valueOf("demo");
    /** Name of the single column family of the table. */
    private static final String COLUMN_FAMILY = "info";
    /** Column family as bytes; Bytes.toBytes encodes as UTF-8 independent of the platform charset. */
    private static final byte[] FAMILY = Bytes.toBytes(COLUMN_FAMILY);
    /** Column qualifier written by {@link #put()} and read by {@link #scan()}. */
    private static final byte[] QUALIFIER = Bytes.toBytes("name");
    /** Row key shared by {@link #put()} and {@link #delete()} so the delete removes the inserted row. */
    private static final byte[] ROW_KEY = Bytes.toBytes("rk001");

    /**
     * Opens a new connection to the HBase cluster; the caller must close it.
     *
     * @return an open connection
     * @throws IOException if the connection cannot be established
     */
    private static Connection connect() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        return ConnectionFactory.createConnection(conf);
    }

    /**
     * Creates the {@code demo} table with the column family {@code info}.
     *
     * @throws IOException if the cluster cannot be reached or the table cannot be created
     */
    @Test
    public void createTable() throws IOException {
        try (Connection conn = connect();
             Admin admin = conn.getAdmin()) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TABLE_NAME);
            tableDescriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
            admin.createTable(tableDescriptor);
        }
    }

    /**
     * Writes the value {@code zhangsan} to column {@code info:name} of row {@code rk001}.
     *
     * @throws IOException if the cluster cannot be reached or the write fails
     */
    @Test
    public void put() throws IOException {
        try (Connection conn = connect();
             Table table = conn.getTable(TABLE_NAME)) {
            Put row = new Put(ROW_KEY);
            row.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("zhangsan"));
            table.put(row);
        }
    }

    /**
     * Scans the whole table and prints the {@code info:name} value of every row.
     *
     * @throws IOException if the cluster cannot be reached or the scan fails
     */
    @Test
    public void scan() throws IOException {
        Scan scan = new Scan();
        // Restrict the scan to the single column that is printed below.
        scan.addColumn(FAMILY, QUALIFIER);
        // The scanner is a resource as well and is closed together with the table and connection.
        try (Connection conn = connect();
             Table table = conn.getTable(TABLE_NAME);
             ResultScanner results = table.getScanner(scan)) {
            for (Result result : results) {
                System.out.println(Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
            }
        }
    }

    /**
     * Deletes the row written by {@link #put()}.
     *
     * @throws IOException if the cluster cannot be reached or the delete fails
     */
    @Test
    public void delete() throws IOException {
        try (Connection conn = connect();
             Table table = conn.getTable(TABLE_NAME)) {
            table.delete(new Delete(ROW_KEY));
        }
    }
}