discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

2018-03-27 10:52:28 浏览数 (1)

about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。 首先我们将hbase与hive整合,详细参考 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 整合完毕,我们就可以通过mapreduce把数据导入hbase,当然在导入hbase的同时,hive数据同时也可以查询出结果。 那么我们是如何导入hbase的,思路前面已经介绍,这里采用的是hbase put。以后的版本中,我们将采用多种方法来实现此功能包括hive分区、hbase后面如果遇到问题,我们可能还会重构。 开发环境介绍: 1.Eclipse 2.Hadoop2.2 3.hbase-0.98.3-hadoop2 思路: 在导入hbase的过程中,我们直接使用了mapreduce中的map函数,reduce在这里对我们没有太大的用处,我们这里借助的是mapreduce的分布式,提高查询效率。 mapreduce中map函数主要实现了哪些功能 1.清洗数据 通过

  1. public static void StringResolves(String line, Context context)

函数实现 2.数据的导入 通过public static void addData(String rowKey, String tableName, String[] column1, String[] value1, Context context) 函数实现

下面贴上代码: HbaseMain.java代码

代码语言:javascript复制
package www.aboutyun.com;

import java.io.IOException;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class HbaseMain {



       static final String INPUT_PATH = "hdfs://master:8020/test.txt";

       static final String OUT_PATH = "hdfs://master:8020/Output";



       public static void main(String[] args) throws IOException,

                       InterruptedException, ClassNotFoundException {



               // 主类

               Configuration conf = new Configuration();

               Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());

               job.setJarByClass(HbaseMain.class);

               // 寻找输入

               FileInputFormat.setInputPaths(job, INPUT_PATH);

               // 1.2对输入数据进行格式化处理的类

               job.setInputFormatClass(TextInputFormat.class);

               job.setMapperClass(HbaseMap.class);

               // 1.2指定map输出类型<key,value>类型

               job.setMapOutputKeyClass(Text.class);

               job.setMapOutputValueClass(LongWritable.class);

               job.setNumReduceTasks(0);

               // 指定输出路径

               FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

 

               job.waitForCompletion(true);



       }

}


HbaseMap.java代码

代码语言:javascript复制
package www.aboutyun.com;



import java.io.IOException;

import java.text.DateFormat;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Locale;

import java.util.Random;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;



public class HbaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {

       private static Configuration conf = null;

       /**

        * 初始化配置

        */



       static {

               conf = HBaseConfiguration.create();

               conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位

               conf.set("hbase.zookeeper.property.clientPort", "2181");

       }



       /**************************************************************************/

       public void map(LongWritable key, Text line, Context context)

                       throws IOException, InterruptedException {



               try {

                       StringResolves(line.toString(), context);

               } catch (ParseException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }



       }



       /**************************************************************************/

       // 字符串解析



       public static void StringResolves(String line, Context context)

                       throws ParseException {

               String ipField, dateField, urlField, browserField;



               // 获取ip地址

               ipField = line.split("- -")[0].trim();



               // 获取时间,并转换格式

               int getTimeFirst = line.indexOf("[");

               int getTimeLast = line.indexOf("]");

               String time = line.substring(getTimeFirst   1, getTimeLast).trim();

               Date dt = null;

               DateFormat df1 = DateFormat.getDateTimeInstance(DateFormat.LONG,

                               DateFormat.LONG);

               dt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)

                               .parse(time);

               dateField = df1.format(dt);

               SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHMM");

               String dateField1 = sdf.format(dt);

               // 获取url

               String[] getUrl = line.split(""");



               String firtGeturl = getUrl[1].substring(3).trim();



               String secondGeturl = getUrl[3].trim();

               urlField = firtGeturl   "分隔符"   secondGeturl;



               // 获取浏览器

               String[] getBrowse = line.split(""");

               String strBrowse = getBrowse[5].toString();

               String str = "(KHTML, like Gecko)";

               int i = strBrowse.indexOf(str);

               strBrowse = strBrowse.substring(i);

               String strBrowse1[] = strBrowse.split("\/");

               strBrowse = strBrowse1[0].toString();

               String strBrowse2[] = strBrowse.split("\)");

               browserField = strBrowse2[1].trim();



               // 添加到数据库



               String rowKey = ipField   dateField1   urlField

                                 new Random().nextInt();

               String[] cols = new String[] { "IpAddress", "AccressTime", "Url",

                               "UserBrowser", };

               String[] colsValue = new String[] { ipField, dateField, urlField,

                               browserField };



               try {

                       addData(rowKey, "LogTable", cols, colsValue, context);

                       context.write(new Text("1"), new IntWritable(1));



               } catch (IOException | InterruptedException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }

       }



       /*

        * 为表添加数据(适合知道有多少列族的固定表)

        * 

        * @rowKey rowKey

        * 

        * @tableName 表名

        * 

        * @column1 第一个列族列表

        * 

        * @value1 第一个列的值的列表

        */

       public static void addData(String rowKey, String tableName,

                       String[] column1, String[] value1, Context context)

                       throws IOException {



               Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey

               HTable table = new HTable(conf, Bytes.toBytes(tableName));// HTabel负责跟记录相关的操作如增删改查等//

                                                                                                                                       // 获取表

               HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族

                               .getColumnFamilies();



               for (int i = 0; i < columnFamilies.length; i  ) {

                       String familyName = columnFamilies[i].getNameAsString(); // 获取列族名

                       if (familyName.equals("Info")) { // info列族put数据

                               for (int j = 0; j < column1.length; j  ) {

                                       put.add(Bytes.toBytes(familyName),

                                                       Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));

                               }

                       }



               }

               table.put(put);

               // context.write(new Text(rowKey), null);

               System.out.println("add data Success!");

       }



}




后面我们将会不断完善此功能。

上面的一些准备工作,就不要说了,这里展现一下运行后的效果:
hive效果图

Hbase效果图

这样就达到了效果。后面我们使用hive统计,然后通过将统计结果展示,项目基本完成,后面就不断完善即可。

0 人点赞