trafilatura 网页解析原理分析

2023-11-01 14:12:25 浏览数 (2)

trafilatura 介绍

Trafilatura是一个Python包和命令行工具,用于收集网络上的文本。其主要应用场景包括网络爬虫下载和网页解析等。

今天我们不讨论爬虫和抓取,主要看他的数据解析是如何做的。

extract初体验

代码语言:javascript复制
from trafilatura import fetch_url, extract
url = 'https://haokan.baidu.com/v?pd=wisenatural&vid=292842333147844218'
downloaded = fetch_url(url)
result = extract(downloaded, output_format="json")
print(result)

结果:

代码语言:javascript复制
{"title": "日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度", "author": null, "hostname": "baidu.com", "date": "2023-10-30", "fingerprint": "ffffffffffffffff", "id": null, "license": null, "comments": "", "raw_text": "日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度,国际,国际社会,好看视频下载客户端创作中心消息上传视频61万次播放 | 发布时间:2023年8月25日01.3万收藏日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度接下来播放猜你喜欢", "text": "日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度,国际,国际社会,好看视频下载客户端创作中心消息上传视频61万次播放 | 发布时间:2023年8月25日01.3万收藏日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度接下来播放猜你喜欢", "language": null, "image": "https://f7.baidu.com/it/u=3372340810,1415940711&fm=222&app=106&f=JPEG@s_0,w_800,h_1000,q_80,f_auto", "pagetype": "video", "source": "https://haokan.baidu.com/v?vid=292842333147844218&tab=recommend", "source-hostname": "haokan.baidu.com", "excerpt": "日本东电公布核污水排海瞬间:核对程序后启动,有工作人员抽检浓度,本视频由青蜂侠Bee提供,607479次播放,好看视频是由百度团队打造的集内涵和颜值于一身的专业短视频聚合平台", "categories": "", "tags": "国际,国际社会,科普资料,科普咨询,科普电影,科普电视剧,科普综艺,科普话题,科普帖子,科普mv,科普视频,科普在线,科普下载,科普观看,科普直播,资料,咨询,电影,电视剧,综艺,话题,帖子,mv,视频,在线,下载,观看,直播,科普,国际社会,科学,日本东电,核污水"}

extract 分析

extract 函数定义如下,默认output_format为txt,只提取正文

代码语言:javascript复制
def extract(filecontent, url=None, record_id=None, no_fallback=False,
            favor_precision=False, favor_recall=False,
            include_comments=True, output_format='txt',
            tei_validation=False, target_language=None,
            include_tables=True, include_images=False, include_formatting=False,
            include_links=False, deduplicate=False,
            date_extraction_params=None,
            only_with_metadata=False, with_metadata=False,
            max_tree_size=None, url_blacklist=None, author_blacklist=None,
            settingsfile=None, config=DEFAULT_CONFIG,
            **kwargs):
    """Main function exposed by the package:
       Wrapper for text extraction and conversion to chosen output format.

extract里,主要是调用bare_extraction

  • 首先用lxml加载tree = load_html(filecontent)
  • 然后check_html_lang, 如果设置了target_language, 但网页不匹配会返回错误

meta解析

  • 接着解析extract_metadata meta信息解析,从header里解析内容
    • 首先,examine_meta, 先尝试extract_opengraph,有的网站符合Search meta tags following the OpenGraph guidelines (https://ogp.me/) 规范
    • 如果不符合OpenGraph规范,则从meta里提取 extract_meta_json,这里有很多meta配置,比如OG_AUTHOR = {'og:author', 'og:article:author'}, 当meta里有匹配的规则时,会填充到meta中
    • title 识别失败的,从H1 和 配置的xpath获取
代码语言:javascript复制
title_xpaths = [
    '//*[(self::h1 or self::h2)][contains(@class, "post-title") or contains(@class, "entry-title") or contains(@class, "headline") or contains(@id, "headline") or contains(@itemprop, "headline") or contains(@class, "post__title") or contains(@class, "article-title")]',
    '//*[@class="entry-title" or @class="post-title"]',
    '//*[(self::h1 or self::h2 or self::h3)][contains(@class, "title") or contains(@id, "title")]',
]
  • author 识别失败的,从配置的xpath获取
代码语言:javascript复制
author_xpaths = [
    '//*[(self::a or self::address or self::div or self::link or self::p or self::span or self::strong)][@rel="author" or @id="author" or @class="author" or @itemprop="author name" or rel="me" or contains(@class, "author-name") or contains(@class, "AuthorName") or contains(@class, "authorName") or contains(@class, "author name")]|//author', # specific and almost specific
    '//*[(self::a or self::div or self::h3 or self::h4 or self::p or self::span)][contains(@class, "author") or contains(@id, "author") or contains(@itemprop, "author") or @class="byline" or contains(@id, "zuozhe") or contains(@class, "zuozhe") or contains(@id, "bianji") or contains(@class, "bianji") or contains(@id, "xiaobian") or contains(@class, "xiaobian") or contains(@class, "submitted-by") or contains(@class, "posted-by") or @class="username" or @class="BBL" or contains(@class, "journalist-name")]', # almost generic and generic, last ones not common
    '//*[contains(translate(@id, "A", "a"), "author") or contains(translate(@class, "A", "a"), "author") or contains(@class, "screenname") or contains(@data-component, "Byline") or contains(@itemprop, "author") or contains(@class, "writer") or contains(translate(@class, "B", "b"), "byline")]', # last resort: any element
]
  • image 识别失败的,从配置的xpath获取
代码语言:javascript复制
    for elem in tree.xpath('.//head/meta[@property="og:image" or @property="og:image:url"][@content]'):
        return elem.get('content')

    for elem in tree.xpath('.//head/meta[@property="twitter:image" or @property="twitter:image:src"][@content]'):
        return elem.get('content')
  • sitename 识别失败的,会从title去识别, examine_title_element, 靠正则去匹配HTMLTITLE_REGEX = re.compile(r'^(. )?s [–•·—|⁄*⋆~‹«<›»>:-]s (. )$') # part without dots?, 这个对中文网页好像不太行
  • 其他的还同步识别了tags,就是关键词

正文识别

正文识别,配置options

代码语言:javascript复制
    # regroup extraction options
    options = Extractor(config, no_fallback, favor_precision, favor_recall,
                        include_comments, include_formatting, include_links,
                        include_images, include_tables, deduplicate,
                        target_language)

然后backup tree 和清理tree

代码语言:javascript复制
    # backup (or not) for further processing
    tree_backup_1 = deepcopy(tree) if no_fallback is False else None
    tree_backup_2 = deepcopy(tree)

    # clean   use LXML cleaner
    cleaned_tree = tree_cleaning(tree, options)
    cleaned_tree_backup = deepcopy(cleaned_tree)

    # convert tags, the rest does not work without conversion
    cleaned_tree = convert_tags(cleaned_tree, options, url or document.url)

识别评论:

代码语言:javascript复制
    # comments first, then remove
    if include_comments is True:
        commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments(cleaned_tree, options)
    else:
        commentsbody, temp_comments, len_comments = None, '', 0

提升精度, 将一些unwanted_nodes清理掉:

代码语言:javascript复制
        if favor_precision is True:
            cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH)

这里的REMOVE_COMMENTS_XPATH, 主要是一些常见的comment

代码语言:javascript复制
REMOVE_COMMENTS_XPATH = [
    """.//*[(self::div or self::list or self::section)][
    starts-with(translate(@id, "C","c"), 'comment') or
    starts-with(translate(@class, "C","c"), 'comment') or
    contains(@class, 'article-comments') or contains(@class, 'post-comments')
    or starts-with(@id, 'comol') or starts-with(@id, 'disqus_thread')
    or starts-with(@id, 'dsq-comments')
    ]""",
]

然后是正餐,提取正文

代码语言:javascript复制
# extract content
postbody, temp_text, len_text = extract_content(cleaned_tree, options)

主要原理是使用一组XPath表达式找到页面的主要内容,然后提取相关元素,并去除不需要的部分:

  • 定义正文潜在的候选标签potential_tags
代码语言:javascript复制
TAG_CATALOG = frozenset(['blockquote', 'code', 'del', 'head', 'hi', 'lb', 'list', 'p', 'pre', 'quote'])

potential_tags = set(TAG_CATALOG)
if options.tables is True:
    potential_tags.update(['table', 'td', 'th', 'tr'])
if options.images is True:
    potential_tags.add('graphic')
if options.links is True:
    potential_tags.add('ref')

然后,从配置的xpath里,去提取正文,来看看定义的xpath,真是大力出奇迹

代码语言:javascript复制
BODY_XPATH = [
    '''.//*[(self::article or self::div or self::main or self::section)][
    @class="post" or @class="entry" or
    contains(@class, "post-text") or contains(@class, "post_text") or
    contains(@class, "post-body") or contains(@class, "post-entry") or contains(@class, "postentry") or
    contains(@class, "post-content") or contains(@class, "post_content") or
    contains(@class, "postcontent") or contains(@class, "postContent") or
    contains(@class, "article-text") or contains(@class, "articletext") or contains(@class, "articleText")
    or contains(@id, "entry-content") or
    contains(@class, "entry-content") or contains(@id, "article-content") or
    contains(@class, "article-content") or contains(@id, "article__content") or
    contains(@class, "article__content") or contains(@id, "article-body") or
    contains(@class, "article-body") or contains(@id, "article__body") or
    contains(@class, "article__body") or @itemprop="articleBody" or
    contains(translate(@id, "B", "b"), "articlebody") or contains(translate(@class, "B", "b"), "articleBody")
    or @id="articleContent" or contains(@class, "ArticleContent") or
    contains(@class, "page-content") or contains(@class, "text-content") or
    contains(@id, "body-text") or contains(@class, "body-text") or
    contains(@class, "article__container") or contains(@id, "art-content") or contains(@class, "art-content")][1]''',
    # (…)[1] = first occurrence
    '(.//article)[1]',
    """(.//*[(self::article or self::div or self::main or self::section)][
    contains(@class, 'post-bodycopy') or
    contains(@class, 'storycontent') or contains(@class, 'story-content') or
    @class='postarea' or @class='art-postcontent' or
    contains(@class, 'theme-content') or contains(@class, 'blog-content') or
    contains(@class, 'section-content') or contains(@class, 'single-content') or
    contains(@class, 'single-post') or
    contains(@class, 'main-column') or contains(@class, 'wpb_text_column') or
    starts-with(@id, 'primary') or starts-with(@class, 'article ') or @class="text" or
    @id="article" or @class="cell" or @id="story" or @class="story" or
    contains(@class, "story-body") or contains(@class, "field-body") or
    contains(translate(@class, "FULTEX","fultex"), "fulltext")
    or @role='article'])[1]""",
    '''(.//*[(self::article or self::div or self::main or self::section)][
    contains(@id, "content-main") or contains(@class, "content-main") or contains(@class, "content_main") or
    contains(@id, "content-body") or contains(@class, "content-body") or contains(@id, "contentBody")
    or contains(@class, "content__body") or contains(translate(@id, "CM","cm"), "main-content") or contains(translate(@class, "CM","cm"), "main-content")
    or contains(translate(@class, "CP","cp"), "page-content") or
    @id="content" or @class="content"])[1]''',
    '(.//*[(self::article or self::div or self::section)][starts-with(@class, "main") or starts-with(@id, "main") or starts-with(@role, "main")])[1]|(.//main)[1]',
]

然后解析简单了,依次遍历:

代码语言:javascript复制
    for expr in BODY_XPATH:
        # select tree if the expression has been found
        try:
            subtree = tree.xpath(expr)[0]
        except IndexError:
            continue

对于匹配上的,开始细节处理:

代码语言:javascript复制
    # prune the subtree
    subtree = prune_unwanted_sections(subtree, potential_tags, options)
    # second pass?
    # subtree = delete_by_link_density(subtree, 'list', backtracking=False, favor_precision=options.precision)
    if 'table' in potential_tags or options.precision is True:
        for elem in subtree.iter('table'):
            if link_density_test_tables(elem) is True:
                elem.getparent().remove(elem)
    # skip if empty tree
    if len(subtree) == 0:
        continue
    # no paragraphs containing text, or not enough
    ptest = subtree.xpath('//p//text()')
    if options.recall is True:
        factor = 5
    elif options.precision is True:
        factor = 1
    else:
        factor = 3
    if not ptest or len(''.join(ptest)) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE') * factor:
        potential_tags.add('div')
    # polish list of potential tags
    if 'ref' not in potential_tags:
        strip_tags(subtree, 'ref')
    if 'span' not in potential_tags:
        strip_tags(subtree, 'span')
    LOGGER.debug(sorted(potential_tags))
    # proper extraction
    subelems = subtree.xpath('.//*')
    # e.g. only lb-elems in a div
    if {e.tag for e in subelems} == {'lb'}:
        subelems = [subtree]
    # extract content
    result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems)))
    # remove trailing titles
    while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END):
        result_body[-1].getparent().remove(result_body[-1])
    # exit the loop if the result has children
    if len(result_body) > 1:
        LOGGER.debug(expr)
        break
  • 对子树进行修剪,删除不需要的部分。
  • 如果potential_tags中包含'table'或者选项中设置了优先精度precision,那么遍历子树中的所有表格元素,如果里面链接过多,会剔除这个表格
  • 如果子树为空,跳过当前循环
  • 如果没有包含文本的段落,或者段落数量不足,将'div'添加到候选标签集合中
  • 如果span和ref不在候选标签,去除'ref'和'span'标签
  • 最主要的提取内容,handle_textelem, 如果提取到结果,则退出(这里可能有问题,比如后面的xpath也能匹配到内容)
代码语言:javascript复制
def handle_textelem(element, potential_tags, options):
    '''Process text element and determine how to deal with its content'''
    new_element = None
    # bypass: nested elements
    if element.tag == 'list':
        new_element = handle_lists(element, options)
    elif element.tag in CODES_QUOTES:
        new_element = handle_quotes(element, options)
    elif element.tag == 'head':
        new_element = handle_titles(element, options)
    elif element.tag == 'p':
        new_element = handle_paragraphs(element, potential_tags, options)
    elif element.tag == 'lb':
        if text_chars_test(element.tail) is True:
            element = process_node(element, options)
            if element is not None:
                new_element = Element('p')
                new_element.text = element.tail
    elif element.tag in FORMATTING:
        new_element = handle_formatting(element, options)  # process_node(element, options)
    elif element.tag == 'table' and 'table' in potential_tags:
        new_element = handle_table(element, potential_tags, options)
    elif element.tag == 'graphic' and 'graphic' in potential_tags:
        new_element = handle_image(element)
    else:
        # other elements (div, ??, ??)
        new_element = handle_other_elements(element, potential_tags, options)
    return new_element

该函数,根据element.tag的值,调用不同的处理函数来处理不同类型的元素。例如,如果element.tag等于'list',则调用handle_lists函数;如果element.tag在CODES_QUOTES中,则调用handle_quotes函数,依此类推。

我们看一个处理p标签的:

代码语言:javascript复制
def handle_paragraphs(element, potential_tags, options):
    '''Process paragraphs (p) elements along with their children,
       trim and clean the content'''
    element.attrib.clear()
    # strip_tags(element, 'p') # change in precision due to spaces?
    # no children
    if len(element) == 0:
        processed_element = process_node(element, options)
        if processed_element is not None:
            return processed_element
        return None
    # children
    processed_element = Element(element.tag)
    for child in element.iter('*'):
        if child.tag not in potential_tags and child.tag != 'done':
            LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail)
            continue
        # spacing = child.tag in SPACING_PROTECTED  # todo: outputformat.startswith('xml')?
        # todo: act on spacing here?
        processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True)
        if processed_child is not None:
            # todo: needing attention!
            if processed_child.tag == 'p':
                LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text,
                             processed_child.tail)
                if processed_element.text:
                    processed_element.text  = ' '   processed_child.text
                else:
                    processed_element.text = processed_child.text
                continue
            # handle formatting
            newsub = Element(child.tag)
            if processed_child.tag in P_FORMATTING:
                # check depth and clean
                if len(processed_child) > 0:
                    for item in processed_child:  # children are lists
                        if text_chars_test(item.text) is True:
                            item.text = ' '   item.text
                        strip_tags(processed_child, item.tag)
                # correct attributes
                if child.tag == 'hi':
                    newsub.set('rend', child.get('rend'))
                elif child.tag == 'ref':
                    if child.get('target') is not None:
                        newsub.set('target', child.get('target'))
            # handle line breaks
            # elif processed_child.tag == 'lb':
            #    try:
            #        processed_child.tail = process_node(child, options).tail
            #    except AttributeError:  # no text
            #        pass
            # prepare text
            # todo: to be moved to handle_textnode()
            # if text_chars_test(processed_child.text) is False:
            #    processed_child.text = ''
            # if text_chars_test(processed_child.tail) is False:
            #    processed_child.tail = ''
            # if there are already children
            # if len(processed_element) > 0:
            #    if text_chars_test(processed_child.tail) is True:
            #        newsub.tail = processed_child.text   processed_child.tail
            #    else:
            #        newsub.tail = processed_child.text
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
            processed_element.append(newsub)
        child.tag = 'done'
    # finish
    if len(processed_element) > 0:
        # clean trailing lb-elements
        if (
                processed_element[-1].tag == 'lb'
                and processed_element[-1].tail is None
        ):
            processed_element[-1].getparent().remove(processed_element[-1])
        return processed_element
    if processed_element.text:
        return processed_element
    LOGGER.debug('discarding p-child: %s', tostring(processed_element))
    return None
  • 首先,清除element的属性
  • 根据element是否有子元素,分别进行处理。
    • 如果没有子元素,直接调用process_node函数处理element,并返回处理结果
    • 如果有子元素,创建一个新的Element对象,遍历所有子元素,并根据子元素的标签进行相应的处理, 遍历提取文本

咱们视角继续回到extract_content, 如果result_body有值,那么

代码语言:javascript复制
    temp_text = ' '.join(result_body.itertext()).strip()
    # try parsing wild <p> elements if nothing found or text too short
    # todo: test precision and recall settings here
    if len(result_body) == 0 or len(temp_text) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
        result_body = recover_wild_text(backup_tree, result_body, options, potential_tags)
        temp_text = ' '.join(result_body.itertext()).strip()
    # filter output
    strip_elements(result_body, 'done')
    strip_tags(result_body, 'div')
    # return
    return result_body, temp_text, len(temp_text)
  • result_body中的所有文本节点连接成一个字符串,并去除首尾的空白字符,赋值给temp_text
  • 如果result_body为空或者temp_text的长度小于配置中的最小提取大小(MIN_EXTRACTED_SIZE),则尝试从备份树(backup_tree)中恢复原始文本,并重新计算temp_text (这个对于上面提到的误判,有一个修正)
  • 对result_body进行过滤,移除包含'done'文本的元素和所有'div'标签
  • 最后返回处理后的result_body、temp_text以及temp_text的长度

继续回到bare_extraction

代码语言:javascript复制
        # extract content
        postbody, temp_text, len_text = extract_content(cleaned_tree, options)

        # compare if necessary
        if no_fallback is False:
            postbody, temp_text, len_text = compare_extraction(cleaned_tree_backup, tree_backup_1, url, postbody, temp_text, len_text, options)
        # add baseline as additional fallback
        # rescue: try to use original/dirty tree # and favor_precision is False=?
        if len_text < config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
            postbody, temp_text, len_text = baseline(tree_backup_2)
            LOGGER.debug('non-clean extracted length: %s (extraction)', len_text)

        # tree size sanity check
        if max_tree_size is not None:
            # strip tags
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s', len(postbody))
                strip_tags(postbody, 'hi')
            # still too long, raise an error
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s, discarding file', len(postbody))
                raise ValueError
        # size checks
        if len_comments < config.getint('DEFAULT', 'MIN_EXTRACTED_COMM_SIZE'):
            LOGGER.debug('not enough comments %s', url)
        if len_text < config.getint('DEFAULT', 'MIN_OUTPUT_SIZE') and len_comments < config.getint('DEFAULT',
                                                                                                   'MIN_OUTPUT_COMM_SIZE'):
            LOGGER.debug('text and comments not long enough: %s %s', len_text, len_comments)
            raise ValueError

        # check duplicates at body level
        if deduplicate is True and duplicate_test(postbody, config) is True:
            LOGGER.debug('discarding duplicate document for URL %s', url)
            raise ValueError

        # sanity check on language
        if target_language is not None:
            is_not_target_lang, document = language_filter(temp_text, temp_comments, target_language, document)
            if is_not_target_lang is True:
                LOGGER.debug('wrong language for URL %s', url)
                raise ValueError

这里主要检查抽取结果:

  • 如果no_fallback为False,则使用compare_extraction函数比较两个备份树(cleaned_tree_backup和tree_backup_1),看那个结果更好
  • 如果提取的文本长度小于配置中的最小提取大小(MIN_EXTRACTED_SIZE),则尝试使用原始tree解析,并更新postbody、temp_text和len_text。
  • 如果输出树的长度大于最大树大小(max_tree_size),则删除标签以减小树的大小。如果仍然太大,则引发错误。
  • 重复检测, 如果deduplicate,会做duplicate_test,有重复内容的话会报错

总结

Trafilatura 没有采用类似GNE使用文本区块密度的方式来确定正文的方案,用来比较多的xpath规则,覆盖度还是不错的,但是对于未覆盖规则的部分,效果差强人意,需要做一些额外的处理。

可以优化的方向:

  • 和GNE的xpath规则,取并集,覆盖国内主要新闻网站
  • 增加后处理,通过模型去除一些非正文部分

0 人点赞