Elasticsearch service通过企业微信定期发送报告

2022-06-22 22:38:26 浏览数 (1)

在上一篇文章《如何选择Elastic Stack中的Alert和Watcher》中,我们介绍了Alert和Watcher的使用场景。也提到Watcher与Kibana Alert的一个重要不同是,Watcher也可以用来调度Elasticsearch的任务。其中一个常见的用途是调度报告的定时生成和发送电子邮件。当我们在使用Elasticsearch service作为数据引擎进行各种与数据有关的搜索和分析工作时,通常需要将数据汇总,做成各种可视化的仪表板,定期发送各种报告(比如,运营汇总报告,安全分析报告,服务异常报告等)。本文中,我们将介绍:

  • 在Kibana上,如何生成仪表板的PDF或PNG报告
  • 如何通过腾讯云的serverless函数服务:
    • 定期生成报告
    • 将报告发送到企业微信

生成仪表板的PDF或PNG报告

要自动生成 PDF 和 CSV 报告,需要生成一个 POST URL,然后使用 Watcher 或脚本提交 HTTP请求。在本文中,我们是通过腾讯云的serverless函数服务来执行脚本,提交HTTP请求

创建一个 POST URL

创建触发报告以生成 PDF 和 CSV 报告的 POST URL。

要为 PDF ,PNG报告创建 POST URL:

  1. 打开主菜单,然后单击DashboardVisualize LibraryCanvas
  2. 打开要作为报告查看 的仪表板、可视化或Canvas工作板。
  3. 从工具栏中,单击共享 > PDF 报告,然后选择一个选项:
    • 如果您使用的是DashboardVisulize Library,请单击Copy POST URL
    • 如果您使用的是Canvas,请单击高级选项 > 复制 POST URL

要为 CSV 报告创建 POST URL:

  1. 打开主菜单,然后单击Discover
  2. 打开您要共享的已保存搜索。
  3. 在工具栏中,单击共享 > CSV 报告 > 复制 POST URL

获取POST URL获取POST URL

当我们获取POST URL之后,每次访问该链接,均可触发一次生成报告的任务,并且将会记录于Elasticsearch当中。我们可通过Kibana上的管理界面查看任务的状态,以及下载对应的报告

报告任务界面报告任务界面

例如,我们可以通过以下脚本,进行任务触发

代码语言:javascript复制
curl 
-XPOST  
-u elastic  
-H 'kbn-xsrf: true'  
'http://0.0.0.0:5601/api/reporting/generate/csv?jobParams=...' 

需要注意,在通过HTTP请求触发时,对于配置了基础安全的集群,需要提供用户认证信息。

通过腾讯云的serverless函数服务定期生成和发送报告

Elasticsearch的Watcher功能提供了集群内的定期报告生成和发送功能。比如:

代码语言:javascript复制
PUT _watcher/watch/error_report
{
  "trigger" : {
    "schedule": {
      "interval": "1h"
    }
  },
  "actions" : {
    "email_admin" : { 
      "email": {
        "to": "'Recipient Name <recipient@example.com>'",
        "subject": "Error Monitoring Report",
        "attachments" : {
          "error_report.pdf" : {
            "reporting" : {
              "url": "http://0.0.0.0:5601/api/reporting/generate/printablePdf?jobParams=...", 
              "retries":40, 
              "interval":"15s", 
              "auth":{ 
                "basic":{
                  "username":"elastic",
                  "password":"changeme"
                }
              }
            }
          }
        }
      }
    }
  }
}

但Watcher的该功能仅能通过电子邮件发送报告。在不少场景,电子邮件的通达性不如企业微信等即时通信工具。

当我们选择企业微信作为发送报告的工具时,需要以脚本的方式触发报告的生成。该场景下,我们可以选择的方案很多,这里介绍的是通过腾讯云的云函数服务:


云函数(Serverless Cloud Function,SCF)是腾讯云为企业和开发者们提供的无服务器执行环境,帮助您在无需购买和管理服务器的情况下运行代码。您只需使用平台支持的语言编写核心代码并设置代码运行的条件,即可在腾讯云基础设施上弹性、安全地运行代码。SCF 是实时文件处理和数据处理等场景下理想的计算平台。

我们可以将示例代码部署到云函数中,并启用特定的触发器进行触发

代码语言:python代码运行次数:0复制
import hashlib

import requests
from requests.auth import HTTPBasicAuth
import base64
import json
import time

# Kibana的地址
host="https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601"
username=<your username>
password=<your password>
headers = {
    'kbn-xsrf': "true",
}
# 企微的webhook
qyapi_webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=<your key>"
# dashoard地址
dashboard_url="https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601/goto/5a40604be221c859894b218a7f38e6f3"
# 针对dashboard生成报告的POST URL
report_url = "https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601/api/reporting/generate/png?jobParams=..."

def generate_report(url):

    # 发送post url指令,生成报告
    response = requests.post(url, timeout=10, headers=headers, auth=HTTPBasicAuth(username, password))
    if response.ok:
        # 下载报告
        download_path = json.loads(response.text)['path']
        status = json.loads(response.text)['job']['status']
        if status == 'pending':
            time.sleep(60)

        response = requests.get(host download_path, auth=HTTPBasicAuth(username, password))

        if response.text == 'pending':
            requests.post(qyapi_webhook, data=json.dumps({"msgtype":"text", "text": {"content": "报告生成异常"}}))
            return
        # 将报告转码(based64   md5验证)
        content = json.dumps({
            "msgtype": "image",
            "image": {
                "base64": str(base64.b64encode(response.content), encoding='utf-8'),
                "md5": str(hashlib.md5(response.content).hexdigest())
            }})
        # 将报告发送到企业微信
        requests.post(qyapi_webhook, data=json.dumps({"msgtype": "markdown", "markdown": {"content": "报告已生成,具体可以查看[链接](" dashboard_url ")"}}))
        response = requests.post(qyapi_webhook, data=content)
        print(response.text)
        return

    requests.post(qyapi_webhook, data=json.dumps({"msgtype":"text", "text": {"content": "报告生成异常"}}))


def main_handler(event, context):
    generate_report(report_url)
if __name__ == '__main__':
    main_handler("", "")

注意:示例中设置了一个60秒的sleep,是因为根据仪表板加载数据所需时间的不同,报告需要1~2分钟的生产时间。为了保证报告生产任务完成之后再读取报告,需要设置一个等待时间。我们也可以把该脚本拆分成两个云函数,一个负责生产报告,一个负责下载报告,并通过企业微信发送。

示例:企微收到的报告示例:企微收到的报告

0 人点赞