ES日志存储以及备份压缩到COS

2023-04-18 11:40:08 浏览数 (1)

导语

为了满足用户日益增长的日志存储大小,不影响用户的写入和查询性能。满足不同用户写入流量。同时用户日志长期保存,日志存储比较占用空间和成本。ES集群规格配置高,消耗资源和成本。我们基于Go语言设计了一个多用户多ES集群,日志备份到cos节省成本的方案。本篇实践基于Go语言编程。

索引设计

为了防止单个索引不断增加。影响ES集群查询写入性能,ES 集群的索引设计主要采取如下方式:

1. 租户分离:将索引按照租户进行分离,避免不同租户之间的数据混淆,提高 ES 集群的数据安全性和隔离性。 2. 按月分割:将索引按照每个月进行分割,避免单个索引过大,提高 ES 集群的查询性能。 3. 按大小固定rollover:将索引按照固定大小进行rollover,自动分索引,避免单个索引过大,进行索引管理,提升集群性能,便于集群运维。

ES集群设计

为了避免 ES 集群出现单点问题,以及配置不断增加带来的运维风险, ES 集群设计时主要通过将不同规格的用户日志写入不同配置的ES集群,并且使用自动扩容技术来实现集群的可扩展性。

具体来说,可以将用户日志按照不同的规格(例如不同的数据量、访问频率等)进行分类,然后将不同规格的用户日志写入不同配置的ES集群中。例如,可以使用较小的ES集群来处理低频访问的用户日志,而使用较大的ES集群来处理高频访问的用户日志。

ES数据备份到COS

创建备份流程

在 ES 备份流程中,为了保证备份的正确性和完整性,可以按照如下步骤执行:

  • 创建一个仓库来存储备份数据。
  • 创建一个快照任务,将ES中的数据备份到指定的仓库中。在创建快照任务时,需要指定备份的索引、仓库、快照名称等参数,并设置备份的间隔时间和保留时间等策略。
  • 查询快照任务是否完成。ES提供了API接口来查询快照的状态和进度,可以根据查询结果来判断备份是否已经完成。
  • 调用 COS 提供的压缩函数对备份数据进行压缩,以减小备份数据所占用的存储空间。 通过备份和压缩到 COS  可以将 ES 存储的数据缩小到 1/4 的大小,并且通过 COS 的海量分布式存储服务,可以提供低成本、弹性的云存储服务。 各项步骤对应的代码逻辑如下:

1.创建仓库

代码语言:javascript复制
func CreateRepository(Repository string, cosappid uint64, appId uint64, accessKeyId string, AccessKeySecret string, buckets string, region string) error {  service := Client.SnapshotCreateRepository(Repository)  service = service.Type("cos").Settings(map[string]interface{}{    "app_id":            strconv.FormatUint(cosappid, 10),    "access_key_id":     accessKeyId,    "access_key_secret": AccessKeySecret,    "bucket":            buckets,    "region":            region,    "compress":          true,    "chunk_size":        "500mb",    "base_path":         "backup/"   strconv.FormatUint(appId, 10)   "/"   Repository,  })  if err := service.Validate(); err != nil {    logger.Errorf("SnapshotCreateRepository failed,app_id=%v,Repository=%s ", appId, Repository)    return err  }  _, err := service.Do(context.Background())  if err != nil {    logger.Errorf("SnapshotGetRepository do failed,app_id=%v,Repository=%s ", appId, Repository)  }  return nil}

2.创建快照

代码语言:javascript复制
func SnapshotCreate(Repository string, Snapshot string, appId uint64, indexname string) error {  type IndicesSetting struct {    Indices string `json:"indices"`  }  e1 := IndicesSetting{indexname}  _, err := Client.SnapshotCreate(Repository, Snapshot).WaitForCompletion(false).BodyJson(e1).Pretty(true).Do(context.Background())  if err != nil {    logger.Errorf("SnapshotCreate failed,app_id=%v,Repository=%s Snapshot=%s ", appId, Repository, Snapshot)    return err  }  return nil}

3.查询快照状态

代码语言:javascript复制
func SnapshotGet(Repository string, Snapshot string, appId uint64) (*elastic.SnapshotGetResponse, error) {  service, err := Client.SnapshotGet(Repository).    Snapshot(Snapshot).    IgnoreUnavailable(true).    Verbose(true).Pretty(true).Do(context.Background())  if err != nil {    logger.Errorf("SnapshotGet failed,app_id=%v,Repository=%s Snapshot=%s ", appId, Repository, Snapshot)    return nil, err  } /*    判断任务是否完成    if service.Snapshots[0].State != "true" {
    }*/  return service, nil}

4.创建压缩任务

压缩任务是腾讯云对象存储 COS 提供的压缩API,需要先创建好压缩函数: 创建压缩函数参考如下:

https://cloud.tencent.com/document/product/436/58578

首先获取压缩文件,然后调用压缩函数进行压缩。获取压缩文件代码逻辑如下:

代码语言:javascript复制
var coslist []string  coslist, err = ext.GetCosDirCosKeyList(backupcosfile, cosClient)  if err != nil {    logger.Errorx("GetCosDirCosKeyList error", err)    return err  }  scfClient, err := ext.GetScfClient(config.One.Backup.Region, true)  if err != nil {    logger.Errorx("GetScfClient error", err)    return err  }  var invokeRequestId string  invokeRequestId, err = ext.ScfCompression(scfClient, coslist, backupcosfilezip, config.One.Backup.ZipFunctionName)  if err != nil {    logger.Errorx("GetScfClient error", err)    return err  }

压缩函数模版如下:

代码语言:javascript复制
func ScfCompression(scfCleint *scf.Client, cosKeyList []string, compressKey string, functionname string) (functionRequestId string, err error) {  request := scf.NewInvokeRequest()
  request.FunctionName = common.StringPtr(functionname)  scfSourceList := make([]ScfCompressionSource, 0, 1)  for _, cosKey := range cosKeyList {    scfSource := ScfCompressionSource{      Url: fmt.Sprintf("https://%s.cos-internal.%s.tencentcos.cn/%s", config.One.Cos.GuangzhouCosBucket, config.One.Cos.CosRegion, cosKey),    }    scfSourceList = append(scfSourceList, scfSource)  }  scfClientContext := ScfCompressionClientContext{    Bucket:     config.One.Cos.GuangzhouCosBucket,    Region:     config.One.Cos.CosRegion,    Key:        compressKey,    Flatten:    false,    SourceList: scfSourceList,  }  clientContext, err := json.Marshal(scfClientContext)  if err != nil {    logger.Errorx("ScfCompression Marshal", err)    return "", err  }  request.ClientContext = common.StringPtr(string(clientContext))  //fmt.Println(string(clientContext))
  response, err := scfCleint.Invoke(request)  if _, ok := err.(*tencentError.TencentCloudSDKError); ok {    logger.Errorx("An API error has returned:", err)    return "", err  }  if err != nil {    logger.Errorx("scfCleint.Invoke", err)    return "", err  }  logger.Debugf("%s", response.ToJsonString())  return *response.Response.Result.FunctionRequestId, nil}

COS数据恢复到ES

创建恢复流程

恢复流程是备份流程的逆向流程,主要包括如下步骤:

  • 创建COS解压缩函数
  • 创建恢复任务
  • 查询恢复任务是否完成
  • 完成恢复 1. COS 解压缩任务

解压缩任务是腾讯云对象存储 COS 提供的解压缩函数模板,需要先创建好解压缩函数: 创建解压函数参考如下:

https://cloud.tencent.com/document/product/436/67101

解压缩函数模板如下:

代码语言:javascript复制
func ScfDecompression(scfCleint *scf.Client, compressKey string, cosTargetPrefix string, functionname string) (functionRequestId string, err error) {  logger.Debug("ScfDecompression"   compressKey)  request := scf.NewInvokeRequest()
  request.FunctionName = common.StringPtr(functionname)  //key := fmt.Sprintf("https://%s.cos-internal.ap-guangzhou.myzijiebao.com/%s", config.One.Cos.GuangzhouCosBucket, compressKey)  scfClientContext := ScfDecompressionClientContext{    Bucket:       config.One.Cos.GuangzhouCosBucket,    Region:       config.One.Cos.CosRegion,    Key:          compressKey,    TargetBucket: config.One.Cos.GuangzhouCosBucket,    TargetRegion: config.One.Cos.CosRegion,    TargetPrefix: cosTargetPrefix,  }  clientContext, err := json.Marshal(scfClientContext)  if err != nil {    logger.Errorx("ScfCompression Marshal", err)    return "", err  }  request.ClientContext = common.StringPtr(string(clientContext))  //fmt.Println(string(clientContext))
  response, err := scfCleint.Invoke(request)  if _, ok := err.(*tencentError.TencentCloudSDKError); ok {    logger.Errorx("An API error has returned:", err)    return "", err  }  if err != nil {    logger.Errorx("scfCleint.Invoke", err)    return "", err  }  logger.Debugf("%s", response.ToJsonString())  return *response.Response.Result.FunctionRequestId, nil}

2. 完成解压缩后,创建恢复任务

代码语言:javascript复制
func SnapshotRestore(Repository string, Snapshot string, appId uint64, oldIndexName string, newIndexName string) error {  _, err := Client.SnapshotRestore(Repository, Snapshot).RenamePattern(oldIndexName).    RenameReplacement(newIndexName).Indices(oldIndexName).WaitForCompletion(false).Pretty(true).Do(context.Background())  if err != nil {    logger.Errorf("SnapshotRestore failed,app_id=%v,Repository=%s Snapshot=%s ", appId, Repository, Snapshot)    return err  }  return nil}

0 人点赞