Flux脚本语言入门教程

2023-11-08 11:06:13 浏览数 (2)

1、概述

Flux 是一种开源数据脚本语言,旨在查询、分析和处理数据。Flux支持多种数据源类型,包括:

  • 时间序列数据库(例如InfluxDB)
  • 关系型 SQL 数据库 (例如MySQL和PostgreSQL)
  • CSV

Flux 将用于查询、处理、编写和操作数据的代码统一为单一语法。该语言被设计为可用、可读、灵活、可组合、可测试、可贡献和可共享。

要从概念上理解 Flux 的工作原理,请考虑水处理过程。水从水源中抽取,受到需求的限制,通过一系列管道输送到不同的站点进行改造(去除沉积物、净化等),并以可消耗的状态输送。

2、语法基础

2.1、管道转发运算符

管道转发运算符:|> 将一个函数的输出作为输入发送到下一个函数。在“水处理比喻”中,管道转发运算符是通过管道传送水(或数据)的管道。

代码语言:javascript复制
data
  |> someFunction()
  |> anotherFunction()

2.2、简单的表达式

Flux 支持基本表达式。例如:

代码语言:javascript复制
1   1
// Returns 2

10 * 3
// Returns 30

(12.0   18.0) / (2.0 ^ 2.0)   (240.0 % 55.0)
// Returns 27.5

"John "   "Doe "   "is here!"
// Returns John Doe is here!

2.3、谓词表达式

谓词表达式使用“比较运算符、逻辑运算符”或两者来比较值,并计算为truefalse。例如:

代码语言:javascript复制
"John" == "John"
// Returns true

41 < 30
// Returns false

"John" == "John" and 41 < 30
// Returns false

"John" == "John" or 41 < 30
// Returns true

2.4、变量

使用赋值运算符:= 将表达式分配给变量 。使用变量的名称(标识符)返回其值:

代码语言:javascript复制
s = "foo" // string
i = 1 // integer
f = 2.0 // float (floating point number)

s // Returns foo
i // Returns 1
f // Returns 2.0

变量可以分配给任何Flux 数据类型。

2.5、数据类型

Flux 支持多种数据类型,分为以下几类:

  • 基本类型
  • 复合类型
  • 正则表达式类型
2.5.1、基本类型

Flux 的基本类型有:Boolean、Duration、String、Time、Float、Integer。

代码语言:javascript复制
// Boolean
true

// Duration
23h4m5s

// String
"foo"

// Time
2021-01-01T00:00:00Z

// Float
1.0

// Integer
1

以下基本类型没有文字语法,但可以通过其他方式创建:

  • Bytes
  • Unsigned integers
  • Nulls
2.5.2、复合类型

Flux 复合类型是从 Flux 基本类型构造的。分别有:Records、Arrays、Dictionaries、Functions

Records:是键值对的集合。每个键都是一个字符串。每个值可以是不同的数据类型,类似于JSON。例如:

代码语言:javascript复制
{name:"Jim", age: 42, "favorite color": "red"}

Arrays:这个就很好理解了,就是相同类型值的集合。例如:

代码语言:javascript复制
["foo","bar","baz","quz"]

Dictionaries:是具有相同类型的键相同类型的值的键值对的集合。例如:

代码语言:javascript复制
[1: "foo", 2: "bar"]

可用于用于dict.get()访问字典中的元素:

代码语言:javascript复制
import "dict"

d = [1: "foo", 2: "bar"]

dict.get(dict: d, key: "1", default: "")
// Returns foo

Functions:是使用一组参数来执行操作的代码块。函数可以是命名的匿名的。在括号 (()) 中定义参数,并使用箭头运算符 (=>) 将参数传递到运算中。例如:

代码语言:javascript复制
square = (n) => n * n

square(n:3)
// Returns 9

除此之外还有一个 Predicate functions:谓词函数使用谓词表达式来计算输入并返回truefalse。例如:

代码语言:javascript复制
examplePredicate = (v) => v == "foo"

examplePredicate(v: "foo")
// Returns true

examplePredicate(v: "bar")
// Returns false
2.5.3、正则表达式类型

正则表达式是用于计算字符串的正则表达式模式。在谓词表达式中或与regexp包一起使用正则表达式。

代码语言:javascript复制
regex = /^foo/

"foo" =~ regex
// Returns true

"bar" =~ regex
// Returns false

2.6、Packages

Flux 标准库被组织成包含函数和特定于包的选项的包。 默认情况下加载 Universe 包。 要加载其他包,请在 Flux 脚本的开头包含每个包的导入语句。

代码语言:javascript复制
import "array"
import "math"
import "influxdata/influxdb/sample"

2.7、基本语法尝试

通过阅读上述部分后,你现在可以开始在实际用例中应用这些基本原则,例如创建数据流变量、自定义函数等。

2.7.1、定义数据流变量

Flux 中变量赋值的一种常见用例是为一个或多个输入数据流创建变量。 以下示例是使用sample.data() 查询空气传感器的数据并将不同的数据分配给相应的变量中:

代码语言:javascript复制
import "influxdata/influxdb/sample"

data =
 sample.data(set: "airSensor")
                 |> range(start: -15m)
                 |> filter(fn: (r) => r._measurement == "airSensors")

temperature = data
                 |> filter(fn: (r) => r._field == "temperature")

humidity = data
              |> filter(fn: (r) => r._field == "humidity")

这里的含义是查询过去15分钟 “airSensor”测量下的温度数据和湿度数据,并分别赋于变量:temperature 和humidity 中

2.7.2、自定义函数

创建一个函数 topN,返回 N 列中具有最高值的 _value 。 将输入流 (<-) 和要返回的结果 (n) 传到自定义函数中。 使用 sort() 和 limit() 查找数据集中的前 n 个结果。

代码语言:javascript复制
topN = (tables=<-, n) =>
    tables
        |> sort(desc: true)
        |> limit(n: n)

使用上面定义的自定义函数topN和上面定义的humidity变量,返回每个table中的前三个数据点。

代码语言:javascript复制
humidity
    |> topN(n:3)

3、构建最基本的查询

大多数 Flux 查询都遵循相同的基本结构。这里主要介绍熟悉使用 Flux 查询数据时要使用的基本概念和步骤。

3.1、基本查询结构

大多数基本 Flux 查询都包含以下步骤:

  • Source:桶数据源
  • Filter:过滤函数,满足条件的行包含在函数输出中。不满足指定条件的行将被删除。Flux 过滤函数有:range()、filter();其中range()是根据时间过滤;filter() 是根据指定列的值过滤
  • Shape:许多查询需要修改数据结构。常见的数据调整包括 按列值按时间重新分组数据或将列值转换为行。其中包括:group()、window() 、_start、_stop、pivot()、drop()、keep()
  • Process:处理数据可以采取多种形式,包括:聚合数据、选择特定数据点、重写行、发送通知。

aggregateWindow()是一个辅助函数,可以塑造和处理数据。该函数按时间对数据进行开窗和分组,然后将聚合 或selector 函数应用于重构的表。

代码语言:javascript复制
from(bucket: "example-bucket")              // Source:查询example-bucket下的数据
    |> range(start: -1d)                    // Filter:筛选过去1天的数据
    |> filter(fn: (r) => r._field == "foo") // Filter:过滤字段值
    |> group(columns: ["sensorID"])         // Shape:根据sensorID进行分组
    |> mean()                               // Process:聚合函数

3.2、编写基本查询

经过上面的讲解和赘述,接下来我们试着写一个最基本的 Flux 查询来查询数据,按时间和列值过滤数据,然后应用聚合函数进行平均和分组。

1、导入influxdata/influxdb/sample包 并使用sample.data()函数 加载airSensor示例数据集。

代码语言:javascript复制
import "influxdata/influxdb/sample"

sample.data(set: "airSensor")

此处的 sample.data(set: "airSensor") 也可以替换为:from(bucket: "airSensor")

2、将返回的数据转发到管道 range() 中,按时间过滤数据。返回最近1小时的数据。

代码语言:javascript复制
import "influxdata/influxdb/sample"

sample.data(set: "airSensor")
    |> range(start: -1h)

3、用 filter() 根据列值过滤行。在此示例中,仅返回包含字段值co的行。

代码语言:javascript复制
import "influxdata/influxdb/sample"

sample.data(set: "airSensor")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "co")

4、用 mean() 计算数据的平均值。由于 InfluxDB 按系列对数据进行分组, mean()因此为每个唯一值返回一个sensor_id,其中包含单行以及该列中的平均值_value。

代码语言:javascript复制
import "influxdata/influxdb/sample"

sample.data(set: "airSensor")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "co")
    |> mean()

5、用 group() 进行分组:

代码语言:javascript复制
import "influxdata/influxdb/sample"

sample.data(set: "airSensor")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "co")
    |> mean()
    |> group()

这个查询的结果如下所示:

_field

_measurement

sensor_id

_value

co

airSensors

TLM0100

0.4233

co

airSensors

TLM0101

0.4312

co

airSensors

TLM0102

0.4211

4、结尾

本期内容我们探讨了Flux的基础知识,包括概念、基础语法和基本操作。结合《influxDB初识,一个高效的时序数据库》这篇文章基本能够掌握Flux ,如果你有任何问题或想要分享你的观点,请在下方评论区提出,下一期内容将使用springboot整合Flux,实现查询和插入数据。有关更多详细的知识也可以阅读官方文档,感谢你花时间阅读我的博客!

0 人点赞