定义解析kafka数据的Bean对象类
一、定义消费kafka字符串的Bean对象基类
根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类
实现步骤:
- 在公共模块的java目录下的 parser 包下创建 MessageBean 抽象类
- 编写代码
- 继承自 Serializable 接口
- 创建 serialVersionUID 属性
- 定义 table 属性,实现 setter/getter 方法
参考代码:
代码语言:javascript复制package cn.it.logistics.common.beans.parser;
import java.io.Serializable;
/**
* 根据数据源定义抽象类,数据源:
* 1)ogg
* 2)canal
* 两者有共同的table属性
*/
public abstract class MessageBean implements Serializable {
private static final long serialVersionUID = -8216415778785426469L;
private String table;
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
@Override
public String toString() {
return table;
}
}
为什么创建serialVersionUID:
serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。
使用idea生成serialVersionUID:
操作步骤 | 说明 |
---|---|
1 | 设置自动生成 serialVersionUID |
2 | 选中对应的类名,然后按 alt enter 快捷键 |
3 | 结果显示 |
二、定义消费OGG字符串的Bean对象
实现步骤:
- 在公共模块的 parser 包下创建 OggMessageBean 类
- 继承自 MessageBean 抽象类
参考代码:
代码语言:javascript复制package cn.it.logistics.common.beans.parser;
import javax.print.DocFlavor;
import java.util.Map;
/**
* 定义消费出来的ogg的数据的javaBean对象
* {
* "table": "IT.tbl_route", //表名:库名.表名
* "op_type": "U", //操作类型:U表示修改
* "op_ts": "2020-10-08 09:10:54.000774",
* "current_ts": "2020-10-08T09:11:01.925000",
* "pos": "00000000200006645758",
* "before": { //操作前的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "蚌埠中转部",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
* },
* "after": { //操作后的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "TBD",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
* }
* }
*/
public class OggMessageBean extends MessageBean {
//定义操作类型
private String op_type;
@Override
public void setTable(String table) {
//如果表名不为空
if (table != null && !table.equals("")) {
table = table.replaceAll("[A-Z] \.", "");
}
super.setTable(table);
}
public String getOp_type() {
return op_type;
}
public void setOp_type(String op_type) {
this.op_type = op_type;
}
public String getOp_ts() {
return op_ts;
}
public void setOp_ts(String op_ts) {
this.op_ts = op_ts;
}
public String getCurrent_ts() {
return current_ts;
}
public void setCurrent_ts(String current_ts) {
this.current_ts = current_ts;
}
public String getPos() {
return pos;
}
public void setPos(String pos) {
this.pos = pos;
}
public Map<String, Object> getBefore() {
return before;
}
public void setBefore(Map<String, Object> before) {
this.before = before;
}
public Map<String, Object> getAfter() {
return after;
}
public void setAfter(Map<String, Object> after) {
this.after = after;
}
//操作时间
private String op_ts;
@Override
public String toString() {
return "OggMessageBean{"
"table='" super.getTable() '''
", op_type='" op_type '''
", op_ts='" op_ts '''
", current_ts='" current_ts '''
", pos='" pos '''
", before=" before
", after=" after
'}';
}
/**
* 返回需要处理的列的集合
* @return
*/
public Map<String, Object> getValue() {
//如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
if (after == null) {
return before;
} else {
return after;
}
}
//同步时间
private String current_ts;
//偏移量
private String pos;
//操作之前的数据
private Map<String, Object> before;
//操作之后的数据
private Map<String, Object> after;
}
三、定义消费Canal字符串的Bean对象
实现步骤:
- 在公共模块的 parser 包下创建 CanalMessageBean 类
- 继承自 MessageBean 抽象类
参考代码:
代码语言:javascript复制package cn.it.logistics.common.beans.parser;
import java.util.List;
import java.util.Map;
/**
* 定义消费出来的canal的数据对应的javaBean对象
* {
* "data": [{
* "id": "1",
* "name": "北京",
* "tel": "222",
* "mobile": "1111",
* "detail_addr": "北京",
* "area_id": "1",
* "gis_addr": "1",
* "cdt": "2020-10-08 17:20:12",
* "udt": "2020-11-05 17:20:16",
* "remark": null
* }],
* "database": "crm",
* "es": 1602148867000,
* "id": 15,
* "isDdl": false,
* "mysqlType": {
* "id": "bigint(20)",
* "name": "varchar(50)",
* "tel": "varchar(20)",
* "mobile": "varchar(20)",
* "detail_addr": "varchar(100)",
* "area_id": "bigint(20)",
* "gis_addr": "varchar(20)",
* "cdt": "datetime",
* "udt": "datetime",
* "remark": "varchar(100)"
* },
* "old": [{
* "tel": "111"
* }],
* "sql": "",
* "sqlType": {
* "id": -5,
* "name": 12,
* "tel": 12,
* "mobile": 12,
* "detail_addr": 12,
* "area_id": -5,
* "gis_addr": 12,
* "cdt": 93,
* "udt": 93,
* "remark": 12
* },
* "table": "crm_address",
* "ts": 1602148867311,
* "type": "UPDATE" //修改数据
* }
*/
public class CanalMessageBean extends MessageBean {
//操作的数据集合
private List<Map<String, Object>> data;
public List<Map<String, Object>> getData() {
return data;
}
public void setData(List<Map<String, Object>> data) {
this.data = data;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public Long getEs() {
return es;
}
public void setEs(Long es) {
this.es = es;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public boolean isDdl() {
return isDdl;
}
public void setDdl(boolean ddl) {
isDdl = ddl;
}
public Map<String, Object> getMysqlType() {
return mysqlType;
}
public void setMysqlType(Map<String, Object> mysqlType) {
this.mysqlType = mysqlType;
}
public String getOld() {
return old;
}
public void setOld(String old) {
this.old = old;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Map<String, Object> getSqlType() {
return sqlType;
}
public void setSqlType(Map<String, Object> sqlType) {
this.sqlType = sqlType;
}
public Long getTs() {
return ts;
}
public void setTs(Long ts) {
this.ts = ts;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
//数据库名称
private String database;
private Long es;
private Long id;
private boolean isDdl;
private Map<String, Object> mysqlType;
private String old;
private String sql;
private Map<String, Object> sqlType;
private Long ts;
private String type;
/**
* 重写父类的settable方法,将表名修改成统一的前缀
* @param table
*/
@Override
public void setTable(String table) {
if(table!=null && !table.equals("")){
if(table.startsWith("crm_")) {
table = table.replace("crm_", "tbl_");
}
}
super.setTable(table);
}
}