客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

2022-03-09 09:01:47 浏览数 (1)

定义解析kafka数据的Bean对象类

一、定义消费kafka字符串的Bean对象基类

根据数据来源不同可以分为OGG数据Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类

实现步骤:

  • 公共模块java目录下的 parser 包下创建 MessageBean 抽象类
  • 编写代码
    • 继承自 Serializable 接口
    • 创建 serialVersionUID 属性
    • 定义 table 属性,实现 setter/getter 方法

参考代码:

代码语言:javascript复制
package cn.it.logistics.common.beans.parser;

import java.io.Serializable;

/**
 * 根据数据源定义抽象类,数据源:
 * 1)ogg
 * 2)canal
 * 两者有共同的table属性
 */
public abstract class MessageBean implements Serializable {
 
    private static final long serialVersionUID = -8216415778785426469L;

    private String table;

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    @Override
    public String toString() {
        return table;
    }
}

为什么创建serialVersionUID:

serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。

使用idea生成serialVersionUID:

操作步骤

说明

1

设置自动生成 serialVersionUID

2

选中对应的类名,然后按 alt enter 快捷键

3

结果显示

二、​​​​​​​定义消费OGG字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 OggMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

代码语言:javascript复制
package cn.it.logistics.common.beans.parser;

import javax.print.DocFlavor;
import java.util.Map;

/**
 * 定义消费出来的ogg的数据的javaBean对象
 * {
 *     "table": "IT.tbl_route",            //表名:库名.表名
 *     "op_type": "U",                         //操作类型:U表示修改
 *     "op_ts": "2020-10-08 09:10:54.000774",
 *     "current_ts": "2020-10-08T09:11:01.925000",
 *     "pos": "00000000200006645758",
 *     "before": {                            //操作前的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "蚌埠中转部",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *        },
 *     "after": {                         //操作后的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "TBD",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *    }
 * }
 */
public class OggMessageBean extends MessageBean {
    //定义操作类型
    private String op_type;

    @Override
    public void setTable(String table) {
        //如果表名不为空
        if (table != null && !table.equals("")) {
            table = table.replaceAll("[A-Z] \.", "");
        }
        super.setTable(table);
    }

    public String getOp_type() {
        return op_type;
    }

    public void setOp_type(String op_type) {
        this.op_type = op_type;
    }

    public String getOp_ts() {
        return op_ts;
    }

    public void setOp_ts(String op_ts) {
        this.op_ts = op_ts;
    }

    public String getCurrent_ts() {
        return current_ts;
    }

    public void setCurrent_ts(String current_ts) {
        this.current_ts = current_ts;
    }

    public String getPos() {
        return pos;
    }

    public void setPos(String pos) {
        this.pos = pos;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    //操作时间
    private String op_ts;

    @Override
    public String toString() {
        return "OggMessageBean{"  
                "table='"   super.getTable()   '''  
                ", op_type='"   op_type   '''  
                ", op_ts='"   op_ts   '''  
                ", current_ts='"   current_ts   '''  
                ", pos='"   pos   '''  
                ", before="   before  
                ", after="   after  
                '}';
    }

    /**
     * 返回需要处理的列的集合
     * @return
     */
    public Map<String, Object> getValue() {
        //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
        if (after == null) {
            return before;
        } else {
            return after;
        }
    }

    //同步时间
    private String current_ts;
    //偏移量
    private String pos;
    //操作之前的数据
    private Map<String, Object> before;
    //操作之后的数据
    private Map<String, Object> after;
}

三、​​​​​​​定义消费Canal字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 CanalMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

代码语言:javascript复制
package cn.it.logistics.common.beans.parser;

import java.util.List;
import java.util.Map;

/**
 * 定义消费出来的canal的数据对应的javaBean对象
 * {
 *     "data": [{
 *        "id": "1",
 *        "name": "北京",
 *        "tel": "222",
 *        "mobile": "1111",
 *        "detail_addr": "北京",
 *        "area_id": "1",
 *        "gis_addr": "1",
 *        "cdt": "2020-10-08 17:20:12",
 *        "udt": "2020-11-05 17:20:16",
 *        "remark": null
 *        }],
 *     "database": "crm",
 *     "es": 1602148867000,
 *     "id": 15,
 *     "isDdl": false,
 *     "mysqlType": {
 *        "id": "bigint(20)",
 *        "name": "varchar(50)",
 *        "tel": "varchar(20)",
 *        "mobile": "varchar(20)",
 *        "detail_addr": "varchar(100)",
 *        "area_id": "bigint(20)",
 *        "gis_addr": "varchar(20)",
 *        "cdt": "datetime",
 *        "udt": "datetime",
 *        "remark": "varchar(100)"
 *    },
 *     "old": [{
 *        "tel": "111"
 *    }],
 *     "sql": "",
 *     "sqlType": {
 *        "id": -5,
 *        "name": 12,
 *        "tel": 12,
 *        "mobile": 12,
 *        "detail_addr": 12,
 *        "area_id": -5,
 *        "gis_addr": 12,
 *        "cdt": 93,
 *        "udt": 93,
 *        "remark": 12
 *    },
 *     "table": "crm_address",
 *     "ts": 1602148867311,
 *     "type": "UPDATE"               //修改数据
 * }
 */
public class CanalMessageBean extends MessageBean {
    //操作的数据集合
    private List<Map<String, Object>> data;

    public List<Map<String, Object>> getData() {
        return data;
    }

    public void setData(List<Map<String, Object>> data) {
        this.data = data;
    }

    public String getDatabase() {
        return database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public Long getEs() {
        return es;
    }

    public void setEs(Long es) {
        this.es = es;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public boolean isDdl() {
        return isDdl;
    }

    public void setDdl(boolean ddl) {
        isDdl = ddl;
    }

    public Map<String, Object> getMysqlType() {
        return mysqlType;
    }

    public void setMysqlType(Map<String, Object> mysqlType) {
        this.mysqlType = mysqlType;
    }

    public String getOld() {
        return old;
    }

    public void setOld(String old) {
        this.old = old;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Map<String, Object> getSqlType() {
        return sqlType;
    }

    public void setSqlType(Map<String, Object> sqlType) {
        this.sqlType = sqlType;
    }


    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    //数据库名称
    private String database;
    private Long es;
    private Long id;
    private boolean isDdl;
    private Map<String, Object> mysqlType;
    private String old;
    private String sql;
    private Map<String, Object> sqlType;
    private Long ts;
    private String type;

    /**
     * 重写父类的settable方法,将表名修改成统一的前缀
     * @param table
     */
    @Override
    public void setTable(String table) {
        if(table!=null && !table.equals("")){
            if(table.startsWith("crm_")) {
                table = table.replace("crm_", "tbl_");
            }
        }
        super.setTable(table);
    }
}

0 人点赞