接上一篇文章,按照”终端出厂实现自动化运维方案",https://blog.csdn.net/yyz_1987/article/details/118358038
以终端状态上保监控服务和远程采集日志指令下发为例,记录下go-zero微服务的简单使用。最终实现一个低成本的后台监控云服务,监控所有出厂终端设备的状态和后续的报警推送服务。
这个方案说简单也简单,说难也确实不容易。难在而如何能否支撑全国各地上万个设备,每间隔十分钟一次的高并发。终端数量按10万计算,不像其他系统是读多写少。这套监控的场景反倒是写数据的多,读数据的少。单个mysql数据库能否撑得住同一时刻10万条记录的写入?
涉及API网关的负载均衡、同一微服务节点的多个部署。数据记录先入持久化缓存队列,空闲再写入mysql.这些肯定是少不了的。
Golang群里大神建议上MQ如Kafka,这样减轻数据库的写入压力。但是kafka有点儿重量级了,先不考虑。还有人建议上硬件的F5负载均衡或者用keepalive 或者lvs 的方案。但是得借助域名解析,也先暂不考虑,等真达到量级或有必要解决了,总会有办法的。
这里先介绍下初步实现吧:
新建一个Golang服务后台项目代码的目录,取名monitor。
环境准备
电脑或服务器安装有mysql,redis,etcd
下载好一些插件工具有:goctl,protoc.exe,proto-gen-go.exe
API网关层实现
按照goctl这一代码生成神器的使用方式,首先定义一下终端需要上送的接口字段信息:
statusUpload.api
代码语言:javascript复制type (
//终端状态上报内容
StatusUploadReq {
Sn string `json:"sn"` //设备唯一号
Pos string `json:"pos"` //终端编号
City string `json:"city"` //城市代码
Id string `json:"id"` //终端类型
Unum1 uint `json:"unnum"` //未传记录数量--公交
Unum2 uint `json:"unnum"` //未传记录数量--三方
Ndate string `json:"ndate"` //当前日期
Ntime string `json:"ntime"` //当前时间
Amount uint `json:"amount"` //当班总额
Count uint `json:"count"` //当班人数
Line uint `json:"line"` //线路号
Stime string `json:"stime"` //开机时间
Ctime string `json:"ctime"` //关机时间
Tenant uint `json:"tenant"` //租户ID
}
//应答内容
StatusUploadResp {
Code int `json:"code"`
Msg string `json:"msg"`
Cmd int `json:"cmd"` //控制终端命令字
}
)
service open-api {
@doc(
summary: 公开的api函数
desc: >statusUpload 终端状态上报
)
@server(
handler: statusUploadHandler
folder: open
)
post /open/statusUpload(StatusUploadReq) returns(StatusUploadResp)
}
接下来借助goctl神器的威力,直接生成网关层代码啦:
代码语言:javascript复制goctl api go -api statusUpload.api -dir .
生成代码目录结构如下:
接下来跑起来试试:
代码语言:javascript复制go run open.go
网关层启动成功,侦听端口8888,在etc文件夹内的 open-api.yaml文件中有配置
分别用postman和curl工具测下:
代码语言:javascript复制curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json
status.json文件内容:
代码语言:javascript复制{
"sn": "1C2EB08D",
"pos": "12345678",
"city": "0371",
"id": "B503",
"unum1": 0,
"unum2": 0,
"ndate": "2021-08-07",
"ntime": "18:30:30",
"amount": 0,
"count": 0,
"line": 101,
"stime": "05:01:01",
"ctime": "18:30:20",
"tenant": 0
}
RPC服务端实现
接下来,把它改造成微服务的形式,通过rpc调用服务提供的接口。大体结构如下:
需要提前安装就绪etcd环境,且需要安装一些插件工具,如proto.exe 和proto-gen-go.exe工具,放到go或gopath的bin目录下。
在项目代码跟目录下创建rpc文件夹,建个微服端的代码目录,这里取名为status。
定义proto文件,status.proto如下:
代码语言:javascript复制syntax = "proto3";
package status;
message statusUploadReq {
string sn = 1;
string pos = 2;
string city = 3;
string id = 4;
uint32 unum1 = 5;
uint32 unum2 = 6;
string ndate = 7;
string ntime = 8;
uint32 amount = 9;
uint32 count = 10;
uint32 line = 11;
string stime = 12;
string ctime = 13;
uint32 tenant = 14;
}
message statusUploadResp {
int32 code = 1;
string msg = 2;
int32 cmd = 3;
}
service statusUploader {
rpc statusUpload(statusUploadReq) returns(statusUploadResp);
}
然后又是goctl神器发威了,自动生成代码,厉害不?
代码语言:javascript复制goctl rpc proto -src=status.proto -dir .
自动生成的文件目录如下:
自动生成的并不包含上图的那个client文件夹。client文件夹是为了单独测试rpc服务自己创建的,做个client端的demo调用一下rpc服务。model文件夹也是手工创建的,里面放数据库的操作接口。
自动生成的rpc服务端status.go入口文件内容:
代码语言:javascript复制package main
import (
"flag"
"fmt"
"monitor/rpc/status/internal/config"
"monitor/rpc/status/internal/server"
"monitor/rpc/status/internal/svc"
"monitor/rpc/status/status"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/zrpc"
"google.golang.org/grpc"
)
var configFile = flag.String("f", "etc/status.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
srv := server.NewStatusUploaderServer(ctx)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
status.RegisterStatusUploaderServer(grpcServer, srv)
})
defer s.Stop()
fmt.Printf("Starting rpc server at %s...n", c.ListenOn)
s.Start()
}
这时候如果启动了etcd,那么直接go run status.go,服务端就启动成功啦。
RPC客户端测试
为了验证下rpc服务端是工作正常的,在client文件夹中实现个zrpc的客户端测试一下:
client.go文件如下:
代码语言:javascript复制package main
import (
"context"
"fmt"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/zrpc"
"log"
pb "monitor/rpc/status/status"
)
func main() {
client := zrpc.MustNewClient(zrpc.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"127.0.0.1:2379"},
Key: "status.rpc",
},
})
sclient := pb.NewStatusUploaderClient(client.Conn())
reply, err := sclient.StatusUpload(context.Background(), &pb.StatusUploadReq{Sn: "test rpc", Pos: "go-zero"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.Msg)
}
如果服务正常,会收到服务端接口的响应。
网关层调用改为微服务方式调用
可以把网关层改造下,改为微服务的调用方式。改动点并不大,如下:
第一步:
api目录apiinternalconfig路径下的config文件和apietc下的open-api.yaml文件改动:
open-api.yaml增加etcd的相关配置,用于连接到etcd服务中心,查找对应的服务方法。
注意,Config结构里的Status名字和那个配置文件中的是一一对应的,不能错。如果有多个微服务,这里 可以依次写上,如这种:
代码语言:javascript复制Status:
Etcd:
Hosts:
- localhost:2379
Key: status.rpc
Expander:
Etcd:
Hosts:
- localhost:2379
Key: expand.rpc
代码语言:javascript复制type Config struct {
rest.RestConf
Status zrpc.RpcClientConf // 手动代码
Expander zrpc.RpcClientConf // 手动代码
}
第二步:
api目录apiinternalsvc路径下servicecontext.go文件改动:
第三步:
apiinternallogic目录下statusuploadlogic.go文件改动,
至此api网关层改造完成。可以模拟访问网关接口地址试试啦
代码语言:javascript复制curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json
定义数据库表结构,并生成 CRUD cache 代码
- monitor项目根路径下创建 rpc/model 目录:
mkdir -p rpc/model
- 在 rpc/model 目录下编写创建 tb_status表的 sql 文件
status.sql
,如下:
CREATE TABLE `tb_status`
(
`id` INT UNSIGNED AUTO_INCREMENT,
`sn` VARCHAR(32) NOT NULL COMMENT '设备唯一号',
`posno` VARCHAR(32) COMMENT '终端编号',
`city` VARCHAR(16) COMMENT '城市代码',
`tyid` VARCHAR(16) COMMENT '设备类型',
`unum1` INT COMMENT '未传记录数--公交',
`unum2` INT COMMENT '未传记录数--三方',
`ndate` DATE COMMENT '当前日期',
`ntime` TIME COMMENT '当前时间',
`amount` INT COMMENT '当班总额',
`count` INT COMMENT '当班人数',
`line` INT COMMENT '线路编号',
`stime` TIME COMMENT '开机时间 ',
`ctime` TIME COMMENT '关机时间 ',
`tenant` INT COMMENT '租户号 ',
PRIMARY KEY(`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4;
- 创建 DB,取名叫monitor, 和 table
create database monitor;
source status.sql;
- 在
rpc/model
目录下执行如下命令生成 CRUD cache 代码,-c
表示使用redis cache
goctl model mysql ddl -c -src status.sql -dir .
也可以用datasource
命令代替ddl
来指定数据库链接直接从 schema 生成
生成后的文件结构如下:
代码语言:javascript复制rpc/model
├── status.sql
├── tbstatusmodel.go // CRUD cache代码
└── vars.go // 定义常量和变量
自动生成的tbstatusmodel.go 文件内容:
代码语言:javascript复制package model
import (
"database/sql"
"fmt"
"strings"
"github.com/tal-tech/go-zero/core/stores/cache"
"github.com/tal-tech/go-zero/core/stores/sqlc"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/tools/goctl/model/sql/builderx"
)
var (
tbStatusFieldNames = builderx.RawFieldNames(&TbStatus{})
tbStatusRows = strings.Join(tbStatusFieldNames, ",")
tbStatusRowsExpectAutoSet = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), ",")
tbStatusRowsWithPlaceHolder = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), "=?,") "=?"
cacheTbStatusIdPrefix = "cache::tbStatus:id:"
)
type (
TbStatusModel interface {
Insert(data TbStatus) (sql.Result, error)
FindOne(id int64) (*TbStatus, error)
Update(data TbStatus) error
Delete(id int64) error
}
defaultTbStatusModel struct {
sqlc.CachedConn
table string
}
TbStatus struct {
Id int64 `db:"id"`
Sn sql.NullString `db:"sn"` // 设备唯一号
Posno sql.NullString `db:"posno"` // 终端编号
City sql.NullString `db:"city"` // 城市代码
Tyid sql.NullString `db:"tyid"` // 设备类型
Unum1 sql.NullInt64 `db:"unum1"` // 未传记录数--公交
Unum2 sql.NullInt64 `db:"unum2"` // 未传记录数--三方
Ndate sql.NullTime `db:"ndate"` // 当前日期
Ntime sql.NullString `db:"ntime"` // 当前时间
Amount sql.NullInt64 `db:"amount"` // 当班总额
Count sql.NullInt64 `db:"count"` // 当班人数
Line sql.NullInt64 `db:"line"` // 线路编号
Stime sql.NullString `db:"stime"` // 开机时间
Ctime sql.NullString `db:"ctime"` // 关机时间
Tenant sql.NullInt64 `db:"tenant"` // 租户号
}
)
func NewTbStatusModel(conn sqlx.SqlConn, c cache.CacheConf) TbStatusModel {
return &defaultTbStatusModel{
CachedConn: sqlc.NewConn(conn, c),
table: "`tb_status`",
}
}
func (m *defaultTbStatusModel) Insert(data TbStatus) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tbStatusRowsExpectAutoSet)
ret, err := m.ExecNoCache(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant)
return ret, err
}
func (m *defaultTbStatusModel) FindOne(id int64) (*TbStatus, error) {
tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
var resp TbStatus
err := m.QueryRow(&resp, tbStatusIdKey, func(conn sqlx.SqlConn, v interface{}) error {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
return conn.QueryRow(v, query, id)
})
switch err {
case nil:
return &resp, nil
case sqlc.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
func (m *defaultTbStatusModel) Update(data TbStatus) error {
tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, data.Id)
_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tbStatusRowsWithPlaceHolder)
return conn.Exec(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant, data.Id)
}, tbStatusIdKey)
return err
}
func (m *defaultTbStatusModel) Delete(id int64) error {
tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
return conn.Exec(query, id)
}, tbStatusIdKey)
return err
}
func (m *defaultTbStatusModel) formatPrimary(primary interface{}) string {
return fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, primary)
}
func (m *defaultTbStatusModel) queryPrimary(conn sqlx.SqlConn, v, primary interface{}) error {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
return conn.QueryRow(v, query, primary)
}
修改 monitor/rpc/status rpc 代码调用 crud cache 代码
- 修改
rpc/status/etc/status.yaml
,增加如下内容:
- 修改
rpc/status/internal/config.go
,如下:
package config
import "github.com/tal-tech/go-zero/zrpc"
//手动代码
import "github.com/tal-tech/go-zero/core/stores/cache"
type Config struct {
zrpc.RpcServerConf
DataSource string // 手动代码
Cache cache.CacheConf // 手动代码
}
增加了 mysql 和 redis cache 配置
- 修改
rpc/status/internal/svc/servicecontext.go
,如下:
package svc
import "monitor/rpc/status/internal/config"
//手动代码
import "monitor/rpc/status/model"
type ServiceContext struct {
Config config.Config
Model model.TbStatusModel // 手动代码
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
Model: model.NewTbStatusModel(sqlx.NewMysql(c.DataSource), c.Cache), // 手动代码
}
}
- 修改
rpc/status/internal/logic/statusuploadlogic.go
,如下:
package logic
import (
"context"
"monitor/rpc/status/internal/svc"
"monitor/rpc/status/status"
"github.com/tal-tech/go-zero/core/logx"
//手动代码
"database/sql"
"monitor/rpc/status/model"
"time"
)
type StatusUploadLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
model model.TbStatusModel // 手动代码
}
func NewStatusUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StatusUploadLogic {
return &StatusUploadLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
model: svcCtx.Model, // 手动代码
}
}
func (l *StatusUploadLogic) StatusUpload(in *status.StatusUploadReq) (*status.StatusUploadResp, error) {
// todo: add your logic here and delete this line
// 手动代码开始,插入记录到数据库
t, _ := time.Parse("2006-01-02", in.Ndate)
_, err := l.model.Insert(model.TbStatus{
Sn: sql.NullString{in.Sn, true},
Posno: sql.NullString{in.Pos, true},
City: sql.NullString{in.City, true},
Tyid: sql.NullString{in.Id, true},
Ndate: sql.NullTime{t, true},
Ntime: sql.NullString{in.Ntime, true},
})
if err != nil {
return nil, err
}
return &status.StatusUploadResp{Code: 0, Msg: "server resp,insert data ok", Cmd: 1}, nil
}
注意这里的sql.NullString和NullTime的写法,如果后面的第二个参数值为false,则插入到库中的值为空。
最后测试下,发现数据已经可以成功入库啦。