HBase项目实战系列(1) | Weibo项目简易版(附全代码)

2020-10-28 16:44:48 浏览数 (1)

1. 需求分析

1.微博内容的浏览,数据库表设计 2.用户社交体现:关注用户,取关用户 3.拉取关注的人的微博内容

微博表的分析

  • 1.
  • 2. 项目所需要的表

2. 代码实现

1. 代码设计总览

代码语言:javascript复制
// 一.创建
1.创建命名空间以及表名的定义
2.创建微博内容表
3.创建用户关系表
4.创建用户微博内容接收邮件表
代码语言:javascript复制
// 二. 测试
5.发布微博内容
6.添加关注用户
7.移除(取关)用户
8.获取关注的人的微博内容

2.

2. 创建项目及添加依赖

  • 创建项目结构如下图
  • 依赖
代码语言:javascript复制
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    </dependencies>

3. 创建

  • 1. WeiboDao
代码语言:javascript复制
package com.buwenbuhuo.hbase.weibo.dao;

import com.buwenbuhuo.hbase.weibo.constant.Names;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author 卜温不火
 * @create 2020-05-13 21:25
 * com.buwenbuhuo.hbase.weibo.dao - the name of the target package where the new class or interface will be created.
 * weibo0513 - the name of the current project.
 */
public class WeiboDao {

    public static Connection connection = null;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop002,hadoop003,hadoop004");
            connection = ConnectionFactory.createConnection(conf);
        }catch (IOException e){
            e.printStackTrace();
        }

    }

    public void createNamespace(String namespace) throws IOException {

        Admin admin = connection.getAdmin();

        NamespaceDescriptor namespce = NamespaceDescriptor.create(namespace).build();

        admin.createNamespace(namespce);

        admin.close();

    }

    public void createTable(String tableName, String... families) throws IOException {

        // 因为下面的存在,此处可以省略
        createTable(tableName,1,families);
    }

    public void createTable(String tableName,Integer versions, String... families) throws IOException {
        Admin admin = connection.getAdmin();

        HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));

        for (String family : families) {

            HColumnDescriptor familyDesc = new HColumnDescriptor(family);

            familyDesc.setMaxVersions(versions);

            table.addFamily(familyDesc);
        }

        admin.createTable(table);
        admin.close();
    }

    public void putCell(String tableName, String rowKey, String family, String column, String value) throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes(rowKey));

        put.addColumn(Bytes.toBytes(family),Bytes.toBytes(column),Bytes.toBytes(value));

        table.put(put);

        table.close();

    }


    public List<String> getRowKeysByPrefix(String tableName, String prefix) throws IOException {

        ArrayList<String> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Scan scan = new Scan();
        
        scan.setRowPrefixFilter(Bytes.toBytes(prefix));

        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {

            byte[] row = result.getRow();
            String rowKey = Bytes.toString(row);
            list.add(rowKey);
        }

        scanner.close();
        table.close();

        return  list;
    }

    public void putCells(String tableName, List<String> rowKeys, String family, String column, String value) throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        ArrayList<Put> puts = new ArrayList<>();

        // 遍历RowKeys
        for (String rowKey : rowKeys) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family),Bytes.toBytes(column),Bytes.toBytes(value));
            puts.add(put);

        }

        table.put(puts);

        table.close();

    }

    public List<String> getRowKeysByRange(String tableName, String startRow, String stopRow) throws IOException {

        List<String> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));

        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {

            byte[] row = result.getRow();
            String rowKey = Bytes.toString(row);
            list.add(rowKey);
        }

        scanner.close();
        table.close();

        return  list;
    }

    public void deleteRow(String tableName, String rowKey) throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        Delete delete = new Delete(Bytes.toBytes(rowKey));

        table.delete(delete);

        table.close();
    }

    public void deleteCells(String tableName, String rowKey, String family, String column) throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));

        Delete delete = new Delete(Bytes.toBytes(rowKey));

        delete.addColumns(Bytes.toBytes(family),Bytes.toBytes(column));

        table.delete(delete);

        table.close();

    }

    public List<String> getCellsByPrefix(String tableName, String prefix, String family, String column) throws IOException {

        List<String> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Scan scan = new Scan();

        scan.setRowPrefixFilter(Bytes.toBytes(prefix));

        scan.addColumn(Bytes.toBytes(family),Bytes.toBytes(column));

        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            list.add(Bytes.toString(CellUtil.cloneValue(cells[0])));
        }

        scanner.close();
        table.close();

        return list;
    }

    public List<String> getFamilyByRowKey(String tableName, String rowKey, String family) throws IOException {

        List<String> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Get get = new Get(Bytes.toBytes(rowKey));

        get.setMaxVersions(Names.INBOX_DATA_VERSIONS);

        get.addFamily(Bytes.toBytes(family));

        Result result = table.get(get);

        for (Cell cell : result.rawCells()) {
            list.add(Bytes.toString(CellUtil.cloneValue(cell)));
        }

        table.close();

        return list;
    }

    public List<String> getCellsByRowKey(String tableName, List<String> rowKeys, String family, String column) throws IOException {

        List<String> weibos = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tableName));

        List<Get> gets = new ArrayList<>();

        for (String rowKey : rowKeys) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(family),Bytes.toBytes(column));

            gets.add(get);
        }

        Result[] results = table.get(gets);

        for (Result result : results) {
            String weibo = Bytes.toString(CellUtil.cloneValue(result.rawCells()[0]));
            weibos.add(weibo);
        }

        table.close();

        return weibos;
    }
}
  • 2. WeiboService
代码语言:javascript复制
package com.buwenbuhuo.hbase.weibo.service;

import com.buwenbuhuo.hbase.weibo.constant.Names;
import com.buwenbuhuo.hbase.weibo.dao.WeiboDao;

import javax.naming.Name;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author 卜温不火
 * @create 2020-05-13 21:27
 * com.buwenbuhuo.hbase.weibo.service - the name of the target package where the new class or interface will be created.
 * weibo0513 - the name of the current project.
 */
public class WeiboService {

    private WeiboDao dao = new WeiboDao();

    public void init() throws IOException {

        //1) 创建命名空间以及表名的定义
        dao.createNamespace(Names.NAMESPACE_WEIBO);

        //2) 创建微博内容表
        dao.createTable(Names.TABLE_WEIBO,Names.WEIBO_FAMILY_DATA);

        //3) 创建用户关系表
        dao.createTable(Names.TABLE_RELATION,Names.RELATION_FAMILY_DATA);

        //4) 创建用户微博内容接收邮件表
        dao.createTable(Names.TABLE_INBOX,Names.INBOX_DATA_VERSIONS,Names.INBOX_FAMILY_DATA);

    }

    public void publish(String star, String content) throws IOException {

        // 1. 在weibo表中插入一条数据
        String rowKey = star   "_"   System.currentTimeMillis();
        dao.putCell(Names.TABLE_WEIBO,rowKey,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT,content);

        // 2. 从relation表中获取star的所有fansID (默认有粉丝逻辑有些问题)
        String prefix = star ":followedby:";
        List<String> list = dao.getRowKeysByPrefix(Names.TABLE_RELATION,prefix);

        if (list.size()<= 0){
            return;
        }

        List<String> fansIds = new ArrayList<>();

        // 遍历
        for (String row : list) {
            String[] split = row.split(":");
            // 获取粉丝ID
            fansIds.add(split[2]);
        }


        // 3. 向所有fans的inbox中插入本条weibo的id
        // 循环调用 or 批量调用
        dao.putCells(Names.TABLE_INBOX,fansIds,Names.INBOX_FAMILY_DATA,star,rowKey);


    }

    public void follow(String fans, String star) throws IOException {

        // 1. 向relation表中插入两条数据
        String rowKey1 = fans   ":follow:"   star;
        String rowKey2 = star   ":followedby:"   fans;
        String time = System.currentTimeMillis()   "";
        dao.putCell(Names.TABLE_RELATION,rowKey1,Names.RELATION_FAMILY_DATA,Names.RELATION_COLUMN_TIME,time);
        dao.putCell(Names.TABLE_RELATION,rowKey2,Names.RELATION_FAMILY_DATA,Names.RELATION_COLUMN_TIME,time);


        // 2. 从weibo表中获取star的近期weibo

        // 拿取所有
        String startRow = star;
        String stopRow = star   "|";
        List<String> list = dao.getRowKeysByRange(Names.TABLE_WEIBO,startRow,stopRow);

        // 判断
        if (list.size() <= 0){
            return;
        }

        // 获取近期的weibo
        // 使用三元运算符进行判断
        int fromIndex = list.size() > Names.INBOX_DATA_VERSIONS?list.size()-Names.INBOX_DATA_VERSIONS:0;
        List<String> recentWeiboIds = list.subList(fromIndex, list.size());


        // 3. 向fans的inbox表中插入star的近期weiboId
        for (String recentWeiboId : recentWeiboIds) {
            dao.putCell(Names.TABLE_INBOX,fans, Names.INBOX_FAMILY_DATA,star,recentWeiboId);
        }


    }

    public void unFollow(String fans, String star) throws IOException {

        // 1. 删除relation表中的两条数据
        String rowKey1 = fans   ":follow:"   star;
        String rowKey2 = star   ":followedby:"   fans;
        dao.deleteRow(Names.TABLE_RELATION,rowKey1);
        dao.deleteRow(Names.TABLE_RELATION,rowKey2);


        // 2. 删除inbox表中的一列
        dao.deleteCells(Names.TABLE_INBOX,fans,Names.INBOX_FAMILY_DATA,star);


    }

    public List<String> getAllWeiboByUserId(String star) throws IOException {

        return dao.getCellsByPrefix(Names.TABLE_WEIBO,star,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT);


    }

    public List<String> getAllRecentWeibos(String fans) throws IOException {

        // 1. 从inbox中获取fans的所有的star的近期weiboId
        List<String> list = dao.getFamilyByRowKey(Names.TABLE_INBOX,fans,Names.INBOX_FAMILY_DATA);


        // 2. 根据weiboID去weibo表中查询内容
        return dao.getCellsByRowKey(Names.TABLE_WEIBO,list,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT);


    }
}
  • 3. Names
代码语言:javascript复制
package com.buwenbuhuo.hbase.weibo.constant;



/**
 * @author 卜温不火
 * @create 2020-05-13 23:28
 * com.buwenbuhuo.hbase.weibo.constant - the name of the target package where the new class or interface will be created.
 * weibo0513 - the name of the current project.
 */
public class Names {

    public final static String NAMESPACE_WEIBO = "weibo";

    public final static String TABLE_WEIBO = "weibo:weibo";
    public final static String TABLE_RELATION = "weibo:relation";
    public final static String TABLE_INBOX = "weibo:inbox";

    public final static String WEIBO_FAMILY_DATA = "data";
    public final static String RELATION_FAMILY_DATA = "data";
    public final static String INBOX_FAMILY_DATA = "data";

    public final static String WEIBO_COLUMN_CONTENT = "content";
    public final static String RELATION_COLUMN_TIME = "time";

    public final static Integer INBOX_DATA_VERSIONS = 3;

}
  • 4. WeiboController
代码语言:javascript复制
package com.buwenbuhuo.hbase.weibo.controller;

import com.buwenbuhuo.hbase.weibo.service.WeiboService;

import java.io.IOException;
import java.util.List;

/**
 * @author 卜温不火
 * @create 2020-05-13 21:27
 * com.buwenbuhuo.hbase.weibo.controller - the name of the target package where the new class or interface will be created.
 * weibo0513 - the name of the current project.
 */
public class WeiboController {

    private WeiboService service = new WeiboService();

    public void init() throws IOException {

        service.init();
    }



        //5) 发布微博内容
    public void publish(String star,String content) throws IOException {

        service.publish(star,content);

    }

        //6) 添加关注用户
    public void follow(String fans,String star) throws IOException {

        service.follow(fans,star);
    }


        //7) 移除(取关)用户
    public void unFollow(String fans,String star) throws IOException {

        service.unFollow(fans,star);
    }

        //8) 获取关注的人的微博内容
            // 8.1 获取某个明星的所有weibo

    public List<String> getAllWeibosByUserID(String star) throws IOException {

      return service.getAllWeiboByUserId(star);

    }

            // 8.2 获取关注的所有star的近期weibo
     public List<String> getAllRecentWeibos(String fans) throws IOException {

        return service.getAllRecentWeibos(fans);
     }



}
  • 5. WeiboAPP
代码语言:javascript复制
package com.buwenbuhuo.hbase.weibo;

import com.buwenbuhuo.hbase.weibo.controller.WeiboController;

import java.io.IOException;
import java.util.List;

/**
 * @author 卜温不火
 * @create 2020-05-13 21:24
 * com.buwenbuhuo.hbase.weibo - the name of the target package where the new class or interface will be created.
 * weibo0513 - the name of the current project.
 */
public class WeiboAPP {

    private static WeiboController controller = new WeiboController();

    public static void main(String[] args) throws IOException {

        // 1. 创建表的初始化
//        controller.init();


        // 2. 发微博(发五条微博)
//        controller.publish("buwenbuhuo","Happy 1");
//        controller.publish("buwenbuhuo","Happy 2");
//        controller.publish("buwenbuhuo","Happy 3");
//        controller.publish("buwenbuhuo","Happy 4");
//        controller.publish("buwenbuhuo","Happy 5");


        // 3. 关注微博
//        controller.follow("1002","buwenbuhuo");
//        controller.follow("1003","buwenbuhuo");


        // 4. 获取微博内容
        // 最新的消息(获取)
//       List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");

        // 查看数据
//        for (String allRecentWeibo : allRecentWeibos) {
//            System.out.println(allRecentWeibo);
//        }


        // 5. 取关微博
//        controller.unFollow("1002","buwenbuhuo");
        // 查看数据
//        for (String allRecentWeibo : allRecentWeibos) {
//            System.out.println(allRecentWeibo);
//        }

        // 6. 获取某一个人的所有微博
//        List<String> allWeibosByUserID = controller.getAllWeibosByUserID("buwenbuhuo");
//
//        for (String s : allWeibosByUserID) {
//            System.out.println(s);
//        }


    }
}

4. 测试

  • 1. 建表的初始化
代码语言:javascript复制
        // 1. 创建表的初始化
        controller.init();
  • 2. 发微博
代码语言:javascript复制
        // 2. 发微博(发五条微博)
        controller.publish("buwenbuhuo","Happy 1");
        controller.publish("buwenbuhuo","Happy 2");
        controller.publish("buwenbuhuo","Happy 3");
        controller.publish("buwenbuhuo","Happy 4");
        controller.publish("buwenbuhuo","Happy 5");

// 查看weibo
hbase(main):002:0> scan 'weibo:weibo'
  • 3. 关注微博
代码语言:javascript复制
        // 3. 关注微博
        controller.follow("buwen","buwenbuhuo");
        controller.follow("buhuo","buwenbuhuo");

// 查看是否关注
hbase(main):004:0> scan 'weibo:relation'
  • 4. 获取微博内容
代码语言:javascript复制
        // 最新的消息(获取)
        List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");

        // 查看数据
        for (String allRecentWeibo : allRecentWeibos) {
            System.out.println(allRecentWeibo);
        }
  • 5. 取关微博
代码语言:javascript复制
        controller.unFollow("1002","buwenbuhuo");
        List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");
        // 查看数据
        for (String allRecentWeibo : allRecentWeibos) {
            System.out.println(allRecentWeibo);
        }


hbase(main):014:0> scan 'weibo:relation'
  • 6. 获取所有人的微博
代码语言:javascript复制
        // 6. 获取某一个人的所有微博
        List<String> allWeibosByUserID = controller.getAllWeibosByUserID("buwenbuhuo");

        for (String s : allWeibosByUserID) {
            System.out.println(s);
        }

  本次的分享就到这里了

0 人点赞