1. 需求分析
1.微博内容的浏览,数据库表设计 2.用户社交体现:关注用户,取关用户 3.拉取关注的人的微博内容
微博表的分析
- 1.
- 2. 项目所需要的表
2. 代码实现
1. 代码设计总览
代码语言:javascript复制// 一.创建
1.创建命名空间以及表名的定义
2.创建微博内容表
3.创建用户关系表
4.创建用户微博内容接收邮件表
代码语言:javascript复制// 二. 测试
5.发布微博内容
6.添加关注用户
7.移除(取关)用户
8.获取关注的人的微博内容
2.
2. 创建项目及添加依赖
- 创建项目结构如下图
- 依赖
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
3. 创建
- 1. WeiboDao
package com.buwenbuhuo.hbase.weibo.dao;
import com.buwenbuhuo.hbase.weibo.constant.Names;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author 卜温不火
* @create 2020-05-13 21:25
* com.buwenbuhuo.hbase.weibo.dao - the name of the target package where the new class or interface will be created.
* weibo0513 - the name of the current project.
*/
public class WeiboDao {
public static Connection connection = null;
static {
try {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop002,hadoop003,hadoop004");
connection = ConnectionFactory.createConnection(conf);
}catch (IOException e){
e.printStackTrace();
}
}
public void createNamespace(String namespace) throws IOException {
Admin admin = connection.getAdmin();
NamespaceDescriptor namespce = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(namespce);
admin.close();
}
public void createTable(String tableName, String... families) throws IOException {
// 因为下面的存在,此处可以省略
createTable(tableName,1,families);
}
public void createTable(String tableName,Integer versions, String... families) throws IOException {
Admin admin = connection.getAdmin();
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
for (String family : families) {
HColumnDescriptor familyDesc = new HColumnDescriptor(family);
familyDesc.setMaxVersions(versions);
table.addFamily(familyDesc);
}
admin.createTable(table);
admin.close();
}
public void putCell(String tableName, String rowKey, String family, String column, String value) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family),Bytes.toBytes(column),Bytes.toBytes(value));
table.put(put);
table.close();
}
public List<String> getRowKeysByPrefix(String tableName, String prefix) throws IOException {
ArrayList<String> list = new ArrayList<>();
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes(prefix));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
byte[] row = result.getRow();
String rowKey = Bytes.toString(row);
list.add(rowKey);
}
scanner.close();
table.close();
return list;
}
public void putCells(String tableName, List<String> rowKeys, String family, String column, String value) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
ArrayList<Put> puts = new ArrayList<>();
// 遍历RowKeys
for (String rowKey : rowKeys) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family),Bytes.toBytes(column),Bytes.toBytes(value));
puts.add(put);
}
table.put(puts);
table.close();
}
public List<String> getRowKeysByRange(String tableName, String startRow, String stopRow) throws IOException {
List<String> list = new ArrayList<>();
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
byte[] row = result.getRow();
String rowKey = Bytes.toString(row);
list.add(rowKey);
}
scanner.close();
table.close();
return list;
}
public void deleteRow(String tableName, String rowKey) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
table.close();
}
public void deleteCells(String tableName, String rowKey, String family, String column) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumns(Bytes.toBytes(family),Bytes.toBytes(column));
table.delete(delete);
table.close();
}
public List<String> getCellsByPrefix(String tableName, String prefix, String family, String column) throws IOException {
List<String> list = new ArrayList<>();
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes(prefix));
scan.addColumn(Bytes.toBytes(family),Bytes.toBytes(column));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
list.add(Bytes.toString(CellUtil.cloneValue(cells[0])));
}
scanner.close();
table.close();
return list;
}
public List<String> getFamilyByRowKey(String tableName, String rowKey, String family) throws IOException {
List<String> list = new ArrayList<>();
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.setMaxVersions(Names.INBOX_DATA_VERSIONS);
get.addFamily(Bytes.toBytes(family));
Result result = table.get(get);
for (Cell cell : result.rawCells()) {
list.add(Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
return list;
}
public List<String> getCellsByRowKey(String tableName, List<String> rowKeys, String family, String column) throws IOException {
List<String> weibos = new ArrayList<>();
Table table = connection.getTable(TableName.valueOf(tableName));
List<Get> gets = new ArrayList<>();
for (String rowKey : rowKeys) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family),Bytes.toBytes(column));
gets.add(get);
}
Result[] results = table.get(gets);
for (Result result : results) {
String weibo = Bytes.toString(CellUtil.cloneValue(result.rawCells()[0]));
weibos.add(weibo);
}
table.close();
return weibos;
}
}
- 2. WeiboService
package com.buwenbuhuo.hbase.weibo.service;
import com.buwenbuhuo.hbase.weibo.constant.Names;
import com.buwenbuhuo.hbase.weibo.dao.WeiboDao;
import javax.naming.Name;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author 卜温不火
* @create 2020-05-13 21:27
* com.buwenbuhuo.hbase.weibo.service - the name of the target package where the new class or interface will be created.
* weibo0513 - the name of the current project.
*/
public class WeiboService {
private WeiboDao dao = new WeiboDao();
public void init() throws IOException {
//1) 创建命名空间以及表名的定义
dao.createNamespace(Names.NAMESPACE_WEIBO);
//2) 创建微博内容表
dao.createTable(Names.TABLE_WEIBO,Names.WEIBO_FAMILY_DATA);
//3) 创建用户关系表
dao.createTable(Names.TABLE_RELATION,Names.RELATION_FAMILY_DATA);
//4) 创建用户微博内容接收邮件表
dao.createTable(Names.TABLE_INBOX,Names.INBOX_DATA_VERSIONS,Names.INBOX_FAMILY_DATA);
}
public void publish(String star, String content) throws IOException {
// 1. 在weibo表中插入一条数据
String rowKey = star "_" System.currentTimeMillis();
dao.putCell(Names.TABLE_WEIBO,rowKey,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT,content);
// 2. 从relation表中获取star的所有fansID (默认有粉丝逻辑有些问题)
String prefix = star ":followedby:";
List<String> list = dao.getRowKeysByPrefix(Names.TABLE_RELATION,prefix);
if (list.size()<= 0){
return;
}
List<String> fansIds = new ArrayList<>();
// 遍历
for (String row : list) {
String[] split = row.split(":");
// 获取粉丝ID
fansIds.add(split[2]);
}
// 3. 向所有fans的inbox中插入本条weibo的id
// 循环调用 or 批量调用
dao.putCells(Names.TABLE_INBOX,fansIds,Names.INBOX_FAMILY_DATA,star,rowKey);
}
public void follow(String fans, String star) throws IOException {
// 1. 向relation表中插入两条数据
String rowKey1 = fans ":follow:" star;
String rowKey2 = star ":followedby:" fans;
String time = System.currentTimeMillis() "";
dao.putCell(Names.TABLE_RELATION,rowKey1,Names.RELATION_FAMILY_DATA,Names.RELATION_COLUMN_TIME,time);
dao.putCell(Names.TABLE_RELATION,rowKey2,Names.RELATION_FAMILY_DATA,Names.RELATION_COLUMN_TIME,time);
// 2. 从weibo表中获取star的近期weibo
// 拿取所有
String startRow = star;
String stopRow = star "|";
List<String> list = dao.getRowKeysByRange(Names.TABLE_WEIBO,startRow,stopRow);
// 判断
if (list.size() <= 0){
return;
}
// 获取近期的weibo
// 使用三元运算符进行判断
int fromIndex = list.size() > Names.INBOX_DATA_VERSIONS?list.size()-Names.INBOX_DATA_VERSIONS:0;
List<String> recentWeiboIds = list.subList(fromIndex, list.size());
// 3. 向fans的inbox表中插入star的近期weiboId
for (String recentWeiboId : recentWeiboIds) {
dao.putCell(Names.TABLE_INBOX,fans, Names.INBOX_FAMILY_DATA,star,recentWeiboId);
}
}
public void unFollow(String fans, String star) throws IOException {
// 1. 删除relation表中的两条数据
String rowKey1 = fans ":follow:" star;
String rowKey2 = star ":followedby:" fans;
dao.deleteRow(Names.TABLE_RELATION,rowKey1);
dao.deleteRow(Names.TABLE_RELATION,rowKey2);
// 2. 删除inbox表中的一列
dao.deleteCells(Names.TABLE_INBOX,fans,Names.INBOX_FAMILY_DATA,star);
}
public List<String> getAllWeiboByUserId(String star) throws IOException {
return dao.getCellsByPrefix(Names.TABLE_WEIBO,star,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT);
}
public List<String> getAllRecentWeibos(String fans) throws IOException {
// 1. 从inbox中获取fans的所有的star的近期weiboId
List<String> list = dao.getFamilyByRowKey(Names.TABLE_INBOX,fans,Names.INBOX_FAMILY_DATA);
// 2. 根据weiboID去weibo表中查询内容
return dao.getCellsByRowKey(Names.TABLE_WEIBO,list,Names.WEIBO_FAMILY_DATA,Names.WEIBO_COLUMN_CONTENT);
}
}
- 3. Names
package com.buwenbuhuo.hbase.weibo.constant;
/**
* @author 卜温不火
* @create 2020-05-13 23:28
* com.buwenbuhuo.hbase.weibo.constant - the name of the target package where the new class or interface will be created.
* weibo0513 - the name of the current project.
*/
public class Names {
public final static String NAMESPACE_WEIBO = "weibo";
public final static String TABLE_WEIBO = "weibo:weibo";
public final static String TABLE_RELATION = "weibo:relation";
public final static String TABLE_INBOX = "weibo:inbox";
public final static String WEIBO_FAMILY_DATA = "data";
public final static String RELATION_FAMILY_DATA = "data";
public final static String INBOX_FAMILY_DATA = "data";
public final static String WEIBO_COLUMN_CONTENT = "content";
public final static String RELATION_COLUMN_TIME = "time";
public final static Integer INBOX_DATA_VERSIONS = 3;
}
- 4. WeiboController
package com.buwenbuhuo.hbase.weibo.controller;
import com.buwenbuhuo.hbase.weibo.service.WeiboService;
import java.io.IOException;
import java.util.List;
/**
* @author 卜温不火
* @create 2020-05-13 21:27
* com.buwenbuhuo.hbase.weibo.controller - the name of the target package where the new class or interface will be created.
* weibo0513 - the name of the current project.
*/
public class WeiboController {
private WeiboService service = new WeiboService();
public void init() throws IOException {
service.init();
}
//5) 发布微博内容
public void publish(String star,String content) throws IOException {
service.publish(star,content);
}
//6) 添加关注用户
public void follow(String fans,String star) throws IOException {
service.follow(fans,star);
}
//7) 移除(取关)用户
public void unFollow(String fans,String star) throws IOException {
service.unFollow(fans,star);
}
//8) 获取关注的人的微博内容
// 8.1 获取某个明星的所有weibo
public List<String> getAllWeibosByUserID(String star) throws IOException {
return service.getAllWeiboByUserId(star);
}
// 8.2 获取关注的所有star的近期weibo
public List<String> getAllRecentWeibos(String fans) throws IOException {
return service.getAllRecentWeibos(fans);
}
}
- 5. WeiboAPP
package com.buwenbuhuo.hbase.weibo;
import com.buwenbuhuo.hbase.weibo.controller.WeiboController;
import java.io.IOException;
import java.util.List;
/**
* @author 卜温不火
* @create 2020-05-13 21:24
* com.buwenbuhuo.hbase.weibo - the name of the target package where the new class or interface will be created.
* weibo0513 - the name of the current project.
*/
public class WeiboAPP {
private static WeiboController controller = new WeiboController();
public static void main(String[] args) throws IOException {
// 1. 创建表的初始化
// controller.init();
// 2. 发微博(发五条微博)
// controller.publish("buwenbuhuo","Happy 1");
// controller.publish("buwenbuhuo","Happy 2");
// controller.publish("buwenbuhuo","Happy 3");
// controller.publish("buwenbuhuo","Happy 4");
// controller.publish("buwenbuhuo","Happy 5");
// 3. 关注微博
// controller.follow("1002","buwenbuhuo");
// controller.follow("1003","buwenbuhuo");
// 4. 获取微博内容
// 最新的消息(获取)
// List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");
// 查看数据
// for (String allRecentWeibo : allRecentWeibos) {
// System.out.println(allRecentWeibo);
// }
// 5. 取关微博
// controller.unFollow("1002","buwenbuhuo");
// 查看数据
// for (String allRecentWeibo : allRecentWeibos) {
// System.out.println(allRecentWeibo);
// }
// 6. 获取某一个人的所有微博
// List<String> allWeibosByUserID = controller.getAllWeibosByUserID("buwenbuhuo");
//
// for (String s : allWeibosByUserID) {
// System.out.println(s);
// }
}
}
4. 测试
- 1. 建表的初始化
// 1. 创建表的初始化
controller.init();
- 2. 发微博
// 2. 发微博(发五条微博)
controller.publish("buwenbuhuo","Happy 1");
controller.publish("buwenbuhuo","Happy 2");
controller.publish("buwenbuhuo","Happy 3");
controller.publish("buwenbuhuo","Happy 4");
controller.publish("buwenbuhuo","Happy 5");
// 查看weibo
hbase(main):002:0> scan 'weibo:weibo'
- 3. 关注微博
// 3. 关注微博
controller.follow("buwen","buwenbuhuo");
controller.follow("buhuo","buwenbuhuo");
// 查看是否关注
hbase(main):004:0> scan 'weibo:relation'
- 4. 获取微博内容
// 最新的消息(获取)
List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");
// 查看数据
for (String allRecentWeibo : allRecentWeibos) {
System.out.println(allRecentWeibo);
}
- 5. 取关微博
controller.unFollow("1002","buwenbuhuo");
List<String> allRecentWeibos = controller.getAllRecentWeibos("1002");
// 查看数据
for (String allRecentWeibo : allRecentWeibos) {
System.out.println(allRecentWeibo);
}
hbase(main):014:0> scan 'weibo:relation'
- 6. 获取所有人的微博
// 6. 获取某一个人的所有微博
List<String> allWeibosByUserID = controller.getAllWeibosByUserID("buwenbuhuo");
for (String s : allWeibosByUserID) {
System.out.println(s);
}
本次的分享就到这里了