golang源码分析:爬虫colly(part I)

2022-08-02 19:22:36 浏览数 (1)

Colly 是一个采用 Go 语言编写的 Web 爬虫框架:

https://github.com/gocolly/colly

http://go-colly.org/docs/

可以非常方便地写一个爬虫,下面是源码中的一个例子

代码语言:javascript复制
package main

import (
  "fmt"

  "github.com/gocolly/colly/v2"
)

func main() {
  // Instantiate default collector
  c := colly.NewCollector(
    // Visit only domains: hackerspaces.org, wiki.hackerspaces.org
    colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),
  )
  // On every a element which has href attribute call callback
  c.OnHTML("a[href]", func(e *colly.HTMLElement) {
    link := e.Attr("href")
    // Print link
    fmt.Printf("Link found: %q -> %sn", e.Text, link)
    // Visit link found on page
    // Only those links are visited which are in AllowedDomains
    c.Visit(e.Request.AbsoluteURL(link))
  })

  // Before making a request print "Visiting ..."
  c.OnRequest(func(r *colly.Request) {
    fmt.Println("Visiting", r.URL.String())
  })

  // Start scraping on https://hackerspaces.org
  c.Visit("https://hackerspaces.org/")
}

大体上分为三个步骤:

1,初始化爬虫,设置参数 c := colly.NewCollector

2,注册网页解析函数c.OnHTML("a[href]", func(e *colly.HTMLElement)

3,开始爬取网页内容 c.Visit("https://hackerspaces.org/")

Collector

Colly的首要入口是一个 Collector 对象。Collector 管理网络通信并负责在 Collector job 运行时执行附加的回调。使用colly,你必须初始化一个Collector

回调函数的执行顺序

  1. OnRequest 请求发出之前调用
  2. OnError 请求过程中出现Error时调用
  3. OnResponse 收到response后调用
  4. OnHTML 如果收到的内容是HTML,就在onResponse执行后调用
  5. OnXML 如果收到的内容是HTML或者XML,就在onHTML执行后调用
  6. OnScraped OnXML执行后调用

源码分析

colly的源码量不大:

代码语言:javascript复制
colly_test.go           http_backend.go         storage
context.go              http_trace.go           unmarshal.go
LICENSE.txt             context_test.go         http_trace_test.go      unmarshal_test.go
README.md               debug                   proxy                   xmlelement.go
VERSION                 extensions              queue                   xmlelement_test.go
_examples               go.mod                  request.go
cmd                     go.sum                  response.go
colly.go                htmlelement.go  

A,其中_examples 目录提供了常见使用场景的案例

B,cmd目录下面就一个文件cmd/colly/colly.go,提供了通过cli的方式生成上述例子的模板代码,cli命令行参数处理使用的是github.com/jawher/mow.cli,将模板代码拆分成几部分,比如scraperHeadTemplate

代码的逻辑就是根据用户输入提示生成对应代码

代码语言:javascript复制
  func main() 
    app := cli.App("colly", "Scraping Framework for Gophers")
    scraper.WriteString(htmlCallbackTemplate)
    app.Run(os.Args)

C,debug目录提供了两种debug的方式:本地日志和网页,其中debug/debug.go文件定义了debug的接口

代码语言:javascript复制
type Debugger interface {
  // Init initializes the backend
  Init() error
  // Event receives a new collector event.
  Event(e *Event)
}

在debug/logdebugger.go文件中提供了日志方式的debuger

代码语言:javascript复制
 func (l *LogDebugger) Event(e *Event)
      l.logger.Printf("[d] %d [m - %s] %q (%s)n", i, e.CollectorID, e.RequestID, e.Type, e.Values, time.Since(l.start))

在debug/webdebugger.go中实现了网页版,整体逻辑是:在前端,加载一个index页面,然后不断通过/status接口获取最新信息;在后端,不断响应各种事件,将结果存下来,前端不断获取存下来的信息

代码语言:javascript复制
    func (w *WebDebugger) Init() error 
      w.Address = "127.0.0.1:7676"
      http.HandleFunc("/status", w.statusHandler)
代码语言:javascript复制
    func (w *WebDebugger) Event(e *Event) 
        w.CurrentRequests[e.RequestID] = requestInfo{
      URL:         e.Values["url"],
      Started:     time.Now(),
      ID:          e.RequestID,
      CollectorID: e.CollectorID,
    }
代码语言:javascript复制
 func (w *WebDebugger) indexHandler(wr http.ResponseWriter, r *http.Request) 
     function fetchStatus() {
        $.getJSON("/status", function(data) {
代码语言:javascript复制
func (w *WebDebugger) statusHandler(wr http.ResponseWriter, r *http.Request) 
      jsonData, err := json.MarshalIndent(w, "", "  ")

D,extensions目录定义了一些扩展,比如加useragent,加refer,限制url长度等等,主要有下面几个文件

1,extensions/extensions.go

2,extensions/random_user_agent.go

代码语言:javascript复制
    genFirefoxUA,
    genChromeUA,
    genEdgeUA,
    genOperaUA,
    genMobileUcwebUA,
    genMobileNexus10UA,
    ffVersions
    chromeVersions
    osStrings

3,extensions/referer.go

代码语言:javascript复制
    func Referer(c *colly.Collector)

4,extensions/url_length_filter.go

代码语言:javascript复制
  func URLLengthFilter(c *colly.Collector, URLLengthLimit int)

E,proxy目录定义了通过轮转方式获取代理url,主要就一个文件proxy/proxy.go

代码语言:javascript复制
type roundRobinSwitcher struct {
  proxyURLs []*url.URL
  index     uint32
}
func (r *roundRobinSwitcher) GetProxy(pr *http.Request) (*url.URL, error)
func RoundRobinProxySwitcher(ProxyURLs ...string) (colly.ProxyFunc, error) 

F,queue接口定义了爬虫的队列,逻辑实现在queue/queue.go中,首先定义了依赖的存储的接口,主要是三个:存储请求,获取请求,获取队列长度

代码语言:javascript复制
type Storage interface {
  // Init initializes the storage
  Init() error
  // AddRequest adds a serialized request to the queue
  AddRequest([]byte) error
  // GetRequest pops the next request from the queue
  // or returns error if the queue is empty
  GetRequest() ([]byte, error)
  // QueueSize returns with the size of the queue
  QueueSize() (int, error)
}

在队列中限制消费者数量:

代码语言:javascript复制
type Queue struct {
  // Threads defines the number of consumer threads
  Threads int
  storage Storage
  wake    chan struct{}
  mut     sync.Mutex // guards wake and running
  running bool
}

队列上的接口如下

代码语言:javascript复制
func (q *Queue) IsEmpty() bool
func (q *Queue) AddURL(URL string) error 
func (q *Queue) AddRequest(r *colly.Request) error
func (q *Queue) storeRequest(r *colly.Request) error 
func (q *Queue) Size() (int, error) 
func (q *Queue) Run(c *colly.Collector) error
func (q *Queue) Stop()
func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complete <-chan struct{}, errc chan<- error){ 
  req, err = q.loadRequest(c)
 }
func (q *Queue) loadRequest(c *colly.Collector) (*colly.Request, error){
  copy(copied, buf)
}

其中最重要的接口就是run

代码语言:javascript复制
func (q *Queue) Run(c *colly.Collector) error{
    for i := 0; i < q.Threads; i   {
      go independentRunner(requestc, complete)
    }
    go q.loop(c, requestc, complete, errc)
}

它起了n个任务(协程)调用independentRunner去获取网页的结果,然后起一个行程将获得的结果copy到buff里

简单的任务场景,我们可以将爬取的数据存储在内存里,InMemoryQueueStorage 实现了storage的所有接口:

代码语言:javascript复制
type InMemoryQueueStorage struct {
  // MaxSize defines the capacity of the queue.
  // New requests are discarded if the queue size reaches MaxSize
  MaxSize int
  lock    *sync.RWMutex
  size    int
  first   *inMemoryQueueItem
  last    *inMemoryQueueItem
}  
代码语言:javascript复制
func (q *InMemoryQueueStorage) Init() error
func (q *InMemoryQueueStorage) AddRequest(r []byte) error 
func (q *InMemoryQueueStorage) GetRequest() ([]byte, error)
func (q *InMemoryQueueStorage) QueueSize() (int, error)

存储的数采用了单向链表的结构,内容是请求

代码语言:javascript复制
type inMemoryQueueItem struct {
  Request []byte
  Next    *inMemoryQueueItem
}

independentRunner的作用就是发送请求,就是一个http的客户端

代码语言:javascript复制
func independentRunner(requestc <-chan *colly.Request, complete chan<- struct{}) 
      req.Do()

G,storage定义了存储的具体数据解析和序列化,具体接口定义在storage/storage.go中:

代码语言:javascript复制
type Storage interface {
  // Init initializes the storage
  Init() error
  // Visited receives and stores a request ID that is visited by the Collector
  Visited(requestID uint64) error
  // IsVisited returns true if the request was visited before IsVisited
  // is called
  IsVisited(requestID uint64) (bool, error)
  // Cookies retrieves stored cookies for a given host
  Cookies(u *url.URL) string
  // SetCookies stores cookies for a given host
  SetCookies(u *url.URL, cookies string)
}

InMemoryStorage实现了上述接口

代码语言:javascript复制
type InMemoryStorage struct {
  visitedURLs map[uint64]bool
  lock        *sync.RWMutex
  jar         *cookiejar.Jar
}  
代码语言:javascript复制
func (s *InMemoryStorage) Init() error

0 人点赞