物联网时代的答案 - Apache IoTDB

2021-03-14 20:12:09 浏览数 (1)

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

Apache 软件基金会于北京时间2020年9月23日宣布Apache IoTDB毕业成为 Apache 顶级项目。

截止目前,Github上的Star数只有1.2k,行动起来吧,朋友们!

前言

根据官网的介绍:

Apache IoTDB(物联网数据库)是一体化收集、存储、管理与分析物联网时序数据的软件系统。Apache IoTDB 采用轻量式架构,具有高性能和丰富的功能,并与Apache Hadoop、Spark和Flink等进行了深度集成,可以满足工业物联网领域的海量数据存储、高速数据读取和复杂数据分析需求。

至此我们了解到Apache IoTDB(物联网数据库)是为时间序列数据设计的集成数据管理引擎。它为用户提供

  • 提供数据收集,存储和分析一体化服务
  • 体量轻、性能高、易使用,完美对接Hadoop与Spark生态
  • 可以满足大规模数据存储,高速数据提取和复杂数据分析,因此适用于工业物联网应用中海量时间序列数据高速写入和复杂分析查询的需求

Apache IoTDB最初由清华大学软件学院团队开发。Apache 软件基金会于北京时间 2020 年9月23日宣布 Apache IoTDB 毕业成为 Apache 顶级项目!

在高端制造中,有许多设备配备了传感器,用于收集工作状态数据。气象站,风力涡轮机是常见的高端设备。这些设备可以运行TsFile在本地存储数据。这样,TsFile可以提供具有高吞吐量,高压缩率和毫秒查询的数据保存能力。与TsFile-Sync工具一起,TsFiles可以传输到数据中心。

除了IoTDB引擎,IoTDB还开发了一些组件来提供更好的IoT服务。以下将所有组件称为IoTDB套件,而IoTDB专门指IoTDB引擎。IoTDB套件可以在实际情况下提供一系列功能,例如数据收集,数据写入,数据存储,数据查询,数据可视化和数据分析。下图显示了IoTDB套件的所有组件带来的整体应用程序架构。

如上图所示,用户可以使用JDBC将设备上的传感器收集的时间序列数据导入本地/远程IoTDB。这些时间序列数据可以是系统状态数据(例如服务器负载和CPU内存等),消息队列数据,来自应用程序的时间序列数据或数据库中的其他时间序列数据。用户还可以将数据直接写入TsFile(本地或HDFS上)。对于写入IoTDB和本地TsFile的数据,用户可以使用TsFileSync工具将TsFile同步到HDFS,从而在Hadoop或Spark数据处理平台上执行数据处理任务,例如异常检测和机器学习,分析结果可以用相同的方式写回TsFile。此外,IoTDB和TsFile提供客户端工具,以满足用户以SQL形式,脚本形式和图形形式编写和查看数据的各种需求。

Apache IoTDB主要特点

高通量读写

Apache IoTDB可以支持数百万个低功耗和智能联网设备的高速写访问。它还提供高速读取访问以检索数据。

高效的目录结构

Apache IoTDB可以使用模糊搜索策略针对时间序列数据的复杂目录有效地组织来自IoT设备和大量时间序列数据的复杂数据结构。

丰富的查询语义

Apache IoTDB可以支持跨设备和传感器的时间序列数据的时间对齐,时间序列字段中的计算以及时间维度上的丰富聚合功能。

硬件成本低

Apache IoTDB可以达到磁盘存储的高压缩率(将1GB数据存储在硬盘上的成本不到0.23美元)。

灵活的部署

Apache IoTDB可以为用户提供云上的一键式安装工具,桌面上的终端工具以及云平台与本地计算机之间的桥接工具(数据同步工具)。

与开源生态系统的紧密集成

Apache IoTDB可以支持分析生态系统,例如Hadoop,Spark和Grafana作为可视化工具。

Apache IoTDB整体架构

IoTDB套件由若干个组件构成,共同形成“数据收集-数据写入-数据存储-数据查询-数据可视化-数据分析”等一系列功能。下图展示了使用IoTDB套件全部组件后形成的整体应用架构。所有组件形成IoTDB套件,而IoTDB特指其中的时间序列数据库组件。

用户可以通过JDBC将来自设备上传感器采集的时序数据、服务器负载和CPU内存等系统状态数据、消息队列中的时序数据、应用程序的时序数据或者其他数据库中的时序数据导入到本地或者远程的IoTDB中。用户还可以将上述数据直接写成本地(或位于HDFS上)的TsFile文件。

  • 对于写入到IoTDB的数据以及本地的TsFile文件,可以通过同步工具TsFileSync将数据文件同步到HDFS上,进而实现在Hadoop或Spark的数据处理平台上的诸如异常检测、机器学习等数据处理任务。
  • 对于写入到HDFS或者本地的TsFile文件,可以利用TsFile-Hadoop或TsFile-Spark连接器允许Hadoop或Spark进行数据处理。
  • 对于分析的结果,可以写回成TsFile文件。

IoTDB和TsFile还提供了相应的客户端工具,满足用户查看和写入数据的SQL形式、脚本形式和图形化形式等多种需求。

牛刀初试

我们用官网的一个例子来介绍一下 Apache IoTDB 的数据组织结构和模型。并通过客户端进行简单的SQL操作。

假设我们有如下的数据层级:

其层级关系为:集团层-电场层-设备层-传感器层。其中ROOT为根节点,传感器层的每一个节点称为叶子节点。在使用IoTDB的过程中,我们可以直接将由ROOT节点到每一个叶子节点路径上的属性用"."连接,将其作为一个IoTDB的时间序列的名称。图中最左侧的路径可以生成一个名为ROOT.ln.wf01.wt01.status的时间序列。

得到时间序列的名称之后,我们需要根据数据的实际场景和规模设置存储组。由于在本文所述场景中,每次到达的数据通常以集团为单位(即数据可能为跨电场、跨设备的),为了写入数据时避免频繁切换IO降低系统速度,且满足用户以集团为单位进行物理隔离数据的要求,我们将存储组设置在集团层。

根据模型结构,IoTDB中涉及如下基本概念:

  • 设备 设备指的是在实际场景中拥有传感器的装置。在IoTDB当中,所有的传感器都应有其对应的归属的设备。
  • 传感器 传感器是指在实际场景中的一种检测装置,它能感受到被测量的信息,并能将感受到的信息按一定规律变换成为电信号或其他所需形式的信息输出并发送给IoTDB。在IoTDB当中,存储的所有的数据及路径,都是以传感器为单位进行组织。
  • 存储组 用户可以将任意前缀路径设置成存储组。如有4条时间序列root.vehicle.d1.s1, root.vehicle.d1.s2, root.vehicle.d2.s1, root.vehicle.d2.s2,路径root.vehicle下的两个设备d1,d2可能属于同一个业主,或者同一个厂商,因此关系紧密。这时候就可以将前缀路径root.vehicle指定为一个存储组,这将使得IoTDB将其下的所有设备的数据存储在同一个文件夹下。未来root.vehicle下增加了新的设备,也将属于该存储组。

注意:不允许将一个完整路径(如上例的root.vehicle.d1.s1)设置成存储组。

设置合理数量的存储组可以带来性能的提升:既不会因为产生过多的存储文件(夹)导致频繁切换IO降低系统速度(并且会占用大量内存且出现频繁的内存-文件切换),也不会因为过少的存储文件夹(降低了并发度从而)导致写入命令阻塞。

当然还有例如路径、前缀路径等一些其他的概念,大家可以参考官网的文档。

接下来我们安装IoTDB,安装前需要保证设备上配有JDK>=1.8的运行环境,并配置好JAVA_HOME环境变量。IoTDB支持多种安装途径。用户可以使用三种方式对IoTDB进行安装——下载二进制可运行程序、使用源码、使用docker镜像。

我们可以在这里下载安装文件:http://iotdb.incubator.apache.org/Download/

用户可以使用sbin文件夹下的start-server脚本启动IoTDB。Linux系统与MacOS系统启动命令如下:

代码语言:javascript复制
> nohup sbin/start-server.sh >/dev/null 2>&1 &
or
> nohup sbin/start-server.sh -c <conf_path> -rpc_port <rpc_port> >/dev/null 2>&1 &

启动后出现如图提示即为启动成功。

代码语言:javascript复制
 _____       _________  ______   ______
|_   _|     |  _   _  ||_   _ `.|_   _ 
  | |   .--.|_/ | | _|  | | `.  | |_) |
  | | / .'`   | |      | |  | | |  __'.
 _| |_| __. | _| |_    _| |_.' /_| |__) |
|_____|'.__.' |_____|  |______.'|_______/  version x.x.x


IoTDB> login successfully
IoTDB>

在这里,我们首先介绍一下使用Cli工具创建时间序列、插入数据并查看数据的方法。

首先使用SET STORAGE GROUP语句定义存储组。SQL语句如下:

IoTDB> SET STORAGE GROUP TO root.ln

我们可以使用SHOW STORAGE GROUP语句来查看系统当前所有的存储组,SQL语句如下:

IoTDB> SHOW STORAGE GROUP

执行结果为:

代码语言:javascript复制
 ----------------------------------- 
|                      Storage Group|
 ----------------------------------- 
|                            root.ln|
 ----------------------------------- 
storage group number = 1

存储组设定后,使用CREATE TIMESERIES语句可以创建新的时间序列,创建时间序列时需要定义数据的类型和编码方式。此处我们创建两个时间序列,SQL语句如下:

代码语言:javascript复制
IoTDB> CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN
IoTDB> CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=RLE

为了查看指定的时间序列,我们可以使用SHOW TIMESERIES < Path>语句,其中< Path>表示时间序列对应的路径,默认值为空,表示查看系统中所有的时间序列。下面是两个例子:

使用SHOW TIMESERIES语句查看系统中存在的所有时间序列,SQL语句如下:

代码语言:javascript复制
IoTDB> SHOW TIMESERIES

执行结果为:

代码语言:javascript复制
 ------------------------------- --------------- -------- -------- 
|                     Timeseries|  Storage Group|DataType|Encoding|
 ------------------------------- --------------- -------- -------- 
|       root.ln.wf01.wt01.status|        root.ln| BOOLEAN|   PLAIN|
|  root.ln.wf01.wt01.temperature|        root.ln|   FLOAT|     RLE|
 ------------------------------- --------------- -------- -------- 
Total timeseries number = 2

查看具体的时间序列root.ln.wf01.wt01.status的SQL语句如下:

代码语言:javascript复制
IoTDB> SHOW TIMESERIES root.ln.wf01.wt01.status

执行结果为:

代码语言:javascript复制
 ------------------------------ -------------- -------- -------- 
|                    Timeseries| Storage Group|DataType|Encoding|
 ------------------------------ -------------- -------- -------- 
|      root.ln.wf01.wt01.status|       root.ln| BOOLEAN|   PLAIN|
 ------------------------------ -------------- -------- -------- 
Total timeseries number = 1

接下来,我们使用INSERT语句向root.ln.wf01.wt01.status时间序列中插入数据,在插入数据时需要首先指定时间戳和路径后缀名称:

代码语言:javascript复制
IoTDB> INSERT INTO root.ln.wf01.wt01(timestamp,status) values(100,true);

我们也可以向多个时间序列中同时插入数据,这些时间序列同属于一个时间戳:

代码语言:javascript复制
IoTDB> INSERT INTO root.ln.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)

最后,我们查询之前插入的数据。使用SELECT语句我们可以查询指定的时间序列的数据结果,SQL语句如下:

代码语言:javascript复制
IoTDB> SELECT status FROM root.ln.wf01.wt01

查询结果如下:

代码语言:javascript复制
 ----------------------- ------------------------ 
|                   Time|root.ln.wf01.wt01.status|
 ----------------------- ------------------------ 
|1970-01-01T08:00:00.100|                    true|
|1970-01-01T08:00:00.200|                   false|
 ----------------------- ------------------------ 
Total line number = 2

我们也可以查询多个时间序列的数据结果,SQL语句如下:

代码语言:javascript复制
IoTDB> SELECT * FROM root.ln.wf01.wt01

查询结果如下:

代码语言:javascript复制
 ----------------------- -------------------------- ----------------------------- 
|                   Time|  root.ln.wf01.wt01.status|root.ln.wf01.wt01.temperature|
 ----------------------- -------------------------- ----------------------------- 
|1970-01-01T08:00:00.100|                      true|                         null|
|1970-01-01T08:00:00.200|                     false|                        20.71|
 ----------------------- -------------------------- ----------------------------- 
Total line number = 2

输入quit或exit可退出Cli结束本次会话。

代码语言:javascript复制
IoTDB> quit

代码语言:javascript复制
IoTDB> exit

用户可以使用$IOTDB_HOME/sbin文件夹下的stop-server脚本停止IoTDB。

Linux系统与MacOS系统停止命令如下:

$sbin/stop-server.sh

使用JDBC连接IoTDB

对于我们开发者来讲,可以在 MAVEN 中使用 IoTDB JDBC:

代码语言:javascript复制
<dependencies>
    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>iotdb-jdbc</artifactId>
      <version>0.10.1</version>
    </dependency>
</dependencies>

示例如下:

代码语言:javascript复制
import java.sql.*;
import org.apache.iotdb.jdbc.IoTDBSQLException;

public class JDBCExample {
  /**
   * Before executing a SQL statement with a Statement object, you need to create a Statement object using the createStatement() method of the Connection object.
   * After creating a Statement object, you can use its execute() method to execute a SQL statement
   * Finally, remember to close the 'statement' and 'connection' objects by using their close() method
   * For statements with query results, we can use the getResultSet() method of the Statement object to get the result set.
   */
  public static void main(String[] args) throws SQLException {
    Connection connection = getConnection();
    if (connection == null) {
      System.out.println("get connection defeat");
      return;
    }
    Statement statement = connection.createStatement();
    //Create storage group
    try {
      statement.execute("SET STORAGE GROUP TO root.demo");
    }catch (IoTDBSQLException e){
      System.out.println(e.getMessage());
    }


    //Show storage group
    statement.execute("SHOW STORAGE GROUP");
    outputResult(statement.getResultSet());

    //Create time series
    //Different data type has different encoding methods. Here use INT32 as an example
    try {
      statement.execute("CREATE TIMESERIES root.demo.s0 WITH DATATYPE=INT32,ENCODING=RLE;");
    }catch (IoTDBSQLException e){
      System.out.println(e.getMessage());
    }
    //Show time series
    statement.execute("SHOW TIMESERIES root.demo");
    outputResult(statement.getResultSet());
    //Show devices
    statement.execute("SHOW DEVICES");
    outputResult(statement.getResultSet());
    //Count time series
    statement.execute("COUNT TIMESERIES root");
    outputResult(statement.getResultSet());
    //Count nodes at the given level
    statement.execute("COUNT NODES root LEVEL=3");
    outputResult(statement.getResultSet());
    //Count timeseries group by each node at the given level
    statement.execute("COUNT TIMESERIES root GROUP BY LEVEL=3");
    outputResult(statement.getResultSet());


    //Execute insert statements in batch
    statement.addBatch("insert into root.demo(timestamp,s0) values(1,1);");
    statement.addBatch("insert into root.demo(timestamp,s0) values(1,1);");
    statement.addBatch("insert into root.demo(timestamp,s0) values(2,15);");
    statement.addBatch("insert into root.demo(timestamp,s0) values(2,17);");
    statement.addBatch("insert into root.demo(timestamp,s0) values(4,12);");
    statement.executeBatch();
    statement.clearBatch();

    //Full query statement
    String sql = "select * from root.demo";
    ResultSet resultSet = statement.executeQuery(sql);
    System.out.println("sql: "   sql);
    outputResult(resultSet);

    //Exact query statement
    sql = "select s0 from root.demo where time = 4;";
    resultSet= statement.executeQuery(sql);
    System.out.println("sql: "   sql);
    outputResult(resultSet);

    //Time range query
    sql = "select s0 from root.demo where time >= 2 and time < 5;";
    resultSet = statement.executeQuery(sql);
    System.out.println("sql: "   sql);
    outputResult(resultSet);

    //Aggregate query
    sql = "select count(s0) from root.demo;";
    resultSet = statement.executeQuery(sql);
    System.out.println("sql: "   sql);
    outputResult(resultSet);

    //Delete time series
    statement.execute("delete timeseries root.demo.s0");

    //close connection
    statement.close();
    connection.close();
  }

  public static Connection getConnection() {
    // JDBC driver name and database URL
    String driver = "org.apache.iotdb.jdbc.IoTDBDriver";
    String url = "jdbc:iotdb://127.0.0.1:6667/";

    // Database credentials
    String username = "root";
    String password = "root";

    Connection connection = null;
    try {
      Class.forName(driver);
      connection = DriverManager.getConnection(url, username, password);
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
    } catch (SQLException e) {
      e.printStackTrace();
    }
    return connection;
  }

  /**
   * This is an example of outputting the results in the ResultSet
   */
  private static void outputResult(ResultSet resultSet) throws SQLException {
    if (resultSet != null) {
      System.out.println("--------------------------");
      final ResultSetMetaData metaData = resultSet.getMetaData();
      final int columnCount = metaData.getColumnCount();
      for (int i = 0; i < columnCount; i  ) {
        System.out.print(metaData.getColumnLabel(i   1)   " ");
      }
      System.out.println();
      while (resultSet.next()) {
        for (int i = 1; ; i  ) {
          System.out.print(resultSet.getString(i));
          if (i < columnCount) {
            System.out.print(", ");
          } else {
            System.out.println();
            break;
          }
        }
      }
      System.out.println("--------------------------n");
    }
  }
}

至此为止,我们一次完整的使用IoTDB之旅就结束了。后面的文章中,我还会为大家持续带来更多更深入的实际案例应用。

0 人点赞