Notes on Handling a Large Customer's Elasticsearch Data Requirements (Part 1)

2022-11-17 18:22:31

I. Preface

How the requirements evolved

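  • Requirement 1: When a large customer's CDN logs are written to Elasticsearch, strip the tag fields from the JSON and keep a fixed, known set of fields.
  • Requirement 2: The number of fields in the customer's docs is not known in advance. Delete the fields whose key is a fixed-length random string such as "2301931667f1adcfc56e7f10f6d5970b" and whose value is tag. These 32-character keys are generated by CDN verification; everything else must be kept.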
  1. A screenshot of the log:
[Screenshot: log to be transformed]

II. Requirement 1: Implementation

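The fields marked in red in the screenshot above must be removed; all other fields are kept. This is done by preprocessing each document with an ingest pipeline that combines the set and remove processors.

What actually needs handling is the content of the req.headers object: copy the wanted fields out first, then remove the req.headers object itself.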

Key concept: the set processor

Set processor | Elasticsearch Guide [8.3] | Elastic

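This uses the copy_from option of the set processor: copy the value out first, then remove the original. Note that this option is new as of ES 7.14, so the cluster may need to be upgraded.

The remove processor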

Remove processor | Elasticsearch Guide [8.3] | Elastic
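
Writing one set processor per header by hand is repetitive and easy to mistype. As a rough sketch only, the processor list could be generated instead. The Python below is not part of the original setup: `build_copy_pipeline`, its parameters, and the `req_headers_` prefix convention are illustrative assumptions. It emits one set processor with copy_from per header, followed by a single remove processor.

```python
import json


def build_copy_pipeline(headers, source="req.headers", prefix="req_headers_"):
    """Build an ingest pipeline body: copy each header out, then drop the source.

    Hypothetical helper for illustration; names and defaults are assumptions.
    """
    processors = [
        {"set": {"field": prefix + name, "copy_from": f"{source}.{name}"}}
        for name in headers
    ]
    # Remove the original object last, after every wanted field has been copied.
    processors.append({"remove": {"field": source}})
    return {
        "description": "Copy selected req.headers fields, then remove req.headers",
        "processors": processors,
    }


pipeline_body = build_copy_pipeline(["host", "connection", "x-forwarded-for"])
# The printed JSON is meant to be used as the body of a PUT _ingest/pipeline/... request.
print(json.dumps(pipeline_body, indent=2))
```

Generating the list keeps the target field names consistent, which matters because a misspelled target name silently creates a differently named field.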

  1. Test procedure. Step 1: define a pipeline that combines the set and remove processors.
PUT _ingest/pipeline/set_bar_test
{
  "description": "Remove some fields for business needs",
  "processors": [
    {
      "set": {
        "field": "req_headers_host",
        "copy_from": "req.headers.host"
      }
    },
    {
      "set": {
        "field": "req_headers_connection",
        "copy_from": "req.headers.connection"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded_for",
        "copy_from": "req.headers.x-forwarded-for"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded-proto",
        "copy_from": "req.headers.x-forwarded-proto"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded_host",
        "copy_from": "req.headers.x-forwarded-host"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded-port",
        "copy_from": "req.headers.x-forwarded-port"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded_path",
        "copy_from": "req.headers.x-forwarded-path"
      }
    },
    {
      "set": {
        "field": "req_headers_x-forwarded-prefix",
        "copy_from": "req.headers.x-forwarded-prefix"
      }
    },
    {
      "set": {
        "field": "req_headers_content_length",
        "copy_from": "req.headers.content-length"
      }
    },
    {
      "set": {
        "field": "req_headers_x_stgw-time",
        "copy_from": "req.headers.x-stgw-time"
      }
    },
    {
      "set": {
        "field": "req_headers_x_client_proto",
        "copy_from": "req.headers.x-client-proto"
      }
    },
    {
      "set": {
        "field": "req_headers_x_client_proto_ver",
        "copy_from": "req.headers.x-client-proto-ver"
      }
    },
    {
      "set": {
        "field": "req_headers_accept",
        "copy_from": "req.headers.accept"
      }
    },
    {
      "set": {
        "field": "req_headers_content_type",
        "copy_from": "req.headers.content-type"
      }
    },
    {
      "set": {
        "field": "req_headers_authorization",
        "copy_from": "req.headers.authorization"
      }
    },
    {
      "set": {
        "field": "req_headers_x-fr-clientid",
        "copy_from": "req.headers.x-fr-clientid"
      }
    },
    {
      "set": {
        "field": "req_headers_date",
        "copy_from": "req.headers.date"
      }
    },
    {
      "set": {
        "field": "req_headers_content-md5",
        "copy_from": "req.headers.content-md5"
      }
    },
    {
      "set": {
        "field": "req_headers_accept-encoding",
        "copy_from": "req.headers.accept-encoding"
      }
    },
    {
      "set": {
        "field": "req_headers_user-agent",
        "copy_from": "req.headers.user-agent"
      }
    },
    {
      "set": {
        "field": "req_headers_waf_client_ip",
        "copy_from": "req.headers.waf_client_ip"
      }
    },
    {
      "set": {
        "field": "req_headers_x-nws-log-uuid",
        "copy_from": "req.headers.x-nws-log-uuid"
      }
    },
    {
      "set": {
        "field": "req_headers_x-tencent-ua",
        "copy_from": "req.headers.x-tencent-ua"
      }
    },
    {
      "remove": {
        "field": "req.headers"
      }
    }
  ]
}

Step 2: apply the pipeline

POST test/_doc/1?pipeline=set_bar_test
{
  "hostName": "store-inventory-capi-proxy-5b9b4fbtlflq",
  "level": "INFO",
  "time": "2022-07-11T06:43:27.147Z",
  "traceId": null,
  "requestId": "274caf3c-d9c59c51022b",
  "service": "store-inventory",
  "thread": null,
  "message": "request.start",
  "endpoint": "/store-inventoumer/120346/employee-name",
  "responseTime": null,
  "req": {
    "method": "GET",
    "headers": {
      "host": "172.17.82.488",
      "connection": "keep-alive",
      "x-forwarded-for": "8.8.8.8",
      "x-forwarded-proto": "http",
      "x-forwarded-host": "tc-ng.cn",
      "x-forwarded-port": "8000",
      "x-forwarded-path": "/store-inventory/consumer/ployee-name",
      "x-forwarded-prefix": "/store-inventv1/uq/cn/consumer/",
      "x-real-ip": "8.8.8.8",
      "content-length": "0",
      "x-stgw-time": "1657521807.132",
      "x-client-proto": "https",
      "x-client-proto-ver": "HTTP/1.1",
      "accept": "application/json",
      "content-type": "application/json",
      "authorization": "Sigture-tnce:\"158QqJRUa1zocsL961cLlqKY=\"",
      "x-fr-clientid": "handy",
      "date": "2022-07-11T14:43:17 08:00",
      "content-md5": "1B2M2Y8AsgTY7PhCfg==",
      "accept-encoding": "gzip",
      "user-agent": "okhttp/3.12.0",
      "waf_client_ip": "221.8.8.8",
      "2301931667f1adcfc56e7f10f6d5970b": "tag",
      "x-nws-log-uuid": "11031091720",
      "001cdca8084b0a72e42a97451c6541f9": "tag",
      "x-tencent-ua": "Qcloud"
    },
    "query": {
      "employee_code_list": "01555"
    },
    "body": null,
    "url": "/store-inventory/v1/uq/cn/consumer/120yee-name?empl55",
    "originalUrl": "/store-inventory/v1/uq/cn/consum/601444555",
    "httpVersion": "1.1"
  }
}

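Step 3: check the result

[Screenshot: test result]

Result: the tag fields are gone, and roughly two dozen new fields have been created, one per set processor.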

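III. Requirement 2: Implementation

Requirement 1 assumed that the fields to keep are known in advance, which is why the approach above works. While working with the customer, however, it turned out that the fields in the JSON are not fixed: neither the set of fields to exclude nor the set to keep is known. The only firm rule is that fields keyed by a fixed-length random string such as "2301931667f1adcfc56e7f10f6d5970b", with the value tag, must be deleted. Approach 1 no longer works, so how can this be solved?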

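The customer's requirement can be met with a script processor:

https://www.elastic.co/guide/en/elasticsearch/reference/current/script-processor.html

One option is to match the keys with a regular expression on their length and character class. The script shown here takes a simpler route and drops every req.headers entry whose value is tag.

It is still an ingest pipeline, this time with a script processor.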

"processors": [
  {
    "script": {
      "description": "Remove 'tag' entries from the 'req.headers' field",
      "lang": "painless",
      "source": """
        // Copy every header except those whose value equals params.value.
        Map map = (HashMap)ctx['req']['headers'];
        Map headersMap = new HashMap();
        for (def entry : map.entrySet()) {
          if (entry.getValue() != params.value) {
            headersMap.put(entry.getKey(), entry.getValue());
          }
        }
        // Replace the original object with the filtered copy.
        ctx['req']['headers'] = headersMap;
      """,
      "params": {
        "value": "tag"
      }
    }
  }
]

This meets the customer's requirement. Requirement 2 is more involved and takes some development background.
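
The stricter rule mentioned earlier, matching keys by length and character class, can be tried out against sample logs before it is written as an ingest script. The Python below is only an offline emulation of that rule, not the Painless that runs inside Elasticsearch. The pattern assumes tag keys are exactly 32 lowercase hexadecimal characters, as in the sample log, and `strip_tag_headers` is a hypothetical name.

```python
import re

# Assumption: tag keys are exactly 32 lowercase hex characters, as in the sample log.
TAG_KEY = re.compile(r"[0-9a-f]{32}")


def strip_tag_headers(headers, tag_value="tag"):
    """Return a copy of headers without the CDN tag entries."""
    return {
        key: value
        for key, value in headers.items()
        # Drop an entry only when BOTH the key shape and the value match.
        if not (TAG_KEY.fullmatch(key) and value == tag_value)
    }


sample = {
    "user-agent": "okhttp/3.12.0",
    "2301931667f1adcfc56e7f10f6d5970b": "tag",
    "x-nws-log-uuid": "11031091720",
    "001cdca8084b0a72e42a97451c6541f9": "tag",
}
print(strip_tag_headers(sample))
# {'user-agent': 'okhttp/3.12.0', 'x-nws-log-uuid': '11031091720'}
```

Checking both the key and the value is more conservative than checking the value alone: a legitimate header that happens to carry the value tag would survive.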

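IV. Adding the pipeline to the index settings

The pipeline can be set as the default in the index settings or in an index template, so that clients no longer need to pass ?pipeline= on every request. For example: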

PUT test
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0,
    "index.default_pipeline": "set_bar_test"
  },
  "mappings": {}
}

post test/_doc/1
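
For the index template variant, a minimal sketch of the request body might look like the following. It assumes composable index templates (the `_index_template` API); the helper name `build_index_template` and the pattern `cdn-logs-*` are made up for illustration, while set_bar_test is the pipeline defined above.

```python
import json


def build_index_template(index_pattern, pipeline):
    """Build a composable index template body that sets a default ingest pipeline.

    Hypothetical helper; the index pattern used below is an assumption.
    """
    return {
        "index_patterns": [index_pattern],
        "template": {
            "settings": {
                "index.default_pipeline": pipeline
            }
        },
    }


# "cdn-logs-*" is a made-up pattern; set_bar_test is the pipeline defined earlier.
template_body = build_index_template("cdn-logs-*", "set_bar_test")
# Intended as the body of: PUT _index_template/<template-name>
print(json.dumps(template_body, indent=2))
```

With a template in place, every new index that matches the pattern picks up the default pipeline without per-index settings.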
