1、概述
Flux 是一种开源数据脚本语言,旨在查询、分析和处理数据。Flux支持多种数据源类型,包括:
- 时间序列数据库(例如InfluxDB)
- 关系型 SQL 数据库 (例如MySQL和PostgreSQL)
- CSV
Flux 将用于查询、处理、编写和操作数据的代码统一为单一语法。该语言被设计为可用、可读、灵活、可组合、可测试、可贡献和可共享。
要从概念上理解 Flux 的工作原理,请考虑水处理过程。水从水源中抽取,受到需求的限制,通过一系列管道输送到不同的站点进行改造(去除沉积物、净化等),并以可消耗的状态输送。
2、语法基础
2.1、管道转发运算符
管道转发运算符:|> 将一个函数的输出作为输入发送到下一个函数。在“水处理比喻”中,管道转发运算符是通过管道传送水(或数据)的管道。
代码语言:javascript复制data
|> someFunction()
|> anotherFunction()
2.2、简单的表达式
Flux 支持基本表达式。例如:
代码语言:javascript复制1 1
// Returns 2
10 * 3
// Returns 30
(12.0 18.0) / (2.0 ^ 2.0) (240.0 % 55.0)
// Returns 27.5
"John " "Doe " "is here!"
// Returns John Doe is here!
2.3、谓词表达式
谓词表达式使用“比较运算符、逻辑运算符”或两者来比较值,并计算为true或false。例如:
代码语言:javascript复制"John" == "John"
// Returns true
41 < 30
// Returns false
"John" == "John" and 41 < 30
// Returns false
"John" == "John" or 41 < 30
// Returns true
2.4、变量
使用赋值运算符:= 将表达式分配给变量 。使用变量的名称(标识符)返回其值:
代码语言:javascript复制s = "foo" // string
i = 1 // integer
f = 2.0 // float (floating point number)
s // Returns foo
i // Returns 1
f // Returns 2.0
变量可以分配给任何Flux 数据类型。
2.5、数据类型
Flux 支持多种数据类型,分为以下几类:
- 基本类型
- 复合类型
- 正则表达式类型
2.5.1、基本类型
Flux 的基本类型有:Boolean、Duration、String、Time、Float、Integer。
代码语言:javascript复制// Boolean
true
// Duration
23h4m5s
// String
"foo"
// Time
2021-01-01T00:00:00Z
// Float
1.0
// Integer
1
以下基本类型没有文字语法,但可以通过其他方式创建:
- Bytes
- Unsigned integers
- Nulls
2.5.2、复合类型
Flux 复合类型是从 Flux 基本类型构造的。分别有:Records、Arrays、Dictionaries、Functions
Records:是键值对的集合。每个键都是一个字符串。每个值可以是不同的数据类型,类似于JSON。例如:
代码语言:javascript复制{name:"Jim", age: 42, "favorite color": "red"}
Arrays:这个就很好理解了,就是相同类型值的集合。例如:
代码语言:javascript复制["foo","bar","baz","quz"]
Dictionaries:是具有相同类型的键和相同类型的值的键值对的集合。例如:
代码语言:javascript复制[1: "foo", 2: "bar"]
可用于用于dict.get()访问字典中的元素:
代码语言:javascript复制import "dict"
d = [1: "foo", 2: "bar"]
dict.get(dict: d, key: "1", default: "")
// Returns foo
Functions:是使用一组参数来执行操作的代码块。函数可以是命名的或匿名的。在括号 (()) 中定义参数,并使用箭头运算符 (=>) 将参数传递到运算中。例如:
代码语言:javascript复制square = (n) => n * n
square(n:3)
// Returns 9
除此之外还有一个 Predicate functions:谓词函数使用谓词表达式来计算输入并返回true 或 false。例如:
代码语言:javascript复制examplePredicate = (v) => v == "foo"
examplePredicate(v: "foo")
// Returns true
examplePredicate(v: "bar")
// Returns false
2.5.3、正则表达式类型
正则表达式是用于计算字符串的正则表达式模式。在谓词表达式中或与regexp包一起使用正则表达式。
代码语言:javascript复制regex = /^foo/
"foo" =~ regex
// Returns true
"bar" =~ regex
// Returns false
2.6、Packages
Flux 标准库被组织成包含函数和特定于包的选项的包。 默认情况下加载 Universe 包。 要加载其他包,请在 Flux 脚本的开头包含每个包的导入语句。
代码语言:javascript复制import "array"
import "math"
import "influxdata/influxdb/sample"
2.7、基本语法尝试
通过阅读上述部分后,你现在可以开始在实际用例中应用这些基本原则,例如创建数据流变量、自定义函数等。
2.7.1、定义数据流变量
Flux 中变量赋值的一种常见用例是为一个或多个输入数据流创建变量。 以下示例是使用sample.data() 查询空气传感器的数据并将不同的数据分配给相应的变量中:
代码语言:javascript复制import "influxdata/influxdb/sample"
data =
sample.data(set: "airSensor")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "airSensors")
temperature = data
|> filter(fn: (r) => r._field == "temperature")
humidity = data
|> filter(fn: (r) => r._field == "humidity")
这里的含义是查询过去15分钟 “airSensor”测量下的温度数据和湿度数据,并分别赋于变量:temperature 和humidity 中
2.7.2、自定义函数
创建一个函数 topN,返回 N 列中具有最高值的 _value 。 将输入流 (<-) 和要返回的结果 (n) 传到自定义函数中。 使用 sort() 和 limit() 查找数据集中的前 n 个结果。
代码语言:javascript复制topN = (tables=<-, n) =>
tables
|> sort(desc: true)
|> limit(n: n)
使用上面定义的自定义函数topN和上面定义的humidity变量,返回每个table中的前三个数据点。
代码语言:javascript复制humidity
|> topN(n:3)
3、构建最基本的查询
大多数 Flux 查询都遵循相同的基本结构。这里主要介绍熟悉使用 Flux 查询数据时要使用的基本概念和步骤。
3.1、基本查询结构
大多数基本 Flux 查询都包含以下步骤:
- Source:桶数据源
- Filter:过滤函数,满足条件的行包含在函数输出中。不满足指定条件的行将被删除。Flux 过滤函数有:range()、filter();其中range()是根据时间过滤;filter() 是根据指定列的值过滤
- Shape:许多查询需要修改数据结构。常见的数据调整包括 按列值或按时间重新分组数据或将列值转换为行。其中包括:group()、window() 、_start、_stop、pivot()、drop()、keep()
- Process:处理数据可以采取多种形式,包括:聚合数据、选择特定数据点、重写行、发送通知。
代码语言:javascript复制aggregateWindow()是一个辅助函数,可以塑造和处理数据。该函数按时间对数据进行开窗和分组,然后将聚合 或selector 函数应用于重构的表。
from(bucket: "example-bucket") // Source:查询example-bucket下的数据
|> range(start: -1d) // Filter:筛选过去1天的数据
|> filter(fn: (r) => r._field == "foo") // Filter:过滤字段值
|> group(columns: ["sensorID"]) // Shape:根据sensorID进行分组
|> mean() // Process:聚合函数
3.2、编写基本查询
经过上面的讲解和赘述,接下来我们试着写一个最基本的 Flux 查询来查询数据,按时间和列值过滤数据,然后应用聚合函数进行平均和分组。
1、导入influxdata/influxdb/sample包 并使用sample.data()函数 加载airSensor示例数据集。
代码语言:javascript复制import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
此处的 sample.data(set: "airSensor") 也可以替换为:from(bucket: "airSensor")
2、将返回的数据转发到管道 range() 中,按时间过滤数据。返回最近1小时的数据。
代码语言:javascript复制import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> range(start: -1h)
3、用 filter() 根据列值过滤行。在此示例中,仅返回包含字段值co的行。
代码语言:javascript复制import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "co")
4、用 mean() 计算数据的平均值。由于 InfluxDB 按系列对数据进行分组, mean()因此为每个唯一值返回一个sensor_id,其中包含单行以及该列中的平均值_value。
代码语言:javascript复制import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "co")
|> mean()
5、用 group() 进行分组:
代码语言:javascript复制import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "co")
|> mean()
|> group()
这个查询的结果如下所示:
_field | _measurement | sensor_id | _value |
---|---|---|---|
co | airSensors | TLM0100 | 0.4233 |
co | airSensors | TLM0101 | 0.4312 |
co | airSensors | TLM0102 | 0.4211 |
4、结尾
本期内容我们探讨了Flux的基础知识,包括概念、基础语法和基本操作。结合《influxDB初识,一个高效的时序数据库》这篇文章基本能够掌握Flux ,如果你有任何问题或想要分享你的观点,请在下方评论区提出,下一期内容将使用springboot整合Flux,实现查询和插入数据。有关更多详细的知识也可以阅读官方文档,感谢你花时间阅读我的博客!