Crawlab 支持Nodejs爬虫插入数据

2022-08-27 23:43:29 浏览数 (1)

crawlab官方文档的scrapy爬虫支持爬取的数据插入到mongodb里面,但是官方没有提供nodejs爬虫对应的组件,这里nodejs爬虫爬取的数据需要自己按照一定的规则插入mongodb里面,才能达到类似的效果,这里记录下解决问题的过程

一、背景

crawlab 官方文档的scrapy 爬虫爬取的结果可以在任务栏的数据那里看到,但是官方没有指引nodejs如何达到类似的成果。这对使用nodejs在crawlab上写爬虫的同学非常不友好。

官方scrapy爬虫的结果,可以直接在任务结果上看官方scrapy爬虫的结果,可以直接在任务结果上看

nodejs要支持这样的效果,需要先分析crawlab爬虫任务完成后,具体是怎么写入数据库的。简单先安装的查看数据库数据的docker镜像,研究下数据库变化先。

二、安装mongo-express

在之前的文章Crawlab 支持Nodejs脚本执行 - 腾讯云开发者社区-腾讯云 (tencent.com) 里面安装支持nodejs的crawlab多方法基础上,我们添加一个mongo-express镜像,用户查看数据库变化。修改docker-compose.yml如下

代码语言:javascript复制
version: '3.3'
services:
  master:
    image: node-crawlab
    container_name: crawlab_master
    restart: always
    environment:
      CRAWLAB_NODE_MASTER: "Y"  # Y: 主节点
      # mongo host address. 在 Docker-Compose 网络中,直接引用 service 名称
      CRAWLAB_MONGO_HOST: "mongo"  
      CRAWLAB_MONGO_PORT: "27017"  # mongo port 
      CRAWLAB_MONGO_DB: "crawlab"  # mongo database 
      CRAWLAB_MONGO_USERNAME: "username"  # mongo username
      CRAWLAB_MONGO_PASSWORD: "password"  # mongo password 
      CRAWLAB_MONGO_AUTHSOURCE: "admin"  # mongo auth source 
    volumes:
      - "/opt/crawlab/master:/data"  # 持久化 crawlab 数据
    ports:
      - "8080:8080"  # 开放 api 端口
    depends_on:
      - mongo

  mongo:
    image: mongo:4.2
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: "username"  # mongo username
      MONGO_INITDB_ROOT_PASSWORD: "password"  # mongo password
    volumes:
      - "/opt/crawlab/mongo/data/db:/data/db"  # 持久化 mo ngo 数据
    ports:
      - "27017:27017"  # 开放 mongo 端口到宿主机
      
  mongo-express:
    image: mongo-express:0.49.0
    restart: always
    environment:
      ME_CONFIG_MONGODB_SERVER: "mongo"
      ME_CONFIG_MONGODB_PORT: "27017"
      ME_CONFIG_MONGODB_ENABLE_ADMIN: true
      ME_CONFIG_MONGODB_ADMINUSERNAME: "username"
      ME_CONFIG_MONGODB_ADMINPASSWORD: "password"
      ME_CONFIG_MONGODB_AUTH_DATABASE: "admin"
      ME_CONFIG_MONGODB_AUTH_USERNAME: "username"
      ME_CONFIG_MONGODB_AUTH_PASSWORD: "password"
    ports:
      - "8081:8081"  # exposed api port
    depends_on:
      - mongo

然后我们访问http://服务器ip:8081 就可以看到mongodb里面的数据了,记得要开启对应端口的防火墙

三、分析爬虫结果如何插入数据库

任务结果集用于存放结果数据任务结果集用于存放结果数据
可以看到数据都存在对应爬虫的result结果集上可以看到数据都存在对应爬虫的result结果集上
result结果集,通过_tid与任务task和task_stats关联result结果集,通过_tid与任务task和task_stats关联
task和task_stats存放着任务和任务状态task和task_stats存放着任务和任务状态

上图列举的爬取的数据在数据库里如何存储,省略了部分细节。其实插入爬虫结果数据的大概流程如下

1. 爬虫获取结果数据

2. 查询本次爬虫的结果应该存储到哪个collection里面(其实就是爬虫配置里的结果集)

3. 获取本次爬虫的任务ID,每条爬虫结果都添加_tid等于当前任务id

3. 将爬虫爬取的结果存储到步骤2的结果集里

四、使用nodejs完成数据库插入

打印nodejs执行的时候都环境变量,可以看到当前的任务id和Mongodb数据库的各种链接参数

nodejs 爬虫脚本process.env变量nodejs 爬虫脚本process.env变量

那我们要做的事情就很简单了,大概是下面这样

1. 链接mongodb

2. 获取当前任务需要操作的结果集collect

3. 将当前任务的结果,插入到结果集里面, 同时为每条结果绑定_tid参数

这里贴一下代码实现

代码语言:javascript复制
const { MongoClient } = require('mongodb');
const { ObjectId } = require('mongodb');

const defalutOptions = {
  username: process.env['CRAWLAB_MONGO_USERNAME'],
  password: process.env['CRAWLAB_MONGO_PASSWORD'],
  host: process.env['CRAWLAB_MONGO_HOST'],
  port: process.env['CRAWLAB_MONGO_PORT'],
  db: process.env['CRAWLAB_MONGO_DB'],
  path: "",
}

const spiderCollectionName = 'spiders'
const taskCollectionName = 'tasks'
const taskStatusCollectionName = 'task_stats'
const colCollectionName = 'data_collections'

let client;

async function getClient(resultOption) {
  const url = `mongodb://${resultOption.username}:${resultOption.password}@${resultOption.host}:${resultOption.port}${resultOption.path}`;
  const tempClient = new MongoClient(url);
  await tempClient.connect();
  console.log('Connected successfully to server');
  client = tempClient;
  return client;
}

function enHanceCollect(collection,db) {
  const taskId = ObjectId(process.env['CRAWLAB_TASK_ID']);
  const taskStatusColCollection = db.collection(taskStatusCollectionName);
  collection.addDataList = async function(insertArray) {
    const newParams = insertArray.map((item)=>{
      item._tid = taskId;
      return item;
    })
    const insertResult = await collection.insertMany(newParams);
    const updateResult = await taskStatusColCollection.updateOne({ _id: taskId }, { $set: { result_count: insertArray.length } });
    return insertResult;
  }
}


async function getCollectionName(db) {
  const spidersCollection = db.collection(spiderCollectionName);
  const tasksCollection = db.collection(taskCollectionName);
  const colCollection = db.collection(colCollectionName);
  const currentTask = await tasksCollection.find({_id: ObjectId(process.env['CRAWLAB_TASK_ID'])}).toArray();
  const spiderId = currentTask && currentTask[0] && currentTask[0].spider_id;
  const currentSpider = await spidersCollection.find({_id: spiderId }).toArray();
  const colId = currentSpider && currentSpider[0] && currentSpider[0].col_id;
  const currentCol = await colCollection.find({_id: colId }).toArray();
  const returnCollectionName = currentCol && currentCol[0] && currentCol[0].name;
  return returnCollectionName;
}

async function getCollection(option) {
    const userOption = option || {}
  	const resultOption = Object.assign({}, defalutOptions, userOption)
    if (!client) {
      client = await getClient(resultOption);
    }
    
    const db = client.db(resultOption.db);
    const collectionName = await getCollectionName(db)
    const collection = db.collection(collectionName);
  	enHanceCollect(collection,db);
    return collection;
}

module.exports = {
    getCollection: getCollection,
    client: client,
}

我们使用这个函数插入一点测试数据,测试一下这个代码

代码语言:javascript复制
const { getCollection } = require('../util/mongodb.js')

async function handleBody() {
   const collection = await getCollection();
   const testData =  [
     	{
          book: 631763,
          page: 102,
        },
     	{
          book: 631763,
          page: 102,
        },{
          book: 631763,
          page: 102,
        },{
          book: 631763,
          page: 102,
        },{

          book: 631763,
          page: 102,
        }
    ]
   
   const insertResult = await collection.addDataList(testData);
   console.log('Inserted documents =>', insertResult);
}

handleBody().then(()=>{
  process.exit(0)
})

可以看到nodejs的数据已经插入完成了可以看到nodejs的数据已经插入完成了

0 人点赞