我必须承认,这篇文章只是与Grafana和InfluxDB一起玩的借口。InfluxDB是一个很酷的数据库,专门用于处理时间序列数据。Grafana是一个用于时间序列分析的开源工具。我想构建一个简单的原型。这个想法是:
- 一个Arduino设备(ESP32向Mosquitto服务器发出MQTT事件。我将使用电位计来模拟一个传感器。例如,想象一下温度传感器而不是电位计。这是我在以前的项目中使用过的电路 。
- 一个Python脚本将在我的Raspberry Pi上侦听MQTT事件,并将值保存到InfluxDB数据库。
- 我将用Grafana监控电位计给出的时间序列的状态。
- 当10秒内的平均值高于阈值时,我将在Grafana中创建一个警报。这将在警报更改其状态时触发WebHook。
- 一个Python Flask服务器,一个微服务,将监听WebHook并发出MQTT事件,具体取决于其状态。
- 一个NodeMcu(一种Arduino设备)将监听此MQTT事件并激活LED。如果警报为ON,它将发出红色LED信号;如果警报为OFF,它将发出绿色LED信号。
服务器
正如我之前所说,我们需要三台服务器:
- MQTT服务器(Mosquitto)
- InfluxDB服务器
- Grafana服务器
我们将使用Docker。Docker主机将在Raspberry Pi3上运行。Raspberry Pi是一个ARM设备,因此我们需要这个架构的Docker镜像。
代码语言:javascript复制version: '2'
services:
mosquitto:
image: pascaldevink/rpi-mosquitto
container_name: moquitto
ports:
- "9001:9001"
- "1883:1883"
restart: always
influxdb:
image: hypriot/rpi-influxdb
container_name: influxdb
restart: always
environment:
- INFLUXDB_INIT_PWD="password"
- PRE_CREATE_DB="iot"
ports:
- "8083:8083"
- "8086:8086"
volumes:
- ~/docker/rpi-influxdb/data:/data
grafana:
image: fg2it/grafana-armhf:v4.6.3
container_name: grafana
restart: always
ports:
- "3000:3000"
volumes:
- grafana-db:/var/lib/grafana
- grafana-log:/var/log/grafana
- grafana-conf:/etc/grafana
volumes:
grafana-db:
driver: local
grafana-log:
driver: local
grafana-conf:
driver: local
ESP32
ESP32部分非常简单。我们只需要将电位器连接到Esp32。电位器有三个引脚:GND,信号和Vcc。对于Signal,我们将使用引脚32。
我们只需要配置我们的Wi-Fi网络,连接到我们的MQTT服务器,并在每个循环中发出电位计值。
代码语言:javascript复制#include <PubSubClient.h>
#include <WiFi.h>
const int potentiometerPin = 32;
// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";
// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/pot";
const char * clientName = "com.gonzalo123.esp32";
String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);
void wifiConnect() {
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.print("WiFi connected.");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
void mqttReConnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
if (client.connect(clientName)) {
Serial.println("connected");
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
void mqttEmit(String topic, String value) {
client.publish((char * ) topic.c_str(), (char * ) value.c_str());
}
void setup() {
Serial.begin(115200);
wifiConnect();
client.setServer(server, 1883);
delay(1500);
}
void loop() {
if (!client.connected()) {
mqttReConnect();
}
int current = (int)((analogRead(potentiometerPin) * 100) / 4095);
mqttEmit(topic, (String) current);
delay(500);
}
MQTT监听器
ESP32发出一个带有电位计值的事件(“/ pot”)。因此,我们将创建一个监听MQTT的MQTT监听器,并将该值持久保存到InfluxDB。
代码语言:javascript复制import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import datetime
import logging
def persists(msg):
current_time = datetime.datetime.utcnow().isoformat()
json_body = [
{
"measurement": "pot",
"tags": {},
"time": current_time,
"fields": {
"value": int(msg.payload)
}
}
]
logging.info(json_body)
influx_client.write_points(json_body)
logging.basicConfig(level=logging.INFO)
influx_client = InfluxDBClient('docker', 8086, database='iot')
client = mqtt.Client()
client.on_connect = lambda self, mosq, obj, rc: self.subscribe("/pot")
client.on_message = lambda client, userdata, msg: persists(msg)
client.connect("docker", 1883, 60)
client.loop_forever()
Grafana
在格拉法纳,我们需要做两件事。首先,我们将从InfluxDB服务器创建一个数据源。从这里开始,它非常简单。
接下来,我们将创建一个仪表板。我们只有一个时间序列在电位器的值内。我必须承认,我的仪表板上有许多我为了好玩而创造的东西。
那是我用来绘制主图的查询。
代码语言:javascript复制SELECT
last("value") FROM "pot"
WHERE
time >= now() - 5m
GROUP BY
time($interval) fill(previous)
在这里,我们可以看到仪表板。
在这里,我们可以看到我的警报配置:
我还使用WebHook创建了一个通知通道。当警报状态发生变化时,Grafana将使用此WebHook进行通知。
WebHook Listener
Grafana将发出一个WebHook,因此我们需要一个REST端点来收集WebHook调用。我通常使用PHP / Lumen来创建REST服务器,但是,在这个项目中,我将使用Python和Flask。
我们需要处理HTTP Basic Auth并发出MQTT事件。MQTT是一个非常简单的协议,但它有一个非常好的功能,就像这里的手套一样。让我解释。
想象一下,我们已经启动并运行了系统,状态“正常”。现在,我们连接一个设备(例如一个大红/绿灯)。由于在我们连接灯之前“ok”事件被触发,我们的绿灯不会打开。如果我们想看到任何光线,我们需要等待“警报”事件。这不酷。
MQTT允许我们“保留”消息。这意味着我们可以使用“retain”标志向一个主题发出消息,并且当我们稍后将一个设备连接到此主题时,它将接收该消息。在这里,它正是我们所需要的。
代码语言:javascript复制from flask import Flask
from flask import request
from flask_httpauth import HTTPBasicAuth
import paho.mqtt.client as mqtt
import json
client = mqtt.Client()
app = Flask(__name__)
auth = HTTPBasicAuth()
# http basic auth credentials
users = {
"user": "password"
}
@auth.get_password
def get_pw(username):
if username in users:
return users.get(username)
return None
@app.route('/alert', methods=['POST'])
@auth.login_required
def alert():
client.connect("docker", 1883, 60)
data = json.loads(request.data.decode('utf-8'))
if data['state'] == 'alerting':
client.publish(topic="/alert", payload="1", retain=True)
elif data['state'] == 'ok':
client.publish(topic="/alert", payload="0", retain=True)
client.disconnect()
return "ok"
if __name__ == "__main__":
app.run(host='0.0.0.0')
NodeMcu
最后,NodeMcu。这部分类似于ESP32。我们的LED位于引脚4和5上。我们还需要配置Wi-Fi并连接到MQTT服务器。NodeMcu和ESP32是类似的设备,但不一样。例如,我们需要使用不同的库来连接到Wi-Fi。
该设备将监听MQTT事件并触发一个或另一个LED,具体取决于其状态。
代码语言:javascript复制#include <PubSubClient.h>
#include <ESP8266WiFi.h>
const int ledRed = 4;
const int ledGreen = 5;
// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";
// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/alert";
const char * clientName = "com.gonzalo123.nodemcu";
int value;
int percent;
String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);
void wifiConnect() {
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.print("WiFi connected.");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
void mqttReConnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
if (client.connect(clientName)) {
Serial.println("connected");
client.subscribe(topic);
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
void callback(char * topic, byte * payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
String data;
for (int i = 0; i < length; i ) {
data = (char) payload[i];
}
cleanLeds();
int value = data.toInt();
switch (value) {
case 1:
digitalWrite(ledRed, HIGH);
break;
case 0:
digitalWrite(ledGreen, HIGH);
break;
}
Serial.print("] value:");
Serial.println((int) value);
}
void cleanLeds() {
digitalWrite(ledRed, LOW);
digitalWrite(ledGreen, LOW);
}
void setup() {
Serial.begin(9600);
pinMode(ledRed, OUTPUT);
pinMode(ledGreen, OUTPUT);
cleanLeds();
Serial.println("start");
wifiConnect();
client.setServer(server, 1883);
client.setCallback(callback);
delay(1500);
}
void loop() {
Serial.print(".");
if (!client.connected()) {
mqttReConnect();
}
client.loop();
delay(500);
}
原文标题《Playing With Docker, MQTT, Grafana, InfluxDB, Python, and Arduino》
作者:Gonzalo Ayuso
译者:February
不代表云加社区观点,更多详情请查看原文链接