08:离线分析:Hbase表设计及构建
目标:掌握Hbase表的设计及创建表的实现
路径
- step1:基础设计
- step2:Rowkey设计
- step3:分区设计
- step4:建表
实施
基础设计
- Namespace:MOMO_CHAT
- Table:MOMO_MSG
- Family:C1
- Qualifier:与数据中字段名保持一致
Rowkey设计
查询需求:根据发件人id 收件人id 消息日期 查询聊天记录
- 发件人账号
- 收件人账号
- 时间
设计规则:业务、唯一、长度、散列、组合
设计实现
- 加盐方案:CRC、Hash、MD5、MUR
- => 8位、16位、32位
MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间
分区设计
- Rowkey前缀:MD5编码,由字母和数字构成
- 数据并发量:高
- 分区设计:使用HexSplit16进制划分多个分区
建表
- 启动Hbase:start-hbase.sh
- 进入客户端:hbase shell
#创建NS
create_namespace 'MOMO_CHAT'
#建表
create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
小结
- 掌握Hbase表的设计及创建表的实现
09:离线分析:Kafka消费者构建
目标:实现离线消费者的开发
路径
整体实现的路径
代码语言:javascript复制//入口:调用实现消费Kafka,将数据写入Hbase
public void main(){
//step1:消费Kafka
consumerKafka();
}
//用于消费Kafka数据
public void consumerKafka(){
prop = new Properties()
KafkaConsumer consumer = new KafkaConsumer(prop)
consumer.subscribe("MOMO_MSG")
ConsumerRecords records = consumer.poll
//基于每个分区来消费和处理
record :Topic、Partition、Offset、Key、Value
//step2:写入Hbase
writeToHbase(value)
//提交这个分区的offset
commitSycn(offset 1)
}
//用于将value的数据写入Hbase方法
public void writeToHbase(){
//step1:构建连接
//step2:构建Table对象
//step3:构建Put对象
//获取rowkey
rowkey = getRowkey(value)
Put put = new Put(rowkey)
put.添加每一列
table.put()
}
public String getRowkey(){
value.getSender
value.getReceiver
value.getTime
rowkey = MD5 sender receiverId time
return rowkey
}
实施
代码语言:javascript复制 /**
* 用于消费Kafka的数据,将合法数据写入Hbase
*/
private static void consumerKafkaToHbase() throws Exception {
//构建配置对象
Properties props = new Properties();
//指定服务端地址
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
//指定消费者组的id
props.setProperty("group.id", "momo");
//关闭自动提交
props.setProperty("enable.auto.commit", "false");
//指定K和V反序列化的类型
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//构建消费者的连接
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//指定订阅哪些Topic
consumer.subscribe(Arrays.asList("MOMO_MSG"));
//持续拉取数据
while (true) {
//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//todo:3-处理拉取到的数据:打印
//取出每个分区的数据进行处理
Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
//对每个分区的数据做处理
for (TopicPartition partition : partitions) {
List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
//处理这个分区的数据
long offset = 0;
for (ConsumerRecord<String, String> record : partRecords) {
//获取Topic
String topic = record.topic();
//获取分区
int part = record.partition();
//获取offset
offset = record.offset();
//获取Key
String key = record.key();
//获取Value
String value = record.value();
System.out.println(topic "t" part "t" offset "t" key "t" value);
//将Value数据写入Hbase
if(value != null && !"".equals(value) && value.split("