ETL CSV to Elasticsearch

2021-08-11 17:17:12 浏览数 (1)

前言

当有大量数据要从 CSV 导入到 Elasticsearch 中时一般有两种方式来完成

  • 1.使用 logstash 加上 csv filter 的方式来导入
  • 2.编写脚本来完成

对于第一种方式,只要定义好字段名,指定输入源文件,相对简单,但定制空间比较受 logstash 的功能约束

对于第二种方式,相对灵活,但是更复杂一点,需要借助各种库,也要理清数据抽取,变换处理与导入的逻辑流程

前一篇使用 helpers.bulk API 实现了 CSV 文档的批量导入

这里演示一下如何傅用 creat API 来将 CSV 导出到 Elasticsearch

Tip: 需要借助 Elasticsearch 的 python 客户端


操作

环境

代码语言:javascript复制
[root@much sf_script]# hostnamectl
   Static hostname: much
         Icon name: computer-vm
           Chassis: vm
        Machine ID: 33dc28f7e76c4903ad9b603b77e29a7c
           Boot ID: a6eba448fd814d6dad2f7cb92465f567
    Virtualization: kvm
  Operating System: CentOS Linux 7 (Core)
       CPE OS Name: cpe:/o:centos:centos:7
            Kernel: Linux 3.10.0-514.21.1.el7.x86_64
      Architecture: x86-64
[root@much sf_script]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN qlen 1
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
2: enp0s3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 08:00:27:e3:df:87 brd ff:ff:ff:ff:ff:ff
    inet 10.0.2.15/24 brd 10.0.2.255 scope global dynamic enp0s3
       valid_lft 80921sec preferred_lft 80921sec
3: enp0s8: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 08:00:27:d3:ec:e7 brd ff:ff:ff:ff:ff:ff
    inet 192.168.56.208/24 brd 192.168.56.255 scope global enp0s8
       valid_lft forever preferred_lft forever
4: virbr0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN qlen 1000
    link/ether 52:54:00:16:5e:11 brd ff:ff:ff:ff:ff:ff
    inet 192.168.122.1/24 brd 192.168.122.255 scope global virbr0
       valid_lft forever preferred_lft forever
5: virbr0-nic: <BROADCAST,MULTICAST> mtu 1500 qdisc pfifo_fast master virbr0 state DOWN qlen 1000
    link/ether 52:54:00:16:5e:11 brd ff:ff:ff:ff:ff:ff
[root@much sf_script]# python -V
Python 2.7.5
[root@much sf_script]# rpm -qa | grep elast
elasticsearch-6.2.1-1.noarch
[root@much sf_script]# rpm -qa | grep kibana
kibana-6.2.1-1.x86_64
[root@much sf_script]#

Note: Kibana 的版本要与 elasticsearch 版本兼容,否则会报错

安装 pip

代码语言:javascript复制
[root@much ~]# yum install  python2-pip.noarch
Loaded plugins: fastestmirror, langpacks
Loading mirror speeds from cached hostfile
 * base: mirror.pregi.net
 * c7-media:
 * epel: mirror.pregi.net
 * extras: mirror.pregi.net
 * updates: mirror.pregi.net
Resolving Dependencies
--> Running transaction check
---> Package python2-pip.noarch 0:8.1.2-5.el7 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

================================================================================
 Package              Arch            Version               Repository     Size
================================================================================
Installing:
 python2-pip          noarch          8.1.2-5.el7           epel          1.7 M

Transaction Summary
================================================================================
Install  1 Package

Total download size: 1.7 M
Installed size: 7.2 M
Is this ok [y/d/N]: y
Downloading packages:
warning: /var/cache/yum/x86_64/7/epel/packages/python2-pip-8.1.2-5.el7.noarch.rpm: Header V3 RSA/SHA256 Signature, key ID 352c64e5: NOKEY
Public key for python2-pip-8.1.2-5.el7.noarch.rpm is not installed
python2-pip-8.1.2-5.el7.noarch.rpm                         | 1.7 MB   00:05     
Retrieving key from file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7
Importing GPG key 0x352C64E5:
 Userid     : "Fedora EPEL (7) <epel@fedoraproject.org>"
 Fingerprint: 91e9 7d7c 4a5e 96f1 7f3e 888f 6a2f aea2 352c 64e5
 Package    : epel-release-7-9.noarch (@extras)
 From       : /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7
Is this ok [y/N]: y
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
Warning: RPMDB altered outside of yum.
** Found 3 pre-existing rpmdb problem(s), 'yum check' output follows:
ipa-client-4.4.0-14.el7.centos.7.x86_64 has installed conflicts freeipa-client: ipa-client-4.4.0-14.el7.centos.7.x86_64
ipa-client-common-4.4.0-14.el7.centos.7.noarch has installed conflicts freeipa-client-common: ipa-client-common-4.4.0-14.el7.centos.7.noarch
ipa-common-4.4.0-14.el7.centos.7.noarch has installed conflicts freeipa-common: ipa-common-4.4.0-14.el7.centos.7.noarch
  Installing : python2-pip-8.1.2-5.el7.noarch                               1/1
  Verifying  : python2-pip-8.1.2-5.el7.noarch                               1/1

Installed:
  python2-pip.noarch 0:8.1.2-5.el7                                              

Complete!
[root@much ~]#

安装 Elasticsearch Client API

代码语言:javascript复制
[root@much ~]# pip install elasticsearch
Collecting elasticsearch
  Downloading elasticsearch-6.1.1-py2.py3-none-any.whl (59kB)
    100% |████████████████████████████████| 61kB 326kB/s
Collecting urllib3<1.23,>=1.21.1 (from elasticsearch)
  Downloading urllib3-1.22-py2.py3-none-any.whl (132kB)
    100% |████████████████████████████████| 133kB 347kB/s
Installing collected packages: urllib3, elasticsearch
  Found existing installation: urllib3 1.10.2
    Uninstalling urllib3-1.10.2:
      Successfully uninstalled urllib3-1.10.2
Successfully installed elasticsearch-6.1.1 urllib3-1.22
You are using pip version 8.1.2, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
[root@much ~]#

有提示安装新版本的 pip

代码语言:javascript复制
[root@much ~]# pip install --upgrade pip
Collecting pip
  Downloading pip-9.0.1-py2.py3-none-any.whl (1.3MB)
    100% |████████████████████████████████| 1.3MB 273kB/s
Installing collected packages: pip
  Found existing installation: pip 8.1.2
    Uninstalling pip-8.1.2:
      Successfully uninstalled pip-8.1.2
Successfully installed pip-9.0.1
[root@much ~]# pip -V
pip 9.0.1 from /usr/lib/python2.7/site-packages (python 2.7)
[root@much ~]#

查看当前索引

代码语言:javascript复制
[root@much sf_script]# curl http://localhost:9200/_cat/indices?pretty=true
green open .kibana FEw09koKTymzBRmFlyCThA 1 0 4 0 20kb 20kb
[root@much sf_script]#

编写脚本

代码语言:javascript复制
[root@much sf_script]# vim csv2es2.py
[root@much sf_script]# cat csv2es2.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

#import
import csv
import sys
import os
import uuid
from optparse import OptionParser
from elasticsearch import Elasticsearch

reload(sys)
sys.setdefaultencoding('utf-8')

#args parser
usage =  "Usage: %prog <-i index> <-t type> [options] arg"
parser = OptionParser(usage)
parser.add_option("-i","--index",dest="index",help="(mandatory)the index ready to import")
parser.add_option("-t","--type",dest="dtype",help="(mandatory)the type ready to import")
parser.add_option("-f","--csv",dest="csv",help="(mandatory)the csv file ready to import")
parser.add_option("-s","--server",dest="server",default="localhost",help="the Elasticsearch host, default is �fault")
parser.add_option("-P","--port",dest="port",default="9200",help="the Elasticsearch port, default is �fault")
parser.add_option("-u","--user",dest="user",help="the user of elasticsearch")
parser.add_option("-p","--password",dest="password",help="the password of the elasticsearch user")
(options,args)=parser.parse_args()

#check args
if options.csv == None:
    exit("Error: you need to specified target csv file tobe import")
if not os.path.exists(options.csv):
    exit("Error: %s not found"%options.csv)
if options.index == None:
    exit("Error: you need to specified target index tobe import")
if options.dtype == None:
    exit("Error: you need to specified target type tobe import")
if options.user == None or options.password == None:
    es_url = 'http://'   options.server   ':'   options.port   '/'
else:
    es_url = 'http://'   options.user   ':'   options.password   '@'   options.server   ':'   options.port   '/'


es = Elasticsearch(es_url)

def to_utf8(record):
    for i in record:
        record[i]=str(record[i]).encode('utf-8')
    return record

def etl_csv_to_es(indexName,typeName,csvFile):
    count=0
    for row in csv.DictReader(open(csvFile,'rb')):
        es.create(index=indexName,doc_type=typeName,id=str(uuid.uuid4()),body=to_utf8(row))
        count  = 1
    es.indices.flush(index=[indexName])
    return (True,count)

#main
if __name__ == "__main__":
    res,num = etl_csv_to_es(options.index,options.dtype,options.csv)
    if res:
        print "%d items import secussfully"%num
    else:
        print "import fail"
[root@much sf_script]#

准备测试数据

代码语言:javascript复制
[root@much sf_script]# vim y.csv
[root@much sf_script]# cat y.csv
x,y,z,p,q
2018/01/02 23:44:23,a,b,c,d
2018/01/02 23:44:24,a,b,c,d
2018/01/02 23:44:25,a,b,c,d
2018/01/02 23:44:26,a,b,c,d
2018/01/02 23:44:27,a,b,c,d
2018/01/02 23:44:28,a,b,c,d
2018/01/02 23:44:29,a,b,c,d
2018/01/02 23:44:30,a,b,c,d
2018/01/02 23:44:31,a,b,c,d
2018/01/02 23:44:32,a,b,c,d
2018/01/02 23:44:33,a,b,c,d
2018/01/02 23:44:34,a,b,c,d
2018/01/02 23:44:35,a,b,c,d
2018/01/02 23:44:36,a,b,c,d
2018/01/02 23:44:37,a,b,c,d
2018/01/02 23:44:38,a,b,c,d
2018/01/02 23:44:39,a,b,c,d
2018/01/02 23:44:40,a,b,c,d
2018/01/02 23:44:41,a,b,c,d
2018/01/02 23:44:42,a,b,c,d
2018/01/02 23:44:43,a,b,c,d
2018/01/02 23:44:44,a,b,c,d
2018/01/02 23:44:45,a,b,c,d
2018/01/02 23:44:46,a,b,c,d
2018/01/02 23:44:47,a,b,c,d
2018/01/02 23:44:48,a,b,c,d
2018/01/02 23:44:49,a,b,c,d
2018/01/02 23:44:50,a,b,c,d
2018/01/02 23:44:51,a,b,c,d
2018/01/02 23:44:52,a,b,c,d
2018/01/02 23:45:23,a,b,c,d
2018/01/02 23:45:24,a,b,c,d
2018/01/02 23:45:25,a,b,c,d
2018/01/02 23:45:26,a,b,c,d
2018/01/02 23:45:27,a,b,c,d
2018/01/02 23:45:28,a,b,c,d
2018/01/02 23:45:29,a,b,c,d
2018/01/02 23:45:30,a,b,c,d
2018/01/02 23:45:31,a,b,c,d
2018/01/02 23:45:32,a,b,c,d
2018/01/02 23:45:33,a,b,c,d
2018/01/02 23:45:34,a,b,c,d
2018/01/02 23:45:35,a,b,c,d
2018/01/02 23:45:36,a,b,c,d
2018/01/02 23:45:37,a,b,c,d
2018/01/02 23:45:38,a,b,c,d
2018/01/02 23:45:39,a,b,c,d
2018/01/02 23:45:40,a,b,c,d
2018/01/02 23:45:41,a,b,c,d
[root@much sf_script]#

运行脚本

代码语言:javascript复制
[root@much sf_script]# ./csv2es2.py -h
Usage: csv2es2.py <-i index> <-t type> [options] arg

Options:
  -h, --help            show this help message and exit
  -i INDEX, --index=INDEX
                        (mandatory)the index ready to import
  -t DTYPE, --type=DTYPE
                        (mandatory)the type ready to import
  -f CSV, --csv=CSV     (mandatory)the csv file ready to import
  -s SERVER, --server=SERVER
                        the Elasticsearch host, default is localhost
  -P PORT, --port=PORT  the Elasticsearch port, default is 9200
  -u USER, --user=USER  the user of elasticsearch
  -p PASSWORD, --password=PASSWORD
                        the password of the elasticsearch user
[root@much sf_script]# ./csv2es2.py -i indextest -t typetest -f y.csv
49 items import secussfully
[root@much sf_script]#

可以在命令行中进行验证

代码语言:javascript复制
[root@much sf_script]# curl http://localhost:9200/_cat/indices?pretty=true
yellow open indextest NiccLPLgStO7o6YjofKS-g 5 1 49 0 29kb 29kb
green  open .kibana   FEw09koKTymzBRmFlyCThA 1 0  4 0 20kb 20kb
[root@much sf_script]#

从 kibana 中查看数据

区别

两者花费的时间不一样(这并不是一个随机的结果,重复执行很多次结果差异不大)

代码语言:javascript复制
[root@much sf_script]# time ./csv2es2.py -i indextest -t typetest -f y.csv
49 items import secussfully

real	0m0.724s
user	0m0.384s
sys	0m0.084s
[root@much sf_script]# time ./csv2es.py -i indextest -t typetest -f y.csv
49 items import secussfully

real	0m0.630s
user	0m0.317s
sys	0m0.112s
[root@much sf_script]#

原因是 bulk 批量导入的效率与逐条导入的效率差异

代码语言:javascript复制
for row in csv.DictReader(open(csvFile,'rb')):
    action = {"_index":indexName,"_type":typeName,"_source":to_utf8(row)}
    actions.append(action)
    count  = 1
helpers.bulk(es,actions)
----------
for row in csv.DictReader(open(csvFile,'rb')):
    es.create(index=indexName,doc_type=typeName,id=str(uuid.uuid4()),body=to_utf8(row))
    count  = 1

一个是将所有 action 都存入列表中(内存里),调用一次 helpers.bulk API 将数据存入

一个是每一条数据生成后,就立刻调用一次 create API,将数据存入

前者在使用空间置换时间,后者在使用时间置换空间

至于选择哪种执行方式,得看当前的环境条件,内容少内存多时延容忍度小选择前者,内容多内存少时延容忍度大选择后者


总结

同一功能,有很多种实现方式,但是不同的方式,代价不一样,侧重也不一样,我们总会尽量尝试可接受的代价与更倾向的侧重方案

0 人点赞