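This project is a small test. It reads certain fields from PostgreSQL and turns them into the column families of an HBase table. PostgreSQL and HBase were already set up on a cloud server, both running in Docker with the corresponding ports exposed, and the Windows machine connects to them through a secure tunnel. The project uses JPA to map entities to the PostgreSQL database. First, add the dependencies: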
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>
    <!-- this project's own groupId, artifactId and version go here -->
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>2.3.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>
</project>
```
The corresponding YAML configuration file:
```yaml
hbase:
  zookeeper:
    quorum: xxxx
    property:
      clientPort: 2181
zookeeper:
  znode:
    parent: /zkData
spring.datasource:
  url: jdbc:postgresql://localhost:5432/db1
  username: xxxx
  password: xxxx
spring.jpa:
  database: postgresql
  properties.hibernate.dialect: org.hibernate.dialect.PostgreSQL9Dialect
  hibernate.ddl-auto: update
  show-sql: false
logging.level:
  root: info
```
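Spring Boot exposes nested YAML as dotted property names, which is why HBaseConfig can read `${hbase.zookeeper.quorum}` and `${zookeeper.znode.parent}`. Note that `zookeeper` is a top-level key here, not nested under `hbase`. The framework-free sketch below is a simplified model of that flattening, not Spring's actual implementation: the class and method names are hypothetical, and it ignores lists and relaxed binding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class YamlKeySketch {

    /** Flattens nested maps into dotted names, e.g. hbase -> zookeeper -> quorum becomes "hbase.zookeeper.quorum". */
    public static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, Object> flat) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            // A key that already contains dots (like "spring.datasource") is simply concatenated,
            // so it yields the same name as the fully nested form spring -> datasource -> url.
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                flattenInto(key, (Map<String, Object>) value, flat);
            } else {
                flat.put(key, value);
            }
        }
    }

    /** The HBase/ZooKeeper part of the YAML above, rebuilt as nested maps with the same placeholder values. */
    public static Map<String, Object> hbaseSection() {
        Map<String, Object> property = new LinkedHashMap<>();
        property.put("clientPort", 2181);
        Map<String, Object> hbaseZookeeper = new LinkedHashMap<>();
        hbaseZookeeper.put("quorum", "xxxx");
        hbaseZookeeper.put("property", property);
        Map<String, Object> hbase = new LinkedHashMap<>();
        hbase.put("zookeeper", hbaseZookeeper);

        Map<String, Object> znode = new LinkedHashMap<>();
        znode.put("parent", "/zkData");
        Map<String, Object> zookeeper = new LinkedHashMap<>();
        zookeeper.put("znode", znode);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("hbase", hbase);
        root.put("zookeeper", zookeeper);
        return root;
    }

    public static void main(String[] args) {
        // Prints:
        // hbase.zookeeper.quorum = xxxx
        // hbase.zookeeper.property.clientPort = 2181
        // zookeeper.znode.parent = /zkData
        flatten(hbaseSection()).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The three printed names are exactly the keys used in the `@Value` annotations of HBaseConfig.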
Once these are in place, you can start developing. The entity below maps the device_type table in PostgreSQL:
```java
package com.nevt.db.repository.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.io.Serializable;

/**
 * (DeviceType) entity class
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
@Data
@Entity
@Table(name = "device_type")
@JsonIgnoreProperties(ignoreUnknown = true)
@EntityListeners(AuditingEntityListener.class)
public class DeviceType implements Serializable {
    private static final long serialVersionUID = 106469502944492174L;

    @Id
    @Column(name = "id")
    private Integer id;

    @Column(name = "name")
    private String name;

    @Column(name = "column_family")
    private String columnFamily;

    @Column(name = "data_station_type_id")
    private Integer dataStationTypeId;
}
```
For the data-access layer, simply extend the interfaces provided by Spring Data JPA:
```java
package com.nevt.db.repository;

import com.nevt.db.repository.entity.DeviceType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

/**
 * Data-access layer for the (DeviceType) table
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
public interface DeviceTypeRepository extends JpaRepository<DeviceType, Integer>,
        JpaSpecificationExecutor<DeviceType> {
}
```
The HBaseConfig class uses the values from the YAML file to create the HBase connection:
```java
package com.nevt.configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class HBaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

    // One shared, thread-safe Connection for the whole application;
    // Spring calls its close() method when the context shuts down.
    @Bean(destroyMethod = "close")
    public Connection hbaseConnection() throws IOException {
        System.out.println("creating HBase bean");
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        configuration.set("hbase.zookeeper.property.clientPort", clientPort);
        // Must match zookeeper.znode.parent in the server's hbase-site.xml (HBase's default is /hbase)
        configuration.set("zookeeper.znode.parent", znodeParent);
        return ConnectionFactory.createConnection(configuration);
    }
}
```
The core classes for PostgreSQL and HBase are as follows. PostgreSQL:
```java
package com.nevt.service;

import com.nevt.db.repository.DeviceTypeRepository;
import com.nevt.db.repository.entity.DeviceType;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@Component
public class DBService {

    @Resource
    private DeviceTypeRepository deviceTypeRepository;

    /** Returns the column_family values of all device types with the given data_station_type_id. */
    public List<String> getColumnFamily(int dataStationType) {
        List<String> result = new ArrayList<>();
        for (DeviceType deviceType : deviceTypeRepository.findAll()) {
            // Objects.equals avoids a NullPointerException when data_station_type_id is NULL
            if (Objects.equals(deviceType.getDataStationTypeId(), dataStationType)) {
                result.add(deviceType.getColumnFamily());
            }
        }
        return result;
    }
}
```
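The filtering in getColumnFamily can also be tested without Spring or a database. The sketch below assumes a hypothetical DeviceTypeRow stand-in for the entity (only the two fields the filter uses) and adds two safeguards the original does not have: rows with a NULL or empty column_family are skipped, and a family shared by several device types is returned only once. As an alternative, Spring Data JPA can push the filter into SQL through a derived query method such as `List<DeviceType> findByDataStationTypeId(Integer id)` declared on DeviceTypeRepository (a hypothetical addition, not part of the original).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class ColumnFamilyFilterSketch {

    /** Hypothetical stand-in for the DeviceType entity. */
    public static final class DeviceTypeRow {
        final Integer dataStationTypeId;
        final String columnFamily;

        public DeviceTypeRow(Integer dataStationTypeId, String columnFamily) {
            this.dataStationTypeId = dataStationTypeId;
            this.columnFamily = columnFamily;
        }
    }

    /**
     * Returns the distinct, non-empty column-family names whose data_station_type_id equals
     * dataStationType, in first-seen order. Objects.equals treats a NULL id as "no match"
     * instead of throwing, unlike an unboxing comparison with ==.
     */
    public static List<String> columnFamilies(List<DeviceTypeRow> rows, int dataStationType) {
        Set<String> result = new LinkedHashSet<>();
        for (DeviceTypeRow row : rows) {
            if (Objects.equals(row.dataStationTypeId, dataStationType)
                    && row.columnFamily != null && !row.columnFamily.isEmpty()) {
                result.add(row.columnFamily);
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // Hypothetical sample rows; the family names are made up for illustration
        List<DeviceTypeRow> rows = Arrays.asList(
                new DeviceTypeRow(10002, "compressor"),
                new DeviceTypeRow(null, "orphan"),
                new DeviceTypeRow(10002, "compressor"),
                new DeviceTypeRow(10003, "fuel_cell"),
                new DeviceTypeRow(10002, "dispenser"));
        System.out.println(columnFamilies(rows, 10002)); // [compressor, dispenser]
    }
}
```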
HBase (this class pulls in the PostgreSQL data through DBService):
```java
package com.nevt.service;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/**
 * @Author: gzq
 * @Date: 2021/1/7 - 01 - 07 - 16:27
 * @Description: com.nevt.service
 */
@Component
public class HBaseService {

    @Autowired
    private Connection hbaseConnection;

    @Autowired
    private DBService dbService;

    /*
     * Prepares the HBase table for hydrogen-plant data: creates the table,
     * or adds the column families it is still missing.
     * Row key of the table = <data_source_id>:<timestamp>
     * @param tableName       name of the table to which the column families are added
     * @param dataStationType matching data_station_type_id value in the PostgreSQL table
     */
    public void writeHydrogenFactory(String tableName, int dataStationType) throws IOException {
        List<String> columnFamily = dbService.getColumnFamily(dataStationType);
        System.out.println("Column families from PostgreSQL: " + columnFamily);
        // Admin is lightweight and not thread-safe: get one per call and close it afterwards
        try (Admin admin = hbaseConnection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                ifTableExist(columnFamily, admin, tableName);
            } else {
                ifTableNotExist(columnFamily, admin, tableName);
            }
        }
    }

    private void ifTableExist(List<String> columnFamily, Admin admin, String tableName) {
        System.out.println("Table Exist!");
        for (String column : columnFamily) {
            // The table already exists, so only add column families to it
            HColumnDescriptor newFamily = new HColumnDescriptor(column);
            // Why try/catch: some families may have been added earlier while others have not.
            // addColumn throws for a family that already exists; catching the exception here
            // lets the loop go on and add the remaining families.
            try {
                admin.addColumn(TableName.valueOf(tableName), newFamily);
                System.out.println("ColumnFamily " + column + " has been added!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void ifTableNotExist(List<String> columnFamily, Admin admin, String tableName) throws IOException {
        System.out.println("Table Not Exist!");
        HTableDescriptor tableCreate = new HTableDescriptor(TableName.valueOf(tableName));
        for (String column : columnFamily) {
            System.out.println(column);
            tableCreate.addFamily(new HColumnDescriptor(column));
        }
        admin.createTable(tableCreate);
        System.out.println("Table and column families have been created!");
    }
}
```
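The try/catch in ifTableExist relies on addColumn failing for families that already exist. An alternative is to read the table's current families first and call addColumn only for the missing ones. In the HBase 1.x client the existing names can be collected from `admin.getTableDescriptor(TableName.valueOf(tableName)).getColumnFamilies()`, calling `getNameAsString()` on each descriptor. The framework-free sketch below shows only the set difference; MissingFamiliesSketch and the family names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingFamiliesSketch {

    /**
     * Returns the families in `wanted` that are not yet in `existing`, keeping the order
     * of `wanted` and skipping repeats. `existing` itself is not modified.
     */
    public static List<String> missingFamilies(List<String> wanted, Set<String> existing) {
        Set<String> seen = new HashSet<>(existing);
        List<String> missing = new ArrayList<>();
        for (String family : wanted) {
            // Set.add returns false if the name is already present (in the table or earlier in `wanted`)
            if (seen.add(family)) {
                missing.add(family);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("compressor"));
        List<String> wanted = Arrays.asList("compressor", "dispenser", "storage_tank", "dispenser");
        System.out.println(missingFamilies(wanted, existing)); // [dispenser, storage_tank]
    }
}
```

With this pre-check, every remaining addColumn call is expected to succeed, so any exception it still throws points to a real problem rather than an already-existing family.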
Test code:
```java
package com.nevt;

import com.nevt.service.HBaseService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.io.IOException;

@SpringBootTest
class HBaseTest {

    @Resource
    HBaseService hbaseService;

    @Test
    void testWrite() throws IOException {
        hbaseService.writeHydrogenFactory("data:hydrogen_station_data", 10002);
        // hbaseService.writeHydrogenFactory("data:hydrogen_vehicle_data", 10003);
        // hbaseService.writeHydrogenFactory("data:test2", 10003);
    }
}
```
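The test passes names such as data:hydrogen_station_data, which HBase reads as namespace:qualifier (table hydrogen_station_data in namespace data); a name without a colon goes into the built-in default namespace. createTable typically fails with a NamespaceNotFoundException when the namespace does not exist yet, so data has to be created once beforehand, for example with `create_namespace 'data'` in the HBase shell or `admin.createNamespace(NamespaceDescriptor.create("data").build())` in the Java client. The sketch below, with hypothetical names, only derives which namespaces a list of table names needs; it does not talk to HBase.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TableNameSketch {

    /** HBase's built-in namespaces, which never need to be created. */
    static final String DEFAULT_NAMESPACE = "default";
    static final String SYSTEM_NAMESPACE = "hbase";

    /** Splits "namespace:qualifier" into {namespace, qualifier}; no colon means the default namespace. */
    public static String[] split(String tableName) {
        int colon = tableName.indexOf(':');
        if (colon < 0) {
            return new String[] {DEFAULT_NAMESPACE, tableName};
        }
        return new String[] {tableName.substring(0, colon), tableName.substring(colon + 1)};
    }

    /** Namespaces (other than the built-in ones) that must exist before these tables can be created. */
    public static Set<String> requiredNamespaces(List<String> tableNames) {
        Set<String> result = new LinkedHashSet<>();
        for (String name : tableNames) {
            String namespace = split(name)[0];
            if (!namespace.equals(DEFAULT_NAMESPACE) && !namespace.equals(SYSTEM_NAMESPACE)) {
                result.add(namespace);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList(
                "data:hydrogen_station_data", "data:hydrogen_vehicle_data", "data:test2");
        System.out.println(requiredNamespaces(tables)); // [data]
    }
}
```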
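Check the data in HBase:
[Screenshot: table data in HBase]
Then check whether the column families were added successfully:
[Screenshot: column families of the HBase table]
Check the data in PostgreSQL:
[Screenshot: device_type data in PostgreSQL]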
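The two correspond, which completes the interaction between PostgreSQL and HBase.
Finally, here are a few HBase shell commands (use plain ASCII quotes; the shell does not accept curly quotes):
(1) Delete a table: disable it first, then drop it
disable 'table_name'
drop 'table_name'
(2) Delete a column family
alter 'table_name', 'delete' => 'column_family'
(3) Show the details of a table
desc 'table_name'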