Integrating Spring Boot with PostgreSQL and HBase so the two can interact

2021-01-13 11:16:29

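This project is a small test: it reads certain fields from PostgreSQL and turns them into the column families of an HBase table. PostgreSQL and HBase were already running on a cloud server in Docker containers with the relevant ports exposed, and the Windows machine connects to them through a secure tunnel. The project uses JPA to map entities to the PostgreSQL database. First, add the required dependencies: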

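<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- groupId, artifactId and version of this project go here -->

    <parent>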
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.5.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.3</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>2.3.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>
</project>

The corresponding YAML configuration file:

hbase:
  zookeeper:
    quorum: xxxx
    property:
      clientPort: 2181

zookeeper:
  znode:
    parent: /zkData

spring.datasource:
  url: jdbc:postgresql://localhost:5432/db1
  username: xxxx
  password: xxxx
spring.jpa:
  database: postgresql
  properties.hibernate.dialect: org.hibernate.dialect.PostgreSQL9Dialect
  hibernate.ddl-auto: update
  show-sql: false

logging.level:
  root: info
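Spring Boot exposes this nested YAML as flat, dot-separated property names, which is why HBaseConfig can later read ${hbase.zookeeper.quorum} or ${zookeeper.znode.parent}, and why a key written with a dot such as spring.datasource: behaves the same as fully nested sections. The sketch below imitates that flattening for maps that have already been parsed; YamlFlattener is a hypothetical name, and this is an illustration rather than Spring's actual implementation (which also handles lists, relaxed binding and more).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: flattens nested maps (as parsed from YAML) into dotted keys,
// imitating how Spring Boot exposes application.yml entries such as
// "hbase.zookeeper.quorum". Illustration only, not Spring's implementation.
class YamlFlattener {

    static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> result = new LinkedHashMap<>();
        flattenInto("", source, result);
        return result;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> source, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            // Keys that already contain dots (e.g. "spring.datasource") just extend the path
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested section: recurse with the longer prefix
                flattenInto(key, (Map<String, Object>) value, out);
            } else {
                // Leaf value: store it under the full dotted name
                out.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> property = new LinkedHashMap<>();
        property.put("clientPort", 2181);
        Map<String, Object> zookeeper = new LinkedHashMap<>();
        zookeeper.put("quorum", "xxxx");
        zookeeper.put("property", property);
        Map<String, Object> hbase = new LinkedHashMap<>();
        hbase.put("zookeeper", zookeeper);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("hbase", hbase);

        // prints {hbase.zookeeper.quorum=xxxx, hbase.zookeeper.property.clientPort=2181}
        System.out.println(flatten(root));
    }
}
```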

With the dependencies and configuration in place, development can begin. The entity below maps the PostgreSQL table device_type:

package com.nevt.db.repository.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.io.Serializable;

/**
 * (DeviceType) entity class
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
@Data
@Entity
@Table(name = "device_type")
@JsonIgnoreProperties(ignoreUnknown = true)
@EntityListeners(AuditingEntityListener.class)
public class DeviceType implements Serializable {
    private static final long serialVersionUID = 106469502944492174L;

    @Id
    @Column(name = "id")
    private Integer id;

    @Column(name = "name")
    private String name;

    @Column(name = "column_family")
    private String columnFamily;

    @Column(name = "data_station_type_id")
    private Integer dataStationTypeId;

}

For the data access layer, it is enough to extend the interfaces provided by Spring Data JPA:

package com.nevt.db.repository;

import com.nevt.db.repository.entity.DeviceType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

/**
 * Data access layer for the (DeviceType) table
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
public interface DeviceTypeRepository extends JpaRepository<DeviceType, Integer>,
        JpaSpecificationExecutor<DeviceType> {

}

The HBaseConfig class uses the YAML values to create the HBase connection:

package com.nevt.configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class HBaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

    /**
     * Creates one shared HBase Connection from the YAML values.
     * Connection is heavyweight and thread-safe, so a single bean is reused;
     * Spring calls close() on it when the application context shuts down.
     */
    @Bean(destroyMethod = "close")
    public Connection hbaseConnection() throws IOException {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        // Apply the ZooKeeper client port and znode parent from the YAML as well
        configuration.set("hbase.zookeeper.property.clientPort", clientPort);
        configuration.set("zookeeper.znode.parent", znodeParent);
        return ConnectionFactory.createConnection(configuration);
    }
}
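The YAML defines both hbase.zookeeper.quorum (a comma-separated host list) and hbase.zookeeper.property.clientPort. As an assumption about the HBase client's behavior, the two end up combined into a ZooKeeper connection string by appending the client port to every host that does not already name one. A minimal plain-Java sketch of that rule; ZkQuorum and connectString are hypothetical names, and no HBase code is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: combines a comma-separated quorum with a default client port.
// Hosts that already carry ":port" are kept as they are (assumed client behavior).
class ZkQuorum {

    static String connectString(String quorum, int clientPort) {
        List<String> servers = new ArrayList<>();
        for (String raw : quorum.split(",")) {
            String host = raw.trim();
            if (host.isEmpty()) {
                continue; // tolerate stray commas such as "zk1,,zk2"
            }
            // Keep an explicit host:port, otherwise append the configured client port
            servers.add(host.contains(":") ? host : host + ":" + clientPort);
        }
        return String.join(",", servers);
    }

    public static void main(String[] args) {
        // prints zk1:2181,zk2:2182
        System.out.println(connectString("zk1, zk2:2182", 2181));
    }
}
```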

The core classes for PostgreSQL and HBase are as follows. PostgreSQL:

package com.nevt.service;

import com.nevt.db.repository.DeviceTypeRepository;
import com.nevt.db.repository.entity.DeviceType;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@Component
public class DBService {

    @Resource
    private DeviceTypeRepository deviceTypeRepository;

    /**
     * Returns the column_family values of all device_type rows
     * whose data_station_type_id equals dataStationType.
     */
    public List<String> getColumnFamily(int dataStationType) {
        List<String> result = new ArrayList<>();
        List<DeviceType> deviceTypeList = deviceTypeRepository.findAll();
        for (DeviceType deviceType : deviceTypeList) {
            // Objects.equals avoids a NullPointerException when data_station_type_id is NULL
            if (Objects.equals(deviceType.getDataStationTypeId(), dataStationType)) {
                result.add(deviceType.getColumnFamily());
            }
        }
        return result;
    }
}
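The filtering in getColumnFamily can be tried without a database. In this sketch a hypothetical Row class stands in for the DeviceType entity, and the family names in main are made-up sample values. Objects.equals is used because comparing an Integer with an int via == unboxes the Integer and throws a NullPointerException when data_station_type_id is NULL; the LinkedHashSet additionally drops blank and duplicate names, since a column family only needs to be created once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Stand-alone sketch of DBService.getColumnFamily; Row is a hypothetical stand-in for DeviceType.
class ColumnFamilyFilter {

    static final class Row {
        final Integer dataStationTypeId; // may be null, like a nullable database column
        final String columnFamily;

        Row(Integer dataStationTypeId, String columnFamily) {
            this.dataStationTypeId = dataStationTypeId;
            this.columnFamily = columnFamily;
        }
    }

    static List<String> columnFamilies(List<Row> rows, int dataStationType) {
        Set<String> result = new LinkedHashSet<>(); // keeps first-seen order, removes duplicates
        for (Row row : rows) {
            // Null-safe comparison: a null id simply does not match
            boolean matches = Objects.equals(row.dataStationTypeId, dataStationType);
            if (matches && row.columnFamily != null && !row.columnFamily.trim().isEmpty()) {
                result.add(row.columnFamily.trim());
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(10002, "temperature"),
                new Row(10003, "speed"),
                new Row(null, "orphan"),
                new Row(10002, "pressure"),
                new Row(10002, "temperature"));
        // prints [temperature, pressure]
        System.out.println(columnFamilies(rows, 10002));
    }
}
```

In the real service the condition could also be pushed into the query itself, for example with a derived repository method such as findByDataStationTypeId on DeviceTypeRepository; that method name is hypothetical and not part of the original project.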

HBase (this class implements the interaction with PostgreSQL):

package com.nevt.service;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/**
 * @Author: gzq
 * @Date: 2021/1/7 - 01 - 07 - 16:27
 * @Description: com.nevt.service
 */

@Component
@EnableScheduling
public class HBaseService {

    @Autowired
    private Connection hbaseConnection;

    @Autowired
    private DBService dbService;



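    /**
     * Prepares the HBase table for hydrogen plant data: creates the table if it does not
     * exist, otherwise adds the column families configured in PostgreSQL.
     * Intended row key format: {@code <data_source_id>:<timestamp>}
     *
     * @param tableName       name of the table that receives the column families
     * @param dataStationType value of data_station_type_id in the PostgreSQL device_type table
     */
    public void writeHydrogenFactory(String tableName, int dataStationType) throws IOException {
        List<String> columnFamily = dbService.getColumnFamily(dataStationType);
        System.out.println("Column families from PostgreSQL: " + columnFamily);
        if (columnFamily.isEmpty()) {
            // HBase cannot create a table without any column family, and there is nothing to add
            System.out.println("No column family found for data station type " + dataStationType);
            return;
        }
        // Admin is lightweight and must be closed after use
        try (Admin admin = hbaseConnection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                ifTableExist(columnFamily, admin, tableName);
            } else {
                ifTableNotExist(columnFamily, admin, tableName);
            }
        }
    }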

    private void ifTableExist(List<String> columnFamily, Admin admin, String tableName) {
        System.out.println("Table exists, adding column families");
        for (String column : columnFamily) {
            // Describe the column family to add to the existing table
            HColumnDescriptor newFamily = new HColumnDescriptor(Bytes.toBytes(column));
            // Some families may already have been added on an earlier run, while others
            // are still missing. Catching the exception per family lets the loop continue;
            // without the catch, the first exception would propagate and the remaining
            // families would not be added.
            try {
                admin.addColumn(TableName.valueOf(tableName), newFamily);
                System.out.println("Column family " + column + " added");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void ifTableNotExist(List<String> columnFamily, Admin admin, String tableName) throws IOException {
        System.out.println("Table does not exist, creating it");
        HTableDescriptor tableCreate = new HTableDescriptor(TableName.valueOf(tableName));
        for (String column : columnFamily) {
            // Declare every column family on the descriptor before creating the table
            tableCreate.addFamily(new HColumnDescriptor(Bytes.toBytes(column)));
        }
        admin.createTable(tableCreate);
        System.out.println("Table " + tableName + " created with column families " + columnFamily);
    }
}
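ifTableExist relies on catching the exception that addColumn throws for a family that already exists. An alternative is to compute the missing families first and add only those. The sketch below shows just that set logic in plain Java. In real code the existing names would come from the table descriptor, for example admin.getTableDescriptor(tableName).getColumnFamilies() together with HColumnDescriptor.getNameAsString() in the HBase 1.x client; treat that as an assumption to verify against the client version in use. FamilyDiff and missingFamilies are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: which wanted column families are not yet present on the table?
class FamilyDiff {

    static List<String> missingFamilies(List<String> wanted, Set<String> existing) {
        List<String> missing = new ArrayList<>();
        Set<String> seen = new HashSet<>(existing); // copy, so the caller's set is untouched
        for (String family : wanted) {
            // Set.add returns false if the name exists already or was queued earlier
            if (seen.add(family)) {
                missing.add(family);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("info"));
        List<String> wanted = Arrays.asList("info", "temperature", "pressure", "temperature");
        // prints [temperature, pressure]
        System.out.println(missingFamilies(wanted, existing));
    }
}
```

With this, addColumn would only be called for families that are known to be missing, so any exception that still occurs points to a real problem rather than an expected duplicate.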

Test code:

package com.nevt;


import com.nevt.service.HBaseService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;


import javax.annotation.Resource;
import java.io.IOException;

@SpringBootTest()
class HBaseTest {

    @Resource
    HBaseService hbaseService;

    @Test
    void testWrite() throws IOException {
        hbaseService.writeHydrogenFactory("data:hydrogen_station_data",10002);
//        hbaseService.writeHydrogenFactory("data:hydrogen_vehicle_data",10003);
//        hbaseService.writeHydrogenFactory("data:test2", 10003);


    }
}
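The test passes names such as data:hydrogen_station_data, which follow HBase's namespace:table form; a name without a colon belongs to the default namespace. A non-default namespace must already exist before createTable can put a table in it (in the HBase shell: create_namespace 'data'). The hypothetical helper below only splits a name the same way, which can help check which namespace a call will need; it makes no HBase calls, and its validation is a simplification of HBase's real naming rules.

```java
// Hypothetical helper mirroring the "namespace:qualifier" table-name convention.
// A name without a colon is placed in the "default" namespace.
class HBaseTableName {

    final String namespace;
    final String qualifier;

    private HBaseTableName(String namespace, String qualifier) {
        this.namespace = namespace;
        this.qualifier = qualifier;
    }

    static HBaseTableName parse(String name) {
        int idx = name.indexOf(':');
        if (idx < 0) {
            return new HBaseTableName("default", name);
        }
        String ns = name.substring(0, idx);
        String q = name.substring(idx + 1);
        // Simplified validation: both parts must be non-empty and only one colon is allowed
        if (ns.isEmpty() || q.isEmpty() || q.indexOf(':') >= 0) {
            throw new IllegalArgumentException("Expected <namespace>:<table>, got: " + name);
        }
        return new HBaseTableName(ns, q);
    }

    public static void main(String[] args) {
        HBaseTableName t = parse("data:hydrogen_station_data");
        // prints data / hydrogen_station_data
        System.out.println(t.namespace + " / " + t.qualifier);
    }
}
```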

Check the data in HBase:

Then confirm that the column families were added:

Check the data in PostgreSQL:

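The two sides match: the column families defined in PostgreSQL now exist in the HBase table, which completes the interaction between PostgreSQL and HBase. For reference, here are some HBase shell commands:
(1) Delete a table: disable it first, then drop it
disable 'table_name'
drop 'table_name'
(2) Delete a column family
alter 'table_name', 'delete' => 'column_family'
(3) Show a table's details
desc 'table_name'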
