摘要
本專題作品開發一套基於ELK的日誌分析服務,提供多種日誌類型解析及對應的可視化功能。目前提供的日誌類型分別為系統日誌(syslog)、網功能變數名稱稱系統日誌(dnslog)及網頁伺服器日誌(nginxlog),可視化功能包含圖表呈現和文字說明等。
本服務運用ELK(Elasticsearch、Logstash、Kibana)作為主要分析系統。Elasticsearch主要有搜集、分析、儲存等功能,可以對整個資料庫進行檢索,並在本專題中作為存儲日誌的主要系統。Kibana可以把從Elasticsearch中獲取的大量日誌數據進行可視化解析,便於數據分析。Logstash可以對日誌數據進行採集和解讀。
根據使用者傳輸日誌類型的不同在logstash中進行分析,使用不同的正規表達式對其進行欄位劃分,經實際測試本系統可以識別大多數使用通用格式的日誌。可視化部分便是通過劃分出來的欄位進行呈現。倘若使用者發送的日誌中包括錯誤日誌,系統會收集這些日誌並將其發送到使用者預設的郵箱進行報警提示。
在用戶註冊成功時,前端系統會自動分配端口及登錄kibana的賬號並在登錄之後呈現在用戶介面。用戶把將要進行分析的日誌通過udp的方法傳輸至對應端口,便可以登錄kibana查看實時可視化資訊。用戶可以將多台主機上多種類型的日誌發送至其專屬端口,並且也可以根據個人意願在kibana中對可視化界面進行自定義編輯。當用戶終止服務之後,php會自動刪除該用戶資料並將其端口和kibana賬號分配給下一個用戶來使用。
目錄
第一章 概述 1
1.1動機 1
1.2專題目標 2
1.3專題的專案管理與成本分析 3
第二章 相關技術 5
2.1 rsyslog 5
2.2 Filebeat 5
2.3 GeoIP 5
2.4 DNS 6
2.5 Nginx 6
第三章 系統簡介 6
3.1 網頁端 7
3.2 資料庫端 7
3.3 ELK端 8
3.3.1 Elasticsearch 8
3.3.2 Logstash 8
3.3.3 Kibana 9
第四章 系統實作 9
4.1網頁端 9
4.1.1 登錄與注冊界面 9
4.1.2 主界面 10
4.1.3 注銷用戶功能 12
4.2 資料庫端 13
4.3 伺服器端 16
4.3.1 Elasticsearch 16
4.3.2 Logstash 18
4.3.3 Kibana 20
4.3.4 郵件報警腳本 21
第五章 結論 24
5.1 結論 24
5.2未來發展 25
參考文獻 26
概述
1.1動機
隨著隨著伺服器、IoT 和應用程式等在互聯網社會中快速發展,這也導致由機器生成的數據出現爆炸式成長,其中包括日誌。日誌本身是沒有價值的,只有對日誌進行分析加以利用的時候才會有價值,日誌中包含非常多的有用的資訊,不光包括運維層面,還包括業務層面,安全層面。而日誌分析最主要的作用體現在不僅可以更容易的監管系統所發生的錯誤從而優化系統效能,而且在安全領域,通過日誌可以反映出很多的安全攻擊行為,比如,用戶登錄錯誤的記錄,異常訪問等。日誌還能夠告訴你很多關於網路中所發生事件的資訊,包括性能資訊、故障檢測和入侵檢測。這樣的日誌收集過程涉及到對由您的 IT 系統和技術基礎設施生成的機器數據進行搜索、分析與可視化,以便獲得運維方面的見解。但複雜的內容使用傳統的採集手段已經遠遠不能滿足這樣的大規模採集
目前大數據的處理趨勢主要是即時性,同時,硬體成本降低,在這種情況下出現了許多日誌分析平臺,其中以ELK(分佈式搜索引擎Elasticsearch、日誌搜集過濾工具Logstash、數據可視化分析平臺Kibana)以它強大的功能脫穎而出。它可以從龐大的日誌資訊數據中準確及時篩選出相關的資訊,對資訊數據進行監控和維護,對日誌可以實行統計分析.ELK不僅提高了運維人員的工作效率和品質,還能即時地監控日誌數據運行狀態,並回饋處理。但想要使用ELK的服務,需要先部署ELK的各個組件,而部署一項長期運行的服務並非易事,這無形中拉高了ELK的使用門檻。因此,我們想通過提前部署的方式降低ELK的使用門檻,讓用戶能夠通過我們的服務,簡單快捷地享受ELK的強大功能
1.2專題目標
本專題製作日誌分析服務,主要是面向中小型企業,伺服器管理員,運維人員,資安新人等等擁有伺服器並有分析需求之人員,並只需要人員了解如何發送日誌到其專屬端口便可以實現實時分析可視化。為此本專題設計用戶可以將多台主機上多種類型的日誌發送至其專屬端口,並且也可以根據個人意願在可視化界面進行自定義編輯,讓使用者可以根據想要獲取的需求進行調整。另外,考慮到會有錯誤日誌傳輸進服務,該系統會將對每條日誌進行錯誤判斷並定時通過用戶預設郵箱回報用戶。
本专题的可視化處理可實現通過點選時間及劃分欄位查找出指定的日誌信息與顯示每種日誌類型傳輸總量。而在分析syslog方面主要的功能是可以將每條日誌的priority value篩選出來並且進行提示,方便使用者查看syslog的狀態與優先級。在dnslog可以將不同用戶在不同時段訪問dns服務器繪製成圖表,並且可以將用戶ip所在地通過geoip繪製成地圖呈現。Nginxlog則是可以在圖表中持續監控流量以及使用人數等數據。
在郵件報警功能方面則是主要在接收日誌是想進行判斷是否為錯誤信息,如果是便收集信息到用戶的專屬文件中,讓後通過python腳本讀取數據庫獲取該日誌對應用戶的默認郵箱並將其發送。因為擔心文件過大郵件無法發送的問題,在成功發送給用戶後便會將其刪除。上述功能會在沒五分鐘進行一次,以防過度郵件轟炸用戶被郵箱發送到垃圾箱。
1.3專題的專案管理與成本分析
本專題製作從 2022 年 7 月開始至 2022 年 12月結束,共歷時6個月(參見表 1-1)。本專題工作人員共2人,工作量累計共花費 6 個人月 (參見表1-2),工作分配如表 1.3 所示。
相關技術
本專題在相關技術部分會介紹在用戶發送日誌的方法rsyslog,日誌接收工具Filebeats,獲取IP地理位置之插件GeoIP,DNS及nginx的基本介紹
2.1 rsyslog
rsyslog是linux系統中用來實現日誌功能的服務。一般情況下是默認安裝並自動啓用的。讓rsyslog的主要功能是用來采集日誌,它支持將日誌輸出到各種數據庫,如MySQL、MongoDB、ElasticSearch等。本專題使用rsyslog來發送日誌到接收日誌的主機以測試服務是否正常運行。對於將日誌儲存在linux系統中的用戶,也可以使用rsyslog把日誌發送本專題的日誌接收端
2.2 Filebeat
Filebeat是用於轉發和集中日誌數據的輕量級傳送工具,可以監視指定的日誌文件或位置,收集日誌並把它們轉發到Elasticsearch或Logstash進行索引。相較於Logstash,Filebeat的CPU與内存占用低,作爲發送端是不錯的工具。因此用戶也可以安裝Filebeat,通過Filebeat來將日誌傳送到指定的IP位址。但作爲接收端時,Filebeat無法對日誌作進一步的處理,因此本專題在Logstash作爲接收日誌的工具
2.3 GeoIP
GeoIP即爲IP地理位置數據庫,可以根據IP位址獲得地理信息,如:所在大洲、經緯度、國家與地區、省市、ASN等。也因爲這個特性,GeoIP被廣泛應用與各大代理軟件。Logstash中的GeoIP插件使用的是GeoLite2數據庫,即GeoIP2的免費版本。本專題使用Logstash的GeoIP插件來將用戶日誌中的IP信息轉換爲地理信息,再輸出到Elasticsearch中,再由Kibana讀取這些地理位置信息,生成IP地圖,增强可視化功能的表現。
2.4 DNS
DNS(Domain Name System)網域名稱系統是網際網路的一項服務。它作爲域名將和IP位址相互對對映的一個分散式資料庫,能夠讓人更方便地存取網際網路。DNS伺服器產生的日誌會携帶有使用者的IP,本專題通過分析DNS日誌中的IP地址,將其轉換為地理位置保存,從而在可視化階段給用戶呈現DNS「用戶地圖」,幫助使用本專題服務的DNS伺服器擁有者更好地觀察伺服器的使用情況
2.5 Nginx
Nginx是一款輕量級的Web服務器、反向代理服務器,由於它的內存占用少,啟動極快,高並發能力強,在互聯網項目中被廣泛應用。Nginx日誌中存有所有與網站訪問者有關的活動記錄,例如客戶端向伺服器發出的請求、客戶端的IP地址、客戶端所使用的瀏覽器等。因此本專題通過Nginx日誌的分析,來幫助網站擁有者監控網站在一段時間内的使用情況。
系統簡介
本專題設計一個基於ELK的日誌分析系統,通過讓用戶注冊、登錄、傳輸日誌,最後登錄Kibana來使用我們提供的一個簡便的日誌分析功能。整套系統由兩大部分組成,處理用戶注冊登入的網頁端,接收用戶傳輸日誌的ELK端。用戶登入網頁端,輸入個人信息,服務器進行儲存並分配給用戶他所對應的Kibana之賬密與Kibana Space。整套系統由三個部分組成,網頁端、資料庫端、ELK端
3.1 網頁端
本專題的網頁主要負責注冊、登入、引導用戶幾大功能。用戶可以透過輸入賬號、密碼、郵箱等個人信息進行注冊。注冊完成並登錄後,用戶能夠在主頁面看到自己被分配的Kibana的相關信息。用戶也可以使用主界面的幾個功能按鈕,進行不同功能的使用。比如前往Kibnana,或是對自己注冊時輸入的個人信息作修改。用戶也可以登出,或直接注銷自己的賬號來終止服務。
3.2 資料庫端
本專題的資料庫端主要有兩個表格,分別為「用戶信息資料表」(表1-1)以及「接收端口資料表」(表1-2)。「用戶信息資料表」(表1-1)包含了用戶注冊時填入的個人信息以及被分配的Kibana信息。當用戶注冊與修改個人資料時,將會更新此表。接收端口資料表包含用來接收用戶日誌的主機的IP位址與端口號以及表示此端口是否可用的特徵值。用戶注冊時,會從此表抓取可用端口進行分配。
表 3-1 用戶信息資料表
資料表欄位 | 資料型態 | 說明 |
---|---|---|
account | varchar(30) | 用戶注冊時輸入的賬號 |
password | varchar(30) | 用戶注冊時輸入的密碼 |
email_add | varchar(30) | 用戶注冊時輸入的郵箱 |
port | int(11) | 分配給用戶的端口 |
tohost | varchar(15) | 分配給用戶的主機位址 |
kibana_account | varchar(30) | 分配給用戶的Kibana賬號 |
kibana_password | varchar(30) | 分配給用戶的Kibana密碼 |
space | varchar(50) | 分配給用戶的Kibana Space |
available | tinyint(1) | 用戶身份是否有效 |
表 3-2 接收端口資料表
資料表欄位 | 資料型態 | 說明 |
---|---|---|
port_number | int(11) | 接受用戶日誌的端口 |
owned_host | varchar(15) | 接受用戶日誌的IP位址 |
kibana_account | varchar(30) | 預設的Kibana賬號 |
kibana_password | varchar(30) | 預設的Kibana密碼 |
space | varchar(50) | 預設的Kibana Space |
available | tinyint(1) | 端口是否可用 |
3.3 ELK端
本專題使用ELK實現對日誌的諸多操作。以下將ELK分爲三部分介紹
3.3.1 Elasticsearch
Elasticsearch 是一個建置在 Apache Lucene 上的分散式搜尋和分析引擎,它可以快速地储存、搜索和分析海量数据。Elasticsearch使用index(索引)來分類數據。本專題主要使用它的儲存數據的功能,將其作爲存放用戶日誌的資料庫。
3.3.2 Logstash
Logstash是一個開源數據收集引擎,具有實時管道功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據傳輸到設定的目的地。
本專題使用Logstash采集用戶傳輸的日誌,相對於另一款日誌采集工具Filebeat,Logstash具有Grok功能,能夠將日誌做欄位的劃分,這能很好地幫助我們做日誌的解析與可視化。
3.3.3 Kibana
Kibana 是一款免費開源的前端應用程序,其基礎是 Elastic Stack,可以為 Elasticsearch 中索引的數據提供搜索和數據可視化功能。本專題使用Kibana爲用戶提供日誌的查看界面
系統實作
本章將詳細介紹網頁端、資料庫端與ELK端的程式碼,以及在實作過程中碰到的問題與解決方案
4.1網頁端
本節將會介紹網頁端各個功能的實現以及部分重要的程式碼。網頁端主要由PHP HTML實作,HTML負責網頁的呈現,PHP負責與資料庫的連結。
4.1.1 登錄與注冊界面
登錄頁面的主要功能是引導用戶登錄與注冊,由不需與資料庫做連結,因此完全采用html編寫,同時用CSS添加修飾,保證頁面的美觀。注冊頁面沿用登錄頁面的架構,加入PHP代碼與資料庫連結,告知用戶注冊是否成功,并將成功注冊的用戶的個人信息存入資料庫
<body> <form action="login_process.php" method="POST"> <div class="container"> <h1>登錄</h1> <div class="ipt-box"> <input type="text" placeholder="账号" autocomplete="off" name="username"> </div> <div class="ipt-box"> <input type="password" id="password" placeholder="密码" autocomplete="off" name="password"> <i class="eye fa fa-eye-slash"></i> <div class="beam"></div> </div> <button class="btn-login" onclick="check(this.form)"> 登錄 </button> <a href="./registration.php" class="btn-regi"> <h1 class="regi-text">注冊</h1> </a> </div> </form> </body> |
---|
4.1.2 主界面
本專題希望主界面能夠引導用戶將log傳送到我們的主機,同時幫助用戶前往Kibana。因此,網頁端的主界面主要用於展示用戶分配到的Port與Kibana的相關信息。展示給用戶的信息由PHP程式碼前往資料庫取得,之後同樣通過PHP實現取得的資料的呈現
<!DOCTYPE html> <html> <head> <meta http-equiv="content-type" content="text/html; charset=utf-8"> <meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"> <title>Login</title> <!-- 引入字體圖示庫 --> <link rel="stylesheet" href="https://cdn.staticfile.org/font-awesome/4.7.0/css/font-awesome.css"> <link rel="stylesheet" href="171.css"> </head> <style> a { text-decoration: none; } a.btn-regi { width: 75%; height: 50px; margin-top: 30px; border: none; outline: none; background-color: #7875ac; color: #fff; border-radius: 5px; font-size: 18px; letter-spacing: 8px; text-indent: 8px; cursor: pointer; text-align: center; } .show-password .btn-regi { background-color: #fff; color: #000; } .regi-text { display: block; width: 100%; height: 50px; line-height: 50px; font-size: 18px; font-weight: lighter; } .show-password .regi-text { color: #000; font-weight: 500; font-size: 18px; } </style> <script type="text/javascript"> function check(form) { document.myform.submit(); } </script> <body> <form action="login_process.php" method="POST"> <div class="container"> <h1>登錄</h1> <div class="ipt-box"> <input type="text" placeholder="账号" autocomplete="off" name="username"> </div> <div class="ipt-box"> <input type="password" id="password" placeholder="密码" autocomplete="off" name="password"> <i class="eye fa fa-eye-slash"></i> <div class="beam"></div> </div> <button class="btn-login" onclick="check(this.form)"> 登錄 </button> < a href=" " class="btn-regi"> <h1 class="regi-text">注冊</h1> </ a> </div> </form> </body> </html> <script src="login.js"></script> |
---|
除此之外,還在主界面實現了幾個供用戶使用的功能,比如前往Kibana、修改用戶個人信息、登出、以及注銷用戶。其中,注銷功能額外使用python脚本來對Elasticsearch進行操作,這在下一小節中會詳細介紹
4.1.3 注銷用戶功能
注銷功能需要做到兩點。第一點是刪除用戶存於資料庫的個人信息,此項由PHP完成。第二點是將用戶傳輸到Elasticsearch中的日誌清除。Elasticsearch提供直接用curl命令操作的功能,但由於在PHP使用curl較爲麻煩,因此此處使用編寫好的python脚本作爲插件,來做curl的使用。PHP引用Python脚本,將要刪除的用戶特徵信息傳遞給Python脚本,由Python脚本使用curl訪問Elasticsearch將存有用戶日誌的index重建。
//delete.php <?php session_start(); require_once("./db_connect.php"); $params = "logstash-" . $rs['port']; #傳遞給python腳本的入口參數 $path = "python test.py "; //需要注意的是:末尾要加一個空格 passthru($path . $params); //等同於命令`python python.py 參數`,並接收列印出來的資訊 exec('./test.py $params', $arr); $c = exec("python3 ./test.py {$params}", $out, $res); //print_r(urldecode($out[0])); $port = $rs['port']; $tohost = $rs['tohost']; echo $query_string = "UPDATE portinfo SET available=1 WHERE port_number=$port AND owned_host='$tohost'"; mysqli_query($con, $query_string); $query_string = "DELETE FROM userinfo WHERE account='$username'"; mysqli_query($con, $query_string); echo '<meta http-equiv=REFRESH CONTENT=2;url=./login.html>'; ?> |
---|
//delete.py import os import sys try: os.popen("curl -u elastic:123456 -XDELETE 'http://localhost:9200/" sys.argv[1] "'") os.popen("curl -u elastic:123456 -XPUT 'http://localhost:9200/" sys.argv[1] "'") except BaseException as a: print (a) |
4.2 資料庫端
本專題采用mysql作爲儲存用戶個人信息的資料庫。本組設計了db_connect.php文件用於連接資料庫並進行初步的信息抓取,並在需要連接資料庫的PHP文件中引用此文件,以此減少代碼編寫量。
<?php session_start(); require_once("./include/configure.php"); $con = mysqli_connect("localhost", "root", "yxul4dj4"); $username = $_SESSION['loginID']; mysqli_select_db($con, "adminuser"); $result = mysqli_query($con, "select * from userinfo where account = '$username'") or die("Failed to query database " . mysqli_error($con)); $rs = mysqli_fetch_array($result); ?> |
---|
前端頁面使用PHP處理用戶注冊時,會先查詢資料庫中是否存在重複的賬號。若無,則將新注冊用戶存入資料庫並通知用戶注冊成功。注冊成功時,會從「接收端口資料表」(表1-1)抓取可用的端口分配給新用戶,寫入「用戶信息資料表」(表1-1)。
<?php session_start(); require_once("./include/configure.php"); $username = $_POST['username']; //獲取login.html中提交的用戶名 $password = $_POST['password']; //獲取login.html中提交的密碼 // $_SESSION['loginID'] = $_POST['username']; $_SESSION['loginID'] = $username; $con = mysqli_connect($dbhost, $dbuser, $dbpwd); //連接本地資料庫 mysqli_select_db($con, "adminuser"); //在所有的資料庫中選擇叫 adminuser 的資料庫 $result = mysqli_query($con, "select * from userinfo where account = '$username' and password ='$password'") or die("Failed to query database " . mysqli_error($con)); //在 adminuser 資料庫下的 userinfo 表單下查詢所有的用戶名和密碼,否則die,報錯 $row = mysqli_fetch_array($result); //值傳給數組 if ($row['account'] === $username && $row['password'] === $password) { // header("Location: index.php?username=$username&password=$password"); header("Location: test.php?username=$username&password=$password"); } else { echo "<script>alert('登錄失敗!')</script>"; echo '<meta http-equiv=REFRESH CONTENT=2;url=./login.html>'; } ?> |
---|
<?php session_start(); require_once("./include/configure.php"); $username = $_POST['n_username']; //獲取login.html中提交的用戶名 $password = $_POST['n_password']; //獲取login.html中提交的密碼 $email = $_POST['n_email']; $_SESSION['loginID'] = $_POST['n_username']; $con = mysqli_connect($dbhost, $dbuser, $dbpwd); //連接本地資料庫 mysqli_select_db($con, "adminuser"); //在所有的資料庫中選擇叫 adminuser 的資料庫 $result = mysqli_query($con, "select * from userinfo where account = '$username'") or die("Failed to query database " . mysqli_error($con)); $row = mysqli_fetch_array($result); if ($row['account'] != null) { echo "重複的用戶名!" . $row['account'] . " "; echo "<meta http-equiv=REFRESH CONTENT=2;url=./registration.php>"; } else { $result = mysqli_query($con, "select * from portinfo where available = 1") or die("Failed to query database " . mysqli_error($con)); $row = mysqli_fetch_array($result); if ($row['port_number']) { $port = $row['port_number']; $tohost = $row['owned_host']; $k_u = $row['kibana_account']; $k_p = $row['kibana_password']; $space = $row['space']; $query_string = "UPDATE portinfo SET available=0 WHERE port_number=$port AND owned_host='$tohost'"; mysqli_query($con, $query_string); $sql = "INSERT INTO userinfo (account, password, email_add, port, tohost, kibana_account, kibana_password, space) values ('$username', '$password', '$email', '$port', '$tohost', '$k_u', '$k_p', '$space')" if (mysqli_query($con, $sql)) { echo "<script>alert('新增成功!')</script>"; echo '<meta http-equiv=REFRESH CONTENT=2;url=./login.html>'; } else { echo '新增失敗!'; echo '<meta http-equiv=REFRESH CONTENT=2;url=./registration.php>'; } } else { echo '抱歉,無用戶名額!'; echo '<meta http-equiv=REFRESH CONTENT=2;url=./registration.php>'; } } ?> |
4.3 伺服器端
本節將會詳細介紹ELK幾個組件的詳細配置過程與配置文件的相關内容
4.3.1 Elasticsearch
Elasticsearch可以通過多種方式安裝在多種系統中,例如可以通過下載.zip檔,解壓並運行其中的脚本文件來安裝在Window上,或通過wget與apt-get安裝在linux上。我們最終決定將其安裝在linux系統上,因爲linux系統更穩定、更安全。作爲一個長期運行的服務,Elasticsearch顯然更適合安裝在linux上。
需要注意的是,ELK三部分的版本需要保持一致,否則會發生錯誤
官方的ELasticsearch軟件包可以通過wget命令下載。下載前需要先取得官方的ELasticsearch PGP key。下載完成后通過shasum命令驗證完整性后使用dpkg命令安裝。安裝後即可開始配置相關文件
配置文件中會有許多注釋項,可根據需要選擇是否開啓,以下會列出一些需要調整的設置。
cluster與node為Elasticsearch用於緩解内存壓力的設置,維持默認設置即可
cluster.name: elasticsearch node.name: node-1 node.attr.rack: r1 |
---|
path.data與path.logs為儲存用戶日誌與Elasticsearch日誌的路徑,如無必要,維持默認設置即可
path.data: /var/lib/elasticsearch path.logs: /var/log/elasticsearch |
---|
network.host與http.host決定Elasticsearch能否從外界訪問,將其設置爲0.0.0.0以方便我們使用外部地址進行訪問與調試
network.host: 0.0.0.0 http.port: 9200 http.host: 0.0.0.0 transport.host: 0.0.0.0 |
---|
Xpack是一個集安全、警報、監視、報告和圖形功能於身的擴展,該功能在5.x版本后的elasticsearch内為自帶功能。開啓此功能后,訪問es需要用戶名與密碼,能夠提高安全性,因此選擇開啓
xpack.security.enabled: true xpack.security.enrollment.enabled: true xpack.security.http.ssl: enabled: false keystore.path: certs/http.p12 xpack.security.transport.ssl: enabled: true verification_mode: certificate keystore.path: certs/transport.p12 truststore.path: certs/transport.p12} |
---|
4.3.2 Logstash
Logstash一樣安裝與linux系統中。但Logstash的安裝方式與Elasticsearch有略微不同,通過wget獲取公共簽名密鑰后直接通過apt-get下載安裝Logstash。安裝完畢后即可開始配置相關文件
Logstash本身的配置文件(logstash.yml)中需要配置的並不多,大部分維持默認設置即可
node.name: 'hostname' path.data: /var/lib/logstash path.config: "/etc/logstash/pipelines.yml" config.test_and_exit: false config.reload.automatic: false config.reload.interval: 3s api.enabled: true api.http.host: 0.0.0.0 api.http.port: 9600 path.logs: /var/log/logstash |
---|
要接收用戶發送的日誌,我們需要在相應的目錄創建配置文件(.conf)來進行設定。配置文件由三個部分組成,分別為Input,Filter,Output。其中Input讓Logstash可以讀取特定的事件源,這可以是標準輸入、本地文件讀入或是通過指定端口傳入文件。Output能夠使Logstash將數據發送到指定位置,在本專題中通常爲配置完成的Elasticsearch。這兩個部分是不可缺少的。而Filter則是一個對數據進行處理的過濾器,在Filter中可以引用許多插件(比如GeoIP),本專題主要使用Filter的Grok功能對數據進行欄位的分割。
在Input部分。需要指定事件源,本專題中為接受用戶日誌的特定端口。同時可以設置一些特徵值,幫助區分用戶。
input { udp { type => "5044" port => 5044 tags => ["from5044"] } } |
---|
Filter部分使用Grok匹配不同格式的日誌,即使是同一種類型的日誌,也可能會有不同的格式,因此本組寫入了多條Grok表達式來確保用戶傳入的日誌可以被正確地匹配到。
grok{ match => { "message" => ["<%{NUMBER:syslog.PRI}> %{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{SYSLOGPROG:syslog.program} : %{LOGLEVEL:syslog.level} :%{GREEDYDATA:syslog.message}", "<%{NUMBER:syslog.PRI}> %{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{SYSLOGPROG:syslog.program}: %{LOGLEVEL:syslog.level} : %{GREEDYDATA:syslog.message}", "<%{NUMBER:dns.pri}> %{SYSLOGTIMESTAMP:dns.time} %{IPORHOST:dns.hostname} %{DATA:dns.tag}: (?<dns.timestamp>%{MONTHDAY}-%{MONTH}-%{YEAR} %{TIME}) client(.*?) %{IP:dns.clientip} %{POSINT:dns.clientport}(%{DATA:dns.query_one}): query: %{DATA:dns.query_two} IN %{GREEDYDATA:dns.querytype} (%{IP:dns.dns})", "<%{NUMBER:syslog.PRI}> %{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{SYSLOGPROG:syslog.program}: %{GREEDYDATA:syslog.message}", "%{IPORHOST:ip} - %{DATA:user_name} [%{HTTPDATE:time}] "%{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} "%{DATA:referrer}" "%{DATA:agent}" ] } } |
---|
由於nginx的格式較特殊,可能包含一些特殊符號,導致Logstash無法正確識別,因此本組使用特定函數將傳入的nginxlog中的特定内容作刪減。
ruby { code => " if event['message'] event['message'] = event['message'].gsub('x22','').gsub('x9B','').gsub('x08','').gsub('x09','').gsub('x','') end " } |
---|
在Filter中,本組還使用了GeoIP插件,從日志中的IP地址獲取更多的信息,以在後續的可視化呈現部分實現地圖功能。注意欄位「Location」的資料型態需為「geo_point」
geoip { source => "dns.clientip" target => "geoip" } |
---|
Logstash支持使用if篩選日志内容,因此本組在Filter中加入識別日志中Fatal與Error字樣,並添加特徵值,以利後續操作
if "ERROR" in [message]{ mutate { add_tag => "errorlog" } } |
---|
由於需要服務多名用戶,因此需要多個配置文件。此時需要配置Pipelines.yml,這可以使Logstash在啓動時應用多個配置文件
- pipeline.id: main path.config: "/etc/logstash/conf.d/*.conf" |
---|
在Output部分根據特徵值將處理后的日志存入Elasticsearch,注意在存入Elasticsearch時需要配置設定好的密碼。如果不以特徵值加以區分,則每一份日志會往每一個index中均存入一份。同時,依據filter中的日志錯誤訊息的檢索,判斷是否將日志文件輸出爲本地的txt文檔,讓編寫好的Python脚本對其進行處理
output { if "errorlog" in [tags] { file { path => "/etc/logstash/testsave/testlog.txt" } } if "from5044" in [tags]{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "testmap5044-%{ YYYY.MM}" user => "elastic" password => "123456" } } } |
---|
4.3.3 Kibana
Kibana的主要功能是搜索、查看、交互儲存與Elasticsearch索引中的數據,並能以圖表、表格、地圖等方式展示數據。
Kibana的安裝與Logstash相同。通過apt-get取得軟件包並安裝后,即可進行配置文件的相關改動。
如果Kibana與Elasticsearch安裝在同一台主機中,就不必配置Elastisearch的位址,使用默認的localhost即可
(kibanayml.txt)
由於開啓了Elasticsearch的Xpack安全插件,需要為kibana配置用於訪問Elasticsearch的賬號密碼
server.port: 5601 server.host: 0.0.0.0 elasticsearch.hosts: ["http://0.0.0.0:9200"] | server.port: 5601 server.host: 0.0.0.0 elasticsearch.hosts: ["http://0.0.0.0:9200"] |
---|---|
server.port: 5601 server.host: 0.0.0.0 elasticsearch.hosts: ["http://0.0.0.0:9200"] |
由於開啓了Elasticsearch的Xpack安全插件,需要為kibana配置用於訪問Elasticsearch的賬號密碼
elasticsearch.hosts: ["http://0.0.0.0:9200"] elasticsearch.username: "kibana" elasticsearch.password: "123456" |
---|
如果需要開啓Kibana的警報功能,需要生成安全密鑰,並寫入Kibana的配置文件中
xpack.encryptedSavedObjects.encryptionKey: 732f089c299b8469e154d3115d751524 xpack.reporting.encryptionKey: b0111902695addd52ba3dc9e0631404c xpack.security.encryptionKey: b53ef418fe64529e88c17dc45125dd76 |
---|
4.3.4 郵件報警腳本
該腳本是通過使用os庫來實現操作linux系統來讀取被存取的錯誤信息,使用pymysql庫來獲取用戶信息來發送郵件,而郵件系統則是使用email庫來實現,通過使用gmail的授權碼來使用 gmail提供的官方smtp服務器發送郵件。
import logging from email import encoders from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import smtplib import pymysql import os def mailsender(user_name, user_email, errorfile): logging.basicConfig(level=logging.DEBUG) # 填写发送邮件的邮箱地址和密码 from_addr = 'luowenjie1191172440@gmail.com' password = 'lwj001014' # 填写收件人的邮箱地址 to_addr = user_email # 设置邮件的标题和文件名 subject = 'Dear ' user_name ' log error email alerting' filename = '/etc/logstash/testsave/' errorfile '.txt' html_msg = """ <p>錯誤日誌提醒</p> <p><a href="https://elkweb.isrcttu.net/webpage/login.html">請查看附件或至官網查看具體錯誤</a></p> """ # 创建一个带附件的实例 msg = MIMEMultipart() msg['From'] = from_addr msg['To'] = to_addr msg['Subject'] = subject msg.attach(MIMEText(html_msg, 'html', 'utf-8')) # 打开文件并将内容读入 with open(filename, 'rb') as f: attachment = MIMEBase('application', 'octet-stream') attachment.set_payload((f).read()) # 编码附件 encoders.encode_base64(attachment) attachment.add_header('Content-Disposition', 'attachment', filename='errorlog.txt') msg.attach(attachment) # 连接并发送邮件 try: server = smtplib.SMTP('smtp.gmail.com', 587) server.starttls() server.login(from_addr, 'zfycahzcmjgtgbjj') server.sendmail(from_addr, to_addr, msg.as_string()) print("邮件发送成功") os.popen('cd /etc/logstash/testsave && rm ' errorfile '.txt') delnot = os.popen('find /etc/logstash/testsave -name "' errorfile '.txt "').read() if not delnot: print("文件删除成功") except smtplib.SMTPException as e: print(e) print('邮件发送失败!') except socket.gaierror as e: print(e) print('邮件服务器链接失败!') finally: # 关闭服务器 server.quit() def getuser(): db = pymysql.connect(host='localhost', user='root', password='yxul4dj4', database='adminuser') # 使用 cursor() 方法创建一个游标对象 cursor cursor = db.cursor() errorsave = os.popen('cd /etc/logstash/testsave && ls ').read() errorsave_list = errorsave.split("n") for i in range(len(errorsave_list)-1): errorsave_list[i] = errorsave_list[i].strip('.txt') print(errorsave_list[i]) # 使用 execute() 方法执行 SQL 查询 cursor.execute( "SELECT account,email_add FROM userinfo WHERE port=" errorsave_list[i]) # 使用 fetchone() 方法获取单条数据. data = cursor.fetchall() if not data: pass else: user_message = list(data[0]) user_name = user_message[0] user_email = user_message[1] mailsender(user_name, user_email,errorsave_list[i]) print(user_name, user_email) # 关闭数据库连接 db.close() if __name__ == "__main__": getuser() print("over") |
---|
結論
5.1 結論
本專題透過使用ELK系統、python製作腳本、網頁畫面的製作與mySQL資料庫的設計,實作一套以上述的日誌分析服務。此系統提供多種日誌類型目前為syslog,nginxlog及dnslog,並且用戶只需要將多台主機或各種不同的日誌類型傳入一個指定端口,便可以查看全部日誌的可視化信息,當日誌中產生錯誤信息可以通過郵箱想用戶告警。
Syslog 旨在监控网络设备和系统,以便在出现任何功能问题时发送通知消息——它还针对预先通知的事件发出警报,并通过参与网络设备的更改日誌或事件日誌来监控可疑活动。該專題的可視化圖表可以很容易的呈現出系統中所執行的程序有哪些錯誤或是警報。DNS 日誌是用来记录 DNS 服务器的活动的日誌文件。这些日誌文件可以用来调试 DNS 问题、确定哪些域名正在使用您的 DNS 服务器、跟踪潜在的攻击和恶意活动等。該專題的可視化圖表可以很明顯的看出網站的流量與最經常訪問的用戶來自何處。Nginx 日誌文件包括访问日誌和错误日誌。访问日誌记录了每个访问 Nginx 服务器的请求的相关信息,包括访问时间、访问者的 IP 地址、访问的 URL 和返回的 HTTP 状态码等。这些信息可以帮助您了解哪些内容被访问得最多,哪些地方可能存在问题等。該專題可以通過圖表輕易地看出上述信息。
用戶通過網站進行注冊、登錄后便能分配到專屬的端口與Kibana Space。發送日誌到對應端口后,通過顯示的賬號、密碼登錄Kibana即可查看。用戶可以發送不同主機上的不同類型的log到對應端口,之後通過本專題預設之Dashboard查看日誌分析與統計結果。也可通過聚合查詢分別查看來自不同主機的日誌或集中查看某一類型的日誌。如此可以較少的精力體驗較佳的日誌分析功能。
5.2未來發展
1.目前支持的日誌類型較少,用戶選擇較爲受限。未來擬定支持更多日誌類型,如Apachelog,提高用戶體驗
2.郵件告警功能目前僅對「Error」「Fatal」字樣作反應,可以添加自定義功能,讓用戶指定需要郵件提示的内容
3.目前無法讓用戶更改Kibana的相關信息,可以通過Kibana的開放Api來讓用戶可以自定Kibana的賬號密碼與Space名稱
4.目前透過傳入的port區分用戶,目前沒有問題,但對服務的擴張有一定影響
參考文獻
[0]GeoIP介紹 檢自
https://zh.wikipedia.org/zh-tw/GeoDNS
[0]Filebeat介紹 檢自
https://zhuanlan.zhihu.com/p/141439013
[0]Elasticsearch的安裝與使用 檢自
https://www.elastic.co/guide/en/elasticsearch/reference/current/elasticsearch-intro.html
[0]Logstash的安裝與使用 檢自
https://www.elastic.co/guide/en/logstash/current/introduction.html
[0]Kibana的安裝與使用 檢自
https://www.elastic.co/guide/en/kibana/current/introduction.html
[0]在Python中使用reuqest模塊實現curl命令 檢自
https://www.delftstack.com/zh/howto/python/implement-curl-commands-using-requests-module-in-python/
[0]Elasticsearch中的插件Xpack 檢自
https://zhuanlan.zhihu.com/p/36337697
[0]在Logstash Filter中使用GeoIP plugin 檢自
https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html