crawlab官方文档的scrapy爬虫支持爬取的数据插入到mongodb里面,但是官方没有提供nodejs爬虫对应的组件,这里nodejs爬虫爬取的数据需要自己按照一定的规则插入mongodb里面,才能达到类似的效果,这里记录下解决问题的过程
一、背景
crawlab 官方文档的scrapy 爬虫爬取的结果可以在任务栏的数据那里看到,但是官方没有指引nodejs如何达到类似的成果。这对使用nodejs在crawlab上写爬虫的同学非常不友好。
nodejs要支持这样的效果,需要先分析crawlab爬虫任务完成后,具体是怎么写入数据库的。简单先安装的查看数据库数据的docker镜像,研究下数据库变化先。
二、安装mongo-express
在之前的文章Crawlab 支持Nodejs脚本执行 - 腾讯云开发者社区-腾讯云 (tencent.com) 里面安装支持nodejs的crawlab多方法基础上,我们添加一个mongo-express镜像,用户查看数据库变化。修改docker-compose.yml如下
代码语言:javascript复制version: '3.3'
services:
master:
image: node-crawlab
container_name: crawlab_master
restart: always
environment:
CRAWLAB_NODE_MASTER: "Y" # Y: 主节点
# mongo host address. 在 Docker-Compose 网络中,直接引用 service 名称
CRAWLAB_MONGO_HOST: "mongo"
CRAWLAB_MONGO_PORT: "27017" # mongo port
CRAWLAB_MONGO_DB: "crawlab" # mongo database
CRAWLAB_MONGO_USERNAME: "username" # mongo username
CRAWLAB_MONGO_PASSWORD: "password" # mongo password
CRAWLAB_MONGO_AUTHSOURCE: "admin" # mongo auth source
volumes:
- "/opt/crawlab/master:/data" # 持久化 crawlab 数据
ports:
- "8080:8080" # 开放 api 端口
depends_on:
- mongo
mongo:
image: mongo:4.2
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: "username" # mongo username
MONGO_INITDB_ROOT_PASSWORD: "password" # mongo password
volumes:
- "/opt/crawlab/mongo/data/db:/data/db" # 持久化 mo ngo 数据
ports:
- "27017:27017" # 开放 mongo 端口到宿主机
mongo-express:
image: mongo-express:0.49.0
restart: always
environment:
ME_CONFIG_MONGODB_SERVER: "mongo"
ME_CONFIG_MONGODB_PORT: "27017"
ME_CONFIG_MONGODB_ENABLE_ADMIN: true
ME_CONFIG_MONGODB_ADMINUSERNAME: "username"
ME_CONFIG_MONGODB_ADMINPASSWORD: "password"
ME_CONFIG_MONGODB_AUTH_DATABASE: "admin"
ME_CONFIG_MONGODB_AUTH_USERNAME: "username"
ME_CONFIG_MONGODB_AUTH_PASSWORD: "password"
ports:
- "8081:8081" # exposed api port
depends_on:
- mongo
然后我们访问http://服务器ip:8081 就可以看到mongodb里面的数据了,记得要开启对应端口的防火墙
三、分析爬虫结果如何插入数据库
上图列举的爬取的数据在数据库里如何存储,省略了部分细节。其实插入爬虫结果数据的大概流程如下
1. 爬虫获取结果数据
2. 查询本次爬虫的结果应该存储到哪个collection里面(其实就是爬虫配置里的结果集)
3. 获取本次爬虫的任务ID,每条爬虫结果都添加_tid等于当前任务id
3. 将爬虫爬取的结果存储到步骤2的结果集里
四、使用nodejs完成数据库插入
打印nodejs执行的时候都环境变量,可以看到当前的任务id和Mongodb数据库的各种链接参数
那我们要做的事情就很简单了,大概是下面这样
1. 链接mongodb
2. 获取当前任务需要操作的结果集collect
3. 将当前任务的结果,插入到结果集里面, 同时为每条结果绑定_tid参数
这里贴一下代码实现
代码语言:javascript复制const { MongoClient } = require('mongodb');
const { ObjectId } = require('mongodb');
const defalutOptions = {
username: process.env['CRAWLAB_MONGO_USERNAME'],
password: process.env['CRAWLAB_MONGO_PASSWORD'],
host: process.env['CRAWLAB_MONGO_HOST'],
port: process.env['CRAWLAB_MONGO_PORT'],
db: process.env['CRAWLAB_MONGO_DB'],
path: "",
}
const spiderCollectionName = 'spiders'
const taskCollectionName = 'tasks'
const taskStatusCollectionName = 'task_stats'
const colCollectionName = 'data_collections'
let client;
async function getClient(resultOption) {
const url = `mongodb://${resultOption.username}:${resultOption.password}@${resultOption.host}:${resultOption.port}${resultOption.path}`;
const tempClient = new MongoClient(url);
await tempClient.connect();
console.log('Connected successfully to server');
client = tempClient;
return client;
}
function enHanceCollect(collection,db) {
const taskId = ObjectId(process.env['CRAWLAB_TASK_ID']);
const taskStatusColCollection = db.collection(taskStatusCollectionName);
collection.addDataList = async function(insertArray) {
const newParams = insertArray.map((item)=>{
item._tid = taskId;
return item;
})
const insertResult = await collection.insertMany(newParams);
const updateResult = await taskStatusColCollection.updateOne({ _id: taskId }, { $set: { result_count: insertArray.length } });
return insertResult;
}
}
async function getCollectionName(db) {
const spidersCollection = db.collection(spiderCollectionName);
const tasksCollection = db.collection(taskCollectionName);
const colCollection = db.collection(colCollectionName);
const currentTask = await tasksCollection.find({_id: ObjectId(process.env['CRAWLAB_TASK_ID'])}).toArray();
const spiderId = currentTask && currentTask[0] && currentTask[0].spider_id;
const currentSpider = await spidersCollection.find({_id: spiderId }).toArray();
const colId = currentSpider && currentSpider[0] && currentSpider[0].col_id;
const currentCol = await colCollection.find({_id: colId }).toArray();
const returnCollectionName = currentCol && currentCol[0] && currentCol[0].name;
return returnCollectionName;
}
async function getCollection(option) {
const userOption = option || {}
const resultOption = Object.assign({}, defalutOptions, userOption)
if (!client) {
client = await getClient(resultOption);
}
const db = client.db(resultOption.db);
const collectionName = await getCollectionName(db)
const collection = db.collection(collectionName);
enHanceCollect(collection,db);
return collection;
}
module.exports = {
getCollection: getCollection,
client: client,
}
我们使用这个函数插入一点测试数据,测试一下这个代码
代码语言:javascript复制const { getCollection } = require('../util/mongodb.js')
async function handleBody() {
const collection = await getCollection();
const testData = [
{
book: 631763,
page: 102,
},
{
book: 631763,
page: 102,
},{
book: 631763,
page: 102,
},{
book: 631763,
page: 102,
},{
book: 631763,
page: 102,
}
]
const insertResult = await collection.addDataList(testData);
console.log('Inserted documents =>', insertResult);
}
handleBody().then(()=>{
process.exit(0)
})