案例解析 GBase8s 在工业互联网平台中的应用

2021-04-08 21:52:54 浏览数 (1)

工业互联网平台

工业互联网作为新一代信息技术与制造业深度融合的产物,通过实现人、机、物的全面互联,构建了一个全要素、全产业链、全价值链连接的、新型的生产制造和服务体系,是促进传统产业转型升级、实现高质量发展的重要驱动。企业工业互联网包括网络、平台、安全三大体系,其中工业互联网平台是工业全要素、全产业链、全价值链连接的枢纽,是实现制造业数字化、网络化、智能化过程中工业资源配置的核心,已成为当下各行各业瞩目的焦点。

工业4.0 和 中国智造2025

工信部长苗圩在讲到德国工业4.0与中国制造2025时,曾这样概括:如出一辙、异曲同工、殊途同归。因此,两者表述不同,但内涵基本一致。

工业4.0 : 由德国提出,主要指提升制造业的智能化水平,建立具有适应性、资源效率及基因工程学的智慧工厂,在商业流程及价值流程中整合客户及商业伙伴。其技术基础是网络实体系统及物联网 。

中国智造2025 : 中国制造2025,是中国政府实施制造强国战略第一个十年的行动纲领。《中国制造2025》提出,坚持“创新驱动、质量为先、绿色发展、结构优化、人才为本”的基本方针,坚持“市场主导、政府引导,立足当前、着眼长远,整体推进、重点突破,自主发展、开放合作”的基本原则,通过“三步走”实现制造强国的战略目标:

  • 第一步,到2025年迈入制造强国行列;
  • 第二步,到2035年中国制造业整体达到世界制造强国阵营中等水平;
  • 第三步,到新中国成立一百年时,综合实力进入世界制造强国前列。

围绕实现制造强国的战略目标,《中国制造2025》明确了9项战略任务和重点,提出了8个方面的战略支撑和保障。

2016年4月6日国务院总理李克强主持召开国务院常务会议,会议通过了《装备制造业标准化和质量提升规划》,要求对接《中国制造2025》。7月19日国务院常务会议部署创建“中国制造2025”国家级示范区,专家指出,“中国制造2025”提至国家级,较以前城市试点有所升级。“7月19日部署的‘中国制造2025’国家级示范区相当于此前‘中国制造2025’城市试点示范的升级版,”工信部赛迪研究院规划所副所长张洪国对《21世纪经济报道》表示,此前是以工信部为主来批复“中国制造2025”试点示范城市,在国家制造强国建设领导小组指导下开展相关工作的;今后将由国务院来审核、批复国家级的示范区,相关文件也将由国务院来统一制定。

工业大数据

在中国智造2025的大前提下,工业大数据也就应运而生了。那么什么是“大数据“,什么又是“工业大数据“呢?

所谓“大数据”,指的是所涉及的数据量规模巨大到无法通过目前主流软件工具,在合理时间内达到截取、管理、处理、并整理成为帮助企业经营决策更积极目的的信息。其特征,数据容量大、多样、快速和价值密度低。

工业大数据除具有一般大数据的特征(数据容量大、多样、快速和价值密度低)外,还具有时序性、强关联性、准确性、闭环性等特征。

数据容量大(volume):数据的大小决定所考虑的数据的价值和潜在的信息。工业数据体量比较大,大量机器设备的高频数据和互联网数据持续涌入,大型工业企业的数据集将达到PB级甚至EB级别。

多样(variety):指数据类型的多样性和来源广泛。工业数据分布广泛,分布于机器设备、工业产品、管理系统、互联网等各个环节,并且结构复杂,既有结构化和半结构化的传感数据,也有非结构化数据。

快速(velocity):指获得和处理数据的速度。工业数据处理速度需求多样,生产现场级要求分析时限达到毫秒级,管理与决策应用需要支持交互式或批量数据分析。

价值密度低(value):工业大数据更强调用户价值驱动和数据本身的可用性,包括:提升创新能力和生产经营效率及促进个性化定制、服务化转型等智能制造新模式变革。

时序性(sequence):工业大数据具有较强的时序性,如订单、设备状态数据等。

强关联性(strong-relevance):一方面,产品生命周期同一阶段的数据具有强关联性,如产品零部件组成、工况、设备状态、维修情况、零部件补充采购等;另一方面,产品生命周期的研发设计、生产、服务等不同环节的数据之间需要进行关联。

准确性(accuracy):主要指数据的真实性、完整性和可靠性,更加关注数据质量以及处理、分析技术和方法的可靠性。对数据分析的置信度要求较高,仅依靠统计相关性分析不足以支撑故障诊断、预测预警等工业应用,需要将物理模型与数据模型结合,挖掘因果关系。

闭环性(closed-loop):包括产品全生命周期横向过程中数据链条的封闭和关联以及智能制造纵向数据采集和处理过程中,需要支撑状态感知、分析、反馈、控制等闭环场景下的动态持续调整和优化。

  工业大数据作为大数据的一个应用行业,在具有广阔应用前景的同时,对传统的数据管理技术与数据分析技术也提出了很大的挑战。

以 GBase8s 为核心的工业互联网数据平台逻辑架构

工业互联网数据平台应该以提升产品智能化深入拓展行业应用为己任。我们的逻辑架构也以此展开,首先我们从边缘(端)回去数据,通过技术手段将数据重新整合,并对外提供价值。

传统批处理式的数据平台,已经不再适应现代信息发展的要求的了,就更无法适配工业互联网数据平台的要求。所以我们采取了一种类 lambda 架构的结构形式,基于GBase8s 强大的事务处理能力,为整个平台提供数据支撑。

案例解析

接下来,我们将通过一个保姆级教程,来模拟一个简单的工业互联网数据处理模型。

设备通过mqtt向平台发送数据,但是一般设备数据都不会携带具体的设备信息,只会携带mac,或是一些其他标识,这样我们入库的时候就需要一次清理工作。基于上一章节我们的架构设计,我们只需要在8s里记录一个设备表,来存储必要信息,然后通过flink向mqtt流广播数据,这样我们就可以无缝添加设备,并进行处理了。处理后的数据sink到8s,再通过 8s 提供的 cdc(Change Data Capture 变更数据捕获) 功能。我们可以对增量数据进行更多应用需要的处理工作。

模拟MQTT环境

这里我们使用 docker 创建一个mqtt服务,并通过程序模拟,向其发送数据

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

下面是用于模拟发送数据

代码语言:javascript复制
import cn.gbase.mqtt.MQTTSource;
import com.google.gson.Gson;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.Hashtable;
import java.util.Random;
//数据模拟器
public class Emmit {

 public static void main(String[] args) {
     String[] gmacs = new String[]{
             "127.0.0.1",
             "127.0.0.2",
             "127.0.0.3",
             "127.0.0.4",
             "127.0.0.5",
             "127.0.0.6"
     };
     for(;;){
         int i = new Random().nextInt(gmacs.length);
         String gmac = gmacs[i];
         Hashtable<String,String> data = new Hashtable<>();
         data.put("gmac",gmac);
         data.put("v1",new Random().nextInt(300) "");
         data.put("v2",new Random().nextInt(100) "");

         Gson g = new Gson();
         String jsonData = g.toJson(data);

         Emmit.emmit(jsonData);
     }

 }

 public static void emmit(String content){
     MemoryPersistence persistence = new MemoryPersistence();

     try {
         MqttClient client = new MqttClient(MQTTSource.broker,"client2",persistence);

         // MQTT 连接选项
         MqttConnectOptions connOpts = new MqttConnectOptions();
         connOpts.setUserName("admin");
         connOpts.setPassword("public".toCharArray());
         // 保留会话
         connOpts.setCleanSession(true);
         // 建立连接
         System.out.println("Connecting to broker: "   MQTTSource.broker);
         client.connect(connOpts);

         System.out.println("Connected");

         // 消息发布所需参数
         System.out.println("Publishing message: "   content);
         MqttMessage message = new MqttMessage(content.getBytes());
         message.setQos(MQTTSource.qos);
         client.publish(MQTTSource.pubTopic, message);
         System.out.println("Message published");
         client.disconnect();
         System.out.println("Disconnected");
         client.close();

     } catch (MqttException me) {
         me.printStackTrace();
     }
 }


}

创建MQTT Source

这个类的主要目的是从mqtt里回去输入数据。并将其封装成一个元组(Tuple)返回给下一个步骤。

代码语言:javascript复制
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.eclipse.paho.client.mqttv3.*;

import java.nio.charset.StandardCharsets;


public class MQTTSource implements SourceFunction<Tuple2<String, String>> {

 public static String subTopic = "testtopic/#";
 public static String pubTopic = "testtopic/1";
 public static int qos = 2;
 public static String broker = "tcp://127.0.0.1:1883";
 public static String clientId = "emqx_test";


 public static final String USERNAME = "admin";
 public static final String PASSWORD = "public";

 private transient MqttClient client;
 private transient volatile boolean running;
 private transient Object waitLock;


 public void stop() {
     close();
 }

 @Override
 public void run(final SourceContext<Tuple2<String, String>> ctx) throws Exception {
     MqttConnectOptions connectOptions = new MqttConnectOptions();
     connectOptions.setCleanSession(true);
     connectOptions.setAutomaticReconnect(true);
     connectOptions.setUserName(USERNAME);
     connectOptions.setPassword(PASSWORD.toCharArray());
     client = new MqttClient(broker, clientId);
     client.connect(connectOptions);

     client.subscribe(subTopic, (topic, message) -> {
         System.out.println(message);
         String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
         ctx.collect(Tuple2.of(msg,topic));
     });

     running = true;
     waitLock = new Object();

     while (running) {
         synchronized (waitLock) {
             waitLock.wait(100L);
         }

     }
 }

 @Override
 public void cancel() {
     close();
 }

 private void close() {
     try {
         if (client != null) {
             client.disconnect();
         }
     } catch (MqttException exception) {

     } finally {
         this.running = false;
     }

     // leave main method
     synchronized (waitLock) {
         waitLock.notify();
     }
 }
}

从 GBase8s 获取设备表信息

这里我们会定时从信息表里面获取到一些数据,并将其广播出去。

代码语言:javascript复制
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Hashtable;
import java.util.Map;

//RichSourceFunction RichParallelSourceFunction
public class GBase8sSource extends RichSourceFunction<Map<String, String>> {
 private static final Logger logger = LoggerFactory.getLogger(GBase8sSource.class);
 public static String driverClassName = "com.gbasedbt.jdbc.IfxDriver";
 public static String db_url = "jdbc:gbasedbt-sqli://172.30.232.114:1533/test1:GBASEDBTSERVER=ol_gbasedbt1210;NEWCODESET=UTF-8,cp1252,819;DB_LOCALE=zh_cn.utf8;CLIENT_LOCALE=zh_cn.utf8;";
 public static String user = "gbasedbt";
 public static String password = "dafei1288";

 private Connection connection = null;
 private Statement s = null;
 private volatile boolean isRunning = true;


 @Override
 public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     Class.forName(driverClassName);
     connection = DriverManager.getConnection(db_url, user, password);//获取连接
     s = connection.createStatement();
 }

 @Override
 public void run(SourceContext<Map<String, String>> ctx) throws Exception {
     Map<String, String> deviceMap = new Hashtable<String, String>();
     try {
         while (isRunning) {
             ResultSet resultSet = s.executeQuery("select mid, gmac,name from mtable");
             resultSet.beforeFirst();
             while (resultSet.next()) {

                 String gmac = resultSet.getString(2);
                 String name = resultSet.getString(3);
                 //System.out.println("gmac = " gmac   " , name = " name );
                 deviceMap.put(gmac, name);

             }
             System.out.println();
             System.out.println("flash map ==> "   deviceMap);
             System.out.println();
             ctx.collect(deviceMap);
             deviceMap.clear();
             Thread.sleep(1000 * 60);
         }
     } catch (Exception e) {
         logger.error("runException:{}", e);
     }
 }

 //关闭数据库连接
 @Override
 public void cancel() {
     try {
         super.close();
         if (connection != null) {
             connection.close();
         }
         if (s != null) {
             s.close();
         }
     } catch (Exception e) {
         logger.error("runException:{}", e);
     }
     isRunning = false;
 }
}

将处理结果写回数据库

此类的作用,将数据结果写回

代码语言:javascript复制
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GBase8sSink extends RichSinkFunction<DeviceData> {
 private PreparedStatement ps = null;
 private Connection connection = null;

 @Override
 public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     connection = getConn();
     ps = connection.prepareStatement("insert into factable(name,gmac,v1,v2,updatetime) values (?,?,?,?,?);");
 }
 private Connection getConn() {
     try {
         Class.forName(GBase8sSource.driverClassName);
         connection = DriverManager.getConnection(GBase8sSource.db_url, GBase8sSource.user, GBase8sSource.password);
     } catch (Exception e) {
         e.printStackTrace();
     }
     return connection;
 }

 //每一个元素的插入,都会被调用一次invoke方法
 @Override
 public void invoke(DeviceData dd, Context context) throws Exception {
     ps.setString(1,dd.getName());
     ps.setString(2,dd.getGmac());
     ps.setString(3,dd.getV1());
     ps.setString(4,dd.getV2());
     ps.setString(5,dd.getUpdatetime());
     ps.execute();
 }

 @Override
 public void close() throws Exception {
     super.close();
     if(connection != null){
         connection.close();
     }
     if (ps != null){
         ps.close();
     }
 }
}

这个类仅作为DTO使用

代码语言:javascript复制
public class DeviceData {

 private String name;
 private String gmac;
 private String v1;
 private String v2;
 private String updatetime;

 @Override
 public String toString() {
     return "DeviceData{"  
//                "mid='"   mid   '''  
             ", name='"   name   '''  
             ", gmac='"   gmac   '''  
             ", v1='"   v1   '''  
             ", v2='"   v2   '''  
             ", updatetime='"   updatetime   '''  
             '}';
 }

 public String getName() {
     return name;
 }

 public void setName(String name) {
     this.name = name;
 }

 public String getGmac() {
     return gmac;
 }

 public void setGmac(String gmac) {
     this.gmac = gmac;
 }

 public String getV1() {
     return v1;
 }

 public void setV1(String v1) {
     this.v1 = v1;
 }

 public String getV2() {
     return v2;
 }

 public void setV2(String v2) {
     this.v2 = v2;
 }

 public String getUpdatetime() {
     return updatetime;
 }

 public void setUpdatetime(String updatetime) {
     this.updatetime = updatetime;
 }
}

流批混合

此类的作用,混合流与批,并将处理的数据写回数据库。

代码语言:javascript复制
import cn.gbase.mqtt.MQTTSource;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.*;


public class MqttFlinkMain {
 private static Map<String, String> deviceMap = new Hashtable<String, String>();

 public static void main(String[] args) throws Exception {

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     // 广播更新设备对照表
     DataStream<Map<String, String>> deviceStream = env.addSource(new GBase8sSource());
     deviceStream.broadcast().map(new MapFunction<Map<String, String>, Object>() {
         @Override
         public Object map(Map<String, String> value) {
             deviceMap = value;
             return null;
         }
     });

     // mqtt 数据输入
     DataStream<Tuple2<String, String>> inputStream = env.addSource(new MQTTSource());

     DataStream<DeviceData> dataStream = inputStream.rebalance()
             .flatMap(new FlatMapFunction<Tuple2<String, String>, DeviceData>() {
                 @Override
                 public void flatMap(Tuple2<String, String> value, Collector<DeviceData> out) {
                     String message = value.f0;
                     String topic = value.f1;
                     List<DeviceData> d = dataHandle(message, topic);
                     for (DeviceData line : d) {
                         out.collect(line);
                     }
                 }
             });
     //打印输出
     dataStream.print();
     //添加 8s sink
     dataStream.addSink(new GBase8sSink()).name("gbase8s").setParallelism(2);

     env.execute("mqttFlinkMain");
 }

 private static List<DeviceData> dataHandle(String message, String topic) {
     List<DeviceData> d = new ArrayList<>();
     try {
         System.out.println("message = " message);
         System.out.println("topic = " topic);
         Gson g = new Gson();
         DeviceData dd = new DeviceData();
         Hashtable<String,String>  mdata = g.fromJson(message,Hashtable.class);

         dd.setGmac(mdata.get("gmac"));
         dd.setV1(mdata.get("v1"));
         dd.setV2(mdata.get("v2"));
         String name = deviceMap.get(mdata.get("gmac"));
         dd.setName(name);
         dd.setUpdatetime(System.currentTimeMillis() "");

         d.add(dd);

     } catch (Throwable t) {
         t.printStackTrace();
     }
     return d;
 }
}

总结与展望

启动flink开始接受数据

启动数据模拟程序,用于模拟mqtt提交

好了,我们可以看到数据已经开始源源不断灌进来了。

通过上面的一个简单模拟,我们构建了一个简单的工业互联网数据处理模型。

我们不难发现,CDC并没有提到,CDC部分其实也是非常有意思的部分,但不在我们本次的讨论范围之内,未来会带来相关部分的内容。

参考文献

https://www.sohu.com/a/339738486_99916165

https://blog.csdn.net/u012447842/article/details/89175772

https://zhuanlan.zhihu.com/p/143696144

http://blog.itpub.net/69953737/viewspace-2741287/

https://pingcap.com/blog-cn/when-tidb-and-flink-are-combined/

https://blog.csdn.net/qq_30187071/article/details/110429771

0 人点赞