第十五章 并发版爬虫第二版 -- 完结

2020-09-27 16:41:02 浏览数 (1)

并发版爬虫, 在上一篇单机版爬虫的基础上演变而来

这里只有并发引擎的代码, 基本的解析器代码参考: https://cloud.tencent.com/developer/article/1706803

一. 单节点版爬虫的问题

拉取数据的速度太慢, 慢有两部分. 一部分是网络请求, 根据url拉取zhenai网的数据. 另一部分是: 解析. 两部分相比较, 第一部分更慢

既然慢, 我们就要想办法解决慢的问题.

其实拉取数据和数据解析, 可以看成是一个部分. 他们都是具体的工作者. 因此, 我们把他们化为工作者模型

那么, 一个工作者工作, 速度很慢. 很多个工作者同时工作, 速度肯定比一个工作者要快很多.

这里想到java的多线程同时工作. go里面应该就是多协程一起工作了. 将工作者抽象出来, 然后创建多个协程, 比如, 5个, 10个, 20个

在单机版的爬虫里面. 会将数据延绵不断的放入队列中. 那么, 在并发版的应该也有一个类似于队列的东西, 来保存request. 这里定义为一个Scheduler

这是并发版的架构模型. 有一个engine, 一个scheduler, 多个worker. 这里的一个,多个映射到代码里表示的是协程.

sheduler是一个协程调度者, 他把收到的request分发给worker. worker拿到request进行处理. 处理结果输出为Requests, Items, 交给engine执行引擎, 执行引擎再把新的Request放入到Scheduler调度者中, 然后循环往复

这里每条线都代表一个chan. 协程和协程之间的通信, 使用的是chan.

二. 并发版爬虫第一版---所有的worker公用一个输入

什么意思呢?

在单机版是一个队列, 单线程执行, 去队列里去url, 然后get网页内容, 在进行解析.

在并发版, 我们让多个工作者同时工作, 去队列里取request, 然后同时工作, get网页内容, 解析网页.

在go里多个工作者同时工作, 我们可以考虑开多个协程. 同时去队列去数据. 这第一版, 把队列改成了任务调度器Scheduler. 任务调度器是单独的一个goroutine.

拿到的request放到Scheduler里面, 然后多个worker去Scheduler里拿request进行处理.

增加一个并发的引擎--concurrectEngine

代码语言:javascript复制
package engine

import "fmt"

// 定义了一个
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
}

type Scheduler interface{
    Submit(Request)
    ConfigureMasterWorkerChan(chan Request)
}

func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    in := make(chan Request)
    out := make(chan ParseResult)
    c.Scheduler.ConfigureMasterWorkerChan(in)

    // 创建工作者, 从in输入管道中取出request进行处理, 处理后放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i    {
        // 创建WorkerCount个工作者
        c.createWorker(in, out)
    }

    // 第二步: 把种子放到任务调度器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 处理工作者线程返回的ParseResult, 将items打印, requests再次添加到任务调度器中
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容项: %v n", item)
        }

        for _, p := range pr.Req{
            c.Scheduler.Submit(p)
        }
    }
}

func (c *ConcurrectEngine) createWorker(in chan Request, out chan ParseResult) {
    go func() {
        // 从in中取出request请求
        req := <- in
        parseResult, e := worker(req)
        if e != nil {
            return
        }
        out <- parseResult
    }()
}

拆解分析

代码语言:javascript复制
// 定义了一个并发的引擎结构, 目的是, 区别于之前写的单机版的Engine.
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
}

调度器接口

这里是定义了一个并发引擎. 每个引擎engine都有自己的Run方法.

代码语言:javascript复制
// 定义了调度器的接口. 既然是调度器, 就要有一个接受请求的方法.
type Scheduler interface{
    // 接收请求的方法
    Submit(Request)
    // 这里比较巧妙的地方是, 其实调度器的chan类型和工作者输入的chan类型是同一个.
    // 调度器输出的request, 就是工作者的输入request, 这里让他们指向同一个地址.
    ConfigureMasterWorkerChan(chan Request)
}

定义了调度器的接口.

在任务开始前, 我们需要将要处理的request放入到调度器中. 所以, 调度器需要有一个方法submit(request)

调度器的输出request,其实就是工作者的输入request. 这里让他们指向同一个地址, 一个变就都变了. 不用再进行拷贝了

引擎执行的代码

在并发的引擎结构中, 第一个方法是调度器的接口. 再具体使用的时候, 传什么类型的调度器, 就执行其具体的调度规则

代码语言:javascript复制
func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    in := make(chan Request)
    out := make(chan ParseResult)
    c.Scheduler.ConfigureMasterWorkerChan(in)

    // 创建工作者, 从in输入管道中取出request进行处理, 处理后放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i    {
        // 创建WorkerCount个工作者
        c.createWorker(in, out)
    }

    // 第二步: 把种子放到任务调度器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 处理工作者线程返回的ParseResult, 将items打印, requests再次添加到任务调度器中
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容项: %v n", item)
        }

        for _, p := range pr.Req{
            c.Scheduler.Submit(p)
        }
    }
}

开始工作.

首先, 我们再来思考这个模型

刚开始, 种子请求过来了, 我们将其放入到调度器中. 调度器是单独工作的. 只需要有一个调度器.

然后从调度器中取出一个request. 通过管道in chan request, 将请求发送给worker. worker处理请求, 处理完成以后, 将处理结果放入out chan ParseResut类型的管道中. 通过管道进行通讯

然后引擎从管道中取出ParseResult, 将其中的items部分打印出来, 将新的requests添加到Scheduler调度器中.

循环反复执行

看看engine的Run代码

第一步: 做了初始化操作. 输入管道, 输出管道. 以及Scheduler调度器中的管道就是in输入管道.

第二步: 将初始的种子请求, 放入到任务调度器中.

第三步: 从调度器中取出一个请求, 进行任务处理.

第四步: 处理返回的处理结果.

具体调度器的代码

代码语言:javascript复制
package sechduler

import "aaa/crawler/zhenai/engine"

type SimpleScheduler struct {
    workerChan chan engine.Request

}

// 这里需要是一个地址拷贝, 而不是值拷贝. 也就是workerChan和in使用的是同一个地址
func (s *SimpleScheduler) ConfigureMasterWorkerChan(in chan engine.Request) {
    s.workerChan = in
}

// 将请求添加到任务调度器
func (s *SimpleScheduler) Submit(r engine.Request) {
    go func() {
        s.workerChan <- r
    }()
}

这里, 调度器里的submit定义为一个go routine, 原因是, 如果不定义为goroutine, 会出现循环等待. 然后卡死, 为什么会卡死呢?

根本原因还是管道的特性. 必须要有人从管道中取走数据, 且同时有人向管道中放数据, 这样才可以, 没有人取数据或没有人发数据, 管道就会一直等待.

代码语言:javascript复制
func (c *ConcurrectEngine) createWorker(in chan Request, out chan ParseResult) {
    go func() {
        for  {
            // 从in中取出request请求
            req := <- in
            parseResult, e := worker(req)
            if e != nil {
                return
            }
            out <- parseResult
        }
    }()
}

看这个工作者, 从in中取出request, 然后处理后发送给out. 在run中在取出out的ParseResult, 发送给任务调度器. 任务调度器的submit方法里, 将request添加到workerChan. 也就是只有workerChan添加成功了, 反过来这一些列的流程才能继续执行. 但是workerChan是否能够添加成功呢? 这又取决于, 是否有woker取走workerChan中的request. 现在有10个goroutine, 10个goroutine都工作起来的, 都开始等待新的worker取走request. 但是有没有新的worker执行工作了, 所以, 就进入了循环等待. 出现卡住的现象

要解决这个问题, 其实也很简单. 保证workerChan不会循环等待. 给workerChan开一个单独的goroutine. 这样, 在这里就不会循环等待了. 执行到submit, 就开了一个goroutine. 然后这个动作就执行完了, 那么worker就有功夫去workerChan中取数据了, 这样submit中的request也可以添加到workerChan了, 整个链路有活起来了.

其实最终的架构变成了这样

为每一个request创建了一个goroutine. 然后等待去发送到worker. 发送完成以后, 就关闭了.

执行结果:

三. 并发爬虫第二版---将scheduler和worker都变成一个队列

为什么要将Scheduler和worker变成队列呢?

在上面, 我们刚开始会出现循环等待, 程序卡死, 为了解决这个问题, 我们给scheduler的submit添加了一个单独的goroutine. 让request放入workerChan的过程中迅速执行完毕, 不要等待.

但这样有个缺点, 我开了一个goroutine, 这个goroutine执行的怎么样, 执行了么?我们是不知道的. 没有办法跟踪

对于worker来说, 有10个worker. 每次都是这10个worker去workerChan里面抢request, 如果我想要把某个request分发给指定的worker去执行, 这样是不可以的.

这样如果我们想做负载均衡, 就会很困难了. 因此. 我们将scheduler和worker都变成一个队列. 然后可以受我们的控制. 我们可以主动分发

感受go使用channel进行通信

这个demo写完了以后, 最大的感受就是go使用channel进行通信. 在scheduler和request使用的是channel进行通信, scheduler和worker之间也是使用的channel进行通信.

结构

这个结构的重点, 在于Scheduler, 在Scheduler里面, 管理了两个队列. 一个是Request队列, 一个是worker队列. 过来的请求, 发送给空闲的worker去工作.

也就是说, worker还是原来的worker . request还是原来的request, 变化的是调度器, 调度器变成了一个队列调度器

来看看具体的代码实现

代码语言:javascript复制
package sechduler

import "aaa/crawler/zhenai/engine"

type QueuedScheduler struct {
    RequestChan chan engine.Request
    // 工作者channel, 他的类型是request类型的chan. 工作者需要接受request类型的chan, 然后进行工作
    // 多个工作者之间, 也是一个chan
    WorkerChan chan chan engine.Request
}

func (q *QueuedScheduler) Submit(r engine.Request) {
    q.RequestChan <- r
}

func (q *QueuedScheduler) ConfigureMasterWorkerChan(chan engine.Request) {
    panic("implement me")
}

/**
 * 告诉我哪一个worker已经准备好, 可以工作了
 */
func (q *QueuedScheduler) WorkerReady(worker chan engine.Request) {
    q.WorkerChan <- worker
}

func (q *QueuedScheduler) Run() {
    q.RequestChan = make(chan engine.Request)
    q.WorkerChan = make(chan chan engine.Request)
    go func() {
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var requestActive engine.Request
            var workerActive chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                requestActive = requestQ[0]
                workerActive = workerQ[0]
            }

            select {
            case r := <-q.RequestChan:
                // 发送给worker
                requestQ = append(requestQ, r)
            case w := <-q.WorkerChan:
                // 将下一个请求发送给worker
                workerQ = append(workerQ, w)
            case workerActive <- requestActive:
                    requestQ = requestQ[1:]
                    workerQ = workerQ[1:]
            }
        }
    }()
}

这个结构, 充分体现了, 使用channel进行通信

  1. 外部有一个请求过来了, 那么调动submit将请求发送到request chan
  2. 外部有一个worker已经准备好可以工作了, 调用workerReady,将准备好的worker放入到workerChan
  3. 接下来, 如果有请求来, 我们就想请求添加到request队列. 如果有worker准备好了, 从workerChan中取出放入到worker队列
  4. 当request队列中有请求过来, 且worker队列中有等待的worker的时候, 就把这个请求发送给这个worker, 让worker开始工作, 处理request

engine做简单修改

代码语言:javascript复制
package engine

import "fmt"

// 定义了一个并发的引擎结构, 目的是, 区别于之前写的单机版的Engine.
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
    WorkChan chan interface{}
}

// 定义了调度器的接口. 既然是调度器, 就要有一个接受请求的方法.
type Scheduler interface{
    // 接收请求的方法
    Submit(Request)
    // 这里比较巧妙的地方是, 其实调度器的chan类型和工作者输入的chan类型是同一个.
    // 调度器输出的request, 就是工作者的输入request, 这里让他们指向同一个地址.
    ConfigureMasterWorkerChan(chan Request)
    WorkerReady(chan Request)
    Run()
}

func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    out := make(chan ParseResult)
    c.Scheduler.Run()

    // 创建工作者, 从in输入管道中取出request进行处理, 处理后放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i    {
        // 创建WorkerCount个工作者
        c.createWorker(c.Scheduler, out)
    }

    // 第二步: 把种子放到任务调度器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 处理工作者线程返回的ParseResult, 将items打印, requests再次添加到任务调度器中
    itemCount := 0
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容项:  %d, %v n", itemCount, item)
            itemCount   
        }

        for _, p := range pr.Req {
            c.Scheduler.Submit(p)
        }
    }
}

func (c *ConcurrectEngine) createWorker(sche Scheduler, out chan ParseResult) {
    in := make(chan Request)
    go func() {
        for  {
            // 告诉调度器, 我已经准备好开始工作了
            sche.WorkerReady(in)
            // 从in中取出request请求
            req := <- in
            parseResult, e := worker(req)
            if e != nil {
                return
            }
            out <- parseResult
        }
    }()
}

这样功能是完成了, but, 来看看simple和queued这两个Scheduler调度器. 先来看看他们的接口

代码语言:javascript复制
// 定义了调度器的接口. 既然是调度器, 就要有一个接受请求的方法.
type Scheduler interface{
    // 接收请求的方法
    Submit(Request)
    // 这里比较巧妙的地方是, 其实调度器的chan类型和工作者输入的chan类型是同一个.
    // 调度器输出的request, 就是工作者的输入request, 这里让他们指向同一个地址.
    ConfigureMasterWorkerChan(chan Request)
    WorkerReady(chan Request)
    Run()
}

simple 实现了接口的前两个方法, 而queued没有实现第二个方法. 下面我们来统一一下接口的方法. 先看看第二个方法

第二个方法是干什么用的?

ConfigureMasterWorkerChan(chan Request)

在simple中, 我们所有的worker都有一个共同的输入request. 通过 这个方法ConfigureMasterWorkerChan(chan Request), 我们将worker的输入和simple中的workerChan关联了

在Queued中, 我们的每一个worker都有一个自己的输入 request, 和第一个的区别是第一个是所有worker公用一个chan request. 而第二种是每个worker自己一个request

我们对这件事进行一个抽象, 在engine中, 调用哪一个Scheduler, 他是不知道的. 那么到底worker是公用一个chan request, 还是每个worker有一个chan reqeust呢, 他也是不知道的.

那么谁知道呢? 具体的Scheduler调度器知道. 也就是, Simple Scheduler知道, Queued Scheduler也知道.

因此, 我们写一个方法, 来问各种类型的调度器要chan request.


上面是把获得的item打印了, 下面我们来设计入库的模块

分析: 其实,我们要做是什么呢? 将打印变成入库操作. but, 有一个问题. 我们在fetch阶段, 能直接入库么?和数据库交互的速度, 肯定会影响fetch的速度.

其实这里有两种解决方案

  1. 放到内存list中. 批量保存
  2. 使用chan, 间数据发送到管道里面, 在单开一个goroutine, 去取数据, 将取出来的数据save到数据库.

因为, 我们是在学习go, 所以,采用第二种方式

三. docker和ElasticSearh

数据库---我们使用elasticSearch, 搭建在docker容器之上

因为都是初次接触, 所以,一步一步. 先搭建起docker的环境,然后在docker上安装elasticSearch.

Docker 的主要用途,目前有三大类

Docker 的主要用途,目前有三大类。

(1)提供一次性的环境。比如,本地测试他人的软件、持续集成的时候提供单元测试和构建的环境。

(2)提供弹性的云服务。因为 Docker 容器可以随开随关,很适合动态扩容和缩容。

(3)组建微服务架构。通过多个容器,一台机器可以跑多个服务,因此在本机就可以模拟出微服务架构。

而我们这里应该是使用docker的第三类用途. 后面我们要做分布式版的爬虫, 可以使用docker来模拟多台服务器间调用.

docker的安装和使用

第一步: 官网docker: https://docs.docker.com/

    我是mac: 下载地址: https://hub.docker.com/editions/community/docker-ce-desktop-mac/

第二步: 注册docker. 我之前注册过,直接登录即可

第三步: 基本命令

代码语言:javascript复制
查看docker有哪些命令
docker
代码语言:javascript复制
查看docker的版本信息
docker version
代码语言:javascript复制
查看docker的基本信息
docker info

比如docker有两部分组成,sercer和client. 我们启动的docker desktop就是一个server. 在控制台使用docker命令就是一个client

Server中的Registry是下载镜像的地址.

elasticSearch是什么

参考这篇文章, 说的很明白:https://blog.csdn.net/paicmis/article/details/82535018

Lucene是单机的模式,如果你的数据量超过了一台物理机的容量,你需要扩容,将数据拆分成2份放在不同的集群,这个就是典型的分布式计算了。需要拷贝容错,机器宕机,数据一致性等复杂的场景,这个实现就比较复杂了。

ES解决了这些问题

1、自动维护数据的分布到多个节点的索引的建立,还有搜索请求分布到多个节点的执行

2、自动维护数据的冗余副本,保证了一旦机器宕机,不会丢失数据

3、封装了更多高级的功能,例如聚合分析的功能,基于地理位置的搜索

ElasticSearch的功能

分布式的搜索引擎和数据分析引擎

搜索:网站的站内搜索,IT系统的检索

数据分析:电商网站,统计销售排名前10的商家

全文检索,结构化检索,数据分析

全文检索:我想搜索商品名称包含某个关键字的商品

结构化检索:我想搜索商品分类为日化用品的商品都有哪些

数据分析:我们分析每一个商品分类下有多少个商品

对海量数据进行近实时的处理

分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索

海联数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了

近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析

ElasticSearch的应用场景

维基百科

The Guardian(国外新闻网站)

Stack Overflow(国外的程序异常讨论论坛)

GitHub(开源代码管理)

电商网站

日志数据分析

商品价格监控网站

BI系统

站内搜索

ElasticSearch的特点

可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上,服务小公司

Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起

对用户而言,是开箱即用的,非常简单,作为中小型的应用,直接3分钟部署一下ES

Elasticsearch作为传统数据库的一个补充,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理;

先大概了解一下elasticSearch可以干什么, 接下来我们在使用

ElasticSearch安装和使用

第一步: 下载并运行elasticSearch

代码语言:javascript复制
下载并运行elasticSearch
docker run -d -p9200:9200 daocloud.io/library/elasticsearch

这里的-p 后面跟的是端口号. 
第一个9200表示映射到物理机的端口是9200
第二个9200表示elasticSearch在虚拟机中运行的端口是9200

第二步: 查询运行情况

表示当前已经运行起来的elasticSearch

第三步: 在浏览器输入localhost:9200

看到如上信息, 表示已经启动成功了

第四步: 简单了解如何使用elasticSearch

打开postman

我们可以直接运行localhost:9200 ,GET请求, 获取到当前已经连接的elasticSearch数据库

接下来我们添加一条记录

在elasticSearch中Index/Type/id ,对应于数据库的是index--->database, Type---->table, id--->记录的id

比如:

添加一条记录为1的数据

代码语言:javascript复制
请求方式: POST
请求的url: localhost:9200/pachong/user/1

执行后的结果

代码语言:javascript复制
{
    "_index": "pachong",
    "_type": "user",
    "_id": "1",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "created": true
}

前三个字段分别是: 库名, 表明, 记录的id. result表示当前是created.

来查询刚刚保存的记录

代码语言:javascript复制
请求方式: GET
请求url: localhost:9200/pachong/user/1

就获取到了刚刚添加的记录

代码语言:javascript复制
{
    "_index": "pachong",
    "_type": "user",
    "_id": "1",
    "_version": 1,
    "found": true,
    "_source": {
        "name": "lxl",
        "age": 12
    }
}

如果想查询所有的记录呢?

代码语言:javascript复制
请求方式: GET
请求url: localhost:9200/pachong/user/_search
输入参数: 空

返回结果:

代码语言:javascript复制
{
    "took": 97,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 1,
        "hits": [
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "2",
                "_score": 1,
                "_source": {
                    "name": "ykk",
                    "age": 43
                }
            },
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "1",
                "_score": 1,
                "_source": {
                    "name": "lxl",
                    "age": 12
                }
            }
        ]
    }
}

这就查询到了数据库中的所有记录

根据条件查询, 比如查询名字是ykk的

代码语言:javascript复制
请求方式: GET
请求url: localhost:9200/pachong/user/_search?q=ykk

查询结果

代码语言:javascript复制
{
    "took": 4,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 1,
        "max_score": 0.25811607,
        "hits": [
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "2",
                "_score": 0.25811607,
                "_source": {
                    "name": "ykk",
                    "age": 43
                }
            }
        ]
    }
}

四. 将数据保存到elasticSearch

通过上面的demo, 我们知道elasticSearch使用的是restful的风格增删改查数据的. 那么我们可以直接使用http.Get, http.Post就可以实现

处理这种方式, 市面上还有对应的elasticSearch客户端, 我们使用客户端会更方便

百度 elasticSearch client--->找到对应的官网, https://www.elastic.co/guide/en/elasticsearch/client/index.html

点击最后一个社区版客户端. 进去之后查看go的elasticSearch client

这里我们使用第二类Google go, 点击进去是插件的源码, 看后面的README.

我们使用的elasticSearch版本是5.12, 所以下载对应的5版本的client.

在go中执行下载client.

下载完成就可以使用了, 使用文档: https://godoc.org/gopkg.in/olivere/elastic.v5

代码语言:javascript复制
func Save(item interface{}) (string, error) {
    // 第一步: 创建一个elasticSearch client
    // 文档: https://godoc.org/gopkg.in/olivere/elastic.v5
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"),
        // sniff: 是用来维护客户端集群的状态的. 但是我们的集群不跑在本机上,而是跑在docker上.
        // docker只有一个内网, 内网我们看不见.所以没有办法维护状态, 设置为false
        elastic.SetSniff(false))
    if err != nil {
        return "", err
    }

    // 将item数据保存到的elasticSearch中
    // elastic search 保存数据使用的Index.
    // elasticSearch 数据中的三部分分别是 /index/type/id  . 对应数据库的/database/table/id
    response, err := client.Index().
        Index("dataing_profile").
        Type("zhenai").
        BodyJson(item).
        Do(context.Background())
    if err != nil {
        return "", err
    }

    //  v 表示打印结构体带上结构体的名称
    return response.Id, nil

}

第一步: 建立elasticSearch 连接

第二步: 保存数据. elasticSearch保存数据的方法是Index

完毕, 是不是很简单....

接下来写一个单元测试, 测试我们是否添加数据成功了

代码语言:javascript复制
func TestSave(t *testing.T) {
    tests := []struct {
        name string
        item model.Profile
    }{
        {"1",
        model.Profile{
                Name: "冰靓晴雪",
                Marry: "已婚",
                Age:23,
                Xingzuo: "白羊座",
                Height: 154,
                Weight: 49,
                WorkerPlace: "北京",
                Salary: "10000-20000",
                Occuption: "销售总监",
                Education: "大学本科",
                Jiguan: "四川",
                Hobby: "理财",
                House:"有房",
                Car:"有车",
                IsChild:"不要小孩",
             },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            id, e := Save(tt.item)
            if e != nil {
                panic(e)
            }

            client, e := elastic.NewClient(elastic.SetSniff(false))
            if e != nil {
                panic(e)
            }

            resp, e := client.Get().
                Index("dataing_profile").
                Type("zhenai").
                Id(id).
                Do(context.Background())
            if e != nil {
                panic(e)
            }

            fmt.Printf("%s", *resp.Source)

            user := model.Profile{}
            e = json.Unmarshal(*resp.Source, &user)
            if e != nil {
                panic(e)
            }

            if user != tt.item {
                t.Errorf("error")
            }
        })
    }
}

第一步: 写测试的cases

第二步: 循环遍历cases, 调用save方法保存

第三步: 取出save中保存的内容

第四步: 和初始值对比, 是否一致

0 人点赞