使用Docker,MQTT,Grafana,InfluxDB,Python和Arduino

2018-11-29 15:03:37 浏览数 (1)

我必须承认,这篇文章只是与Grafana和InfluxDB一起玩的借口。InfluxDB是一个很酷的数据库,专门用于处理时间序列数据。Grafana是一个用于时间序列分析的开源工具。我想构建一个简单的原型。这个想法是:

  • 一个Arduino设备(ESP32向Mosquitto服务器发出MQTT事件。我将使用电位计来模拟一个传感器。例如,想象一下温度传感器而不是电位计。这是我在以前的项目中使用过的电路 。
  • 一个Python脚本将在我的Raspberry Pi上侦听MQTT事件,并将值保存到InfluxDB数据库。
  • 我将用Grafana监控电位计给出的时间序列的状态。
  • 当10秒内的平均值高于阈值时,我将在Grafana中创建一个警报。这将在警报更改其状态时触发WebHook。
  • 一个Python Flask服务器,一个微服务,将监听WebHook并发出MQTT事件,具体取决于其状态。
  • 一个NodeMcu(一种Arduino设备)将监听此MQTT事件并激活LED。如果警报为ON,它将发出红色LED信号;如果警报为OFF,它将发出绿色LED信号。

服务器

正如我之前所说,我们需要三台服务器:

  • MQTT服务器(Mosquitto)
  • InfluxDB服务器
  • Grafana服务器

我们将使用Docker。Docker主机将在Raspberry Pi3上运行。Raspberry Pi是一个ARM设备,因此我们需要这个架构的Docker镜像。

代码语言:javascript复制
version: '2'
services:
  mosquitto:
    image: pascaldevink/rpi-mosquitto
    container_name: moquitto
    ports:
     - "9001:9001"
     - "1883:1883"
    restart: always
  influxdb:
    image: hypriot/rpi-influxdb
    container_name: influxdb
    restart: always
    environment:
     - INFLUXDB_INIT_PWD="password"
     - PRE_CREATE_DB="iot"
    ports:
     - "8083:8083"
     - "8086:8086"
    volumes:
     - ~/docker/rpi-influxdb/data:/data
  grafana:
    image: fg2it/grafana-armhf:v4.6.3
    container_name: grafana
    restart: always
    ports:
     - "3000:3000"
    volumes:
      - grafana-db:/var/lib/grafana
      - grafana-log:/var/log/grafana
      - grafana-conf:/etc/grafana
volumes:
  grafana-db:
    driver: local  
  grafana-log:
    driver: local
  grafana-conf:
    driver: local

ESP32

ESP32部分非常简单。我们只需要将电位器连接到Esp32。电位器有三个引脚:GND,信号和Vcc。对于Signal,我们将使用引脚32。

我们只需要配置我们的Wi-Fi网络,连接到我们的MQTT服务器,并在每个循环中发出电位计值。

代码语言:javascript复制
#include <PubSubClient.h> 
#include <WiFi.h>
const int potentiometerPin = 32;
// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";
// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/pot";
const char * clientName = "com.gonzalo123.esp32";
String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);
void wifiConnect() {
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    Serial.println("");
    Serial.print("WiFi connected.");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
}
void mqttReConnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        if (client.connect(clientName)) {
            Serial.println("connected");
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}
void mqttEmit(String topic, String value) {
    client.publish((char * ) topic.c_str(), (char * ) value.c_str());
}
void setup() {
    Serial.begin(115200);
    wifiConnect();
    client.setServer(server, 1883);
    delay(1500);
}
void loop() {
    if (!client.connected()) {
        mqttReConnect();
    }
    int current = (int)((analogRead(potentiometerPin) * 100) / 4095);
    mqttEmit(topic, (String) current);
    delay(500);
}

MQTT监听器

ESP32发出一个带有电位计值的事件(“/ pot”)。因此,我们将创建一个监听MQTT的MQTT监听器,并将该值持久保存到InfluxDB。

代码语言:javascript复制
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import datetime
import logging
def persists(msg):
    current_time = datetime.datetime.utcnow().isoformat()
    json_body = [
        {
            "measurement": "pot",
            "tags": {},
            "time": current_time,
            "fields": {
                "value": int(msg.payload)
            }
        }
    ]
    logging.info(json_body)
    influx_client.write_points(json_body)
logging.basicConfig(level=logging.INFO)
influx_client = InfluxDBClient('docker', 8086, database='iot')
client = mqtt.Client()
client.on_connect = lambda self, mosq, obj, rc: self.subscribe("/pot")
client.on_message = lambda client, userdata, msg: persists(msg)
client.connect("docker", 1883, 60)
client.loop_forever()

Grafana

在格拉法纳,我们需要做两件事。首先,我们将从InfluxDB服务器创建一个数据源。从这里开始,它非常简单。

接下来,我们将创建一个仪表板。我们只有一个时间序列在电位器的值内。我必须承认,我的仪表板上有许多我为了好玩而创造的东西。

那是我用来绘制主图的查询。

代码语言:javascript复制
SELECT 
  last("value") FROM "pot" 
WHERE 
  time >= now() - 5m 
GROUP BY 
  time($interval) fill(previous)

在这里,我们可以看到仪表板。

在这里,我们可以看到我的警报配置:

我还使用WebHook创建了一个通知通道。当警报状态发生变化时,Grafana将使用此WebHook进行通知。

WebHook Listener

Grafana将发出一个WebHook,因此我们需要一个REST端点来收集WebHook调用。我通常使用PHP / Lumen来创建REST服务器,但是,在这个项目中,我将使用Python和Flask。

我们需要处理HTTP Basic Auth并发出MQTT事件。MQTT是一个非常简单的协议,但它有一个非常好的功能,就像这里的手套一样。让我解释。

想象一下,我们已经启动并运行了系统,状态“正常”。现在,我们连接一个设备(例如一个大红/绿灯)。由于在我们连接灯之前“ok”事件被触发,我们的绿灯不会打开。如果我们想看到任何光线,我们需要等待“警报”事件。这不酷。

MQTT允许我们“保留”消息。这意味着我们可以使用“retain”标志向一个主题发出消息,并且当我们稍后将一个设备连接到此主题时,它将接收该消息。在这里,它正是我们所需要的。

代码语言:javascript复制
from flask import Flask
from flask import request
from flask_httpauth import HTTPBasicAuth
import paho.mqtt.client as mqtt
import json
client = mqtt.Client()
app = Flask(__name__)
auth = HTTPBasicAuth()
# http basic auth credentials
users = {
    "user": "password"
}
@auth.get_password
def get_pw(username):
    if username in users:
        return users.get(username)
    return None
@app.route('/alert', methods=['POST'])
@auth.login_required
def alert():
    client.connect("docker", 1883, 60)
    data = json.loads(request.data.decode('utf-8'))
    if data['state'] == 'alerting':
        client.publish(topic="/alert", payload="1", retain=True)
    elif data['state'] == 'ok':
        client.publish(topic="/alert", payload="0", retain=True)
    client.disconnect()
    return "ok"
if __name__ == "__main__":
    app.run(host='0.0.0.0')

NodeMcu

最后,NodeMcu。这部分类似于ESP32。我们的LED位于引脚4和5上。我们还需要配置Wi-Fi并连接到MQTT服务器。NodeMcu和ESP32是类似的设备,但不一样。例如,我们需要使用不同的库来连接到Wi-Fi。

该设备将监听MQTT事件并触发一个或另一个LED,具体取决于其状态。

代码语言:javascript复制
#include <PubSubClient.h>
#include <ESP8266WiFi.h>
const int ledRed = 4;
const int ledGreen = 5;
// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";
// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/alert";
const char * clientName = "com.gonzalo123.nodemcu";
int value;
int percent;
String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);
void wifiConnect() {
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    Serial.println("");
    Serial.print("WiFi connected.");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
}
void mqttReConnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        if (client.connect(clientName)) {
            Serial.println("connected");
            client.subscribe(topic);
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}
void callback(char * topic, byte * payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    String data;
    for (int i = 0; i < length; i  ) {
        data  = (char) payload[i];
    }
    cleanLeds();
    int value = data.toInt();
    switch (value) {
    case 1:
        digitalWrite(ledRed, HIGH);
        break;
    case 0:
        digitalWrite(ledGreen, HIGH);
        break;
    }
    Serial.print("] value:");
    Serial.println((int) value);
}
void cleanLeds() {
    digitalWrite(ledRed, LOW);
    digitalWrite(ledGreen, LOW);
}
void setup() {
    Serial.begin(9600);
    pinMode(ledRed, OUTPUT);
    pinMode(ledGreen, OUTPUT);
    cleanLeds();
    Serial.println("start");
    wifiConnect();
    client.setServer(server, 1883);
    client.setCallback(callback);
    delay(1500);
}
void loop() {
    Serial.print(".");
    if (!client.connected()) {
        mqttReConnect();
    }
    client.loop();
    delay(500);
}

原文标题《Playing With Docker, MQTT, Grafana, InfluxDB, Python, and Arduino》

作者:Gonzalo Ayuso

译者:February

不代表云加社区观点,更多详情请查看原文链接

0 人点赞