Commit 5f1212c

Merge pull request #229 from ammirate/pipeline_return_crawl_result
pipelines: return a crawl_result object instead of just the record
ammirate authored Feb 27, 2018
2 parents 4ce3f05 + e19b13a commit 5f1212c
Showing 19 changed files with 331 additions and 190 deletions.
56 changes: 56 additions & 0 deletions hepcrawl/api.py
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
#
# This file is part of hepcrawl.
# Copyright (C) 2018 CERN.
#
# hepcrawl is free software; you can redistribute it and/or modify it
# under the terms of the Revised BSD License; see LICENSE file for
# more details.

import copy


class CrawlResult(object):
    """Representation of a crawling result.

    This class defines the API used by the pipeline to send crawl results.

    Attributes:
        record (dict): the crawled record.
        file_name (str): the name of the remote file crawled.
        source_data (str): content of the remote file crawled.
        errors (list): list of dictionaries with keys "exception" and
            "traceback", which collects all the errors that occurred
            during the parsing phase.
    """
    def __init__(self, record, file_name="", source_data=""):
        self.record = record
        self.file_name = file_name
        self.source_data = source_data
        self.errors = []

    def add_error(self, exception_class, traceback):
        error = {
            'exception': exception_class,
            'traceback': traceback
        }
        self.errors.append(error)

    @staticmethod
    def from_parsed_item(parsed_item):
        result = CrawlResult(
            record=parsed_item['record'],
            file_name=parsed_item.get('file_name'),
            source_data=parsed_item.get('source_data')
        )

        if parsed_item.get('exception'):
            result.add_error(
                parsed_item['exception'],
                parsed_item['traceback']
            )

        return result

    def to_dict(self):
        return copy.deepcopy(self.__dict__)
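
For reference, a minimal usage sketch of the new class (the parsed-item values below are hypothetical; from_parsed_item() only relies on the dict-style access shown above):

from hepcrawl.api import CrawlResult

# Hypothetical parsed item, shaped the way from_parsed_item() reads it.
parsed_item = {
    'record': {'titles': [{'title': 'A sample record'}]},
    'file_name': 'desy_records.xml',
    'source_data': '<record>...</record>',
}

result = CrawlResult.from_parsed_item(parsed_item)
assert result.errors == []  # no 'exception' key, so no error is recorded

# to_dict() deep-copies the attributes, so callers can mutate the returned
# dict without touching the CrawlResult instance.
payload = result.to_dict()
assert payload['file_name'] == 'desy_records.xml'

Note that to_dict() returning a deep copy keeps the pipeline's serialized results independent of any later mutation of the item.
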
44 changes: 22 additions & 22 deletions hepcrawl/pipelines.py
@@ -25,7 +25,7 @@
 from scrapy import Request
 from scrapy.pipelines.files import FilesPipeline
 
-from .tohep import item_to_hep
+from .api import CrawlResult
 from .settings import FILES_STORE
 from .utils import RecordFile
@@ -94,29 +94,29 @@ def __init__(self):
     def open_spider(self, spider):
         self.results_data = []
 
-    def _post_enhance_item(self, item, spider):
-        source = spider.source
-
-        enhanced_record = item_to_hep(
-            item=item,
-            source=source,
-        )
-        spider.logger.debug(
-            'Got post-enhanced hep record:\n%s' % pprint.pformat(
-                enhanced_record
-            )
-        )
-        return enhanced_record
-
     def process_item(self, item, spider):
-        """Convert internal format to INSPIRE data model."""
+        """Add the crawl result to the results data after processing it.
+
+        This function enhances the crawled record from the parsed item, then
+        creates a crawl_result object from the parsed item and adds it to
+        `self.results_data`. This way, both the record and any errors that
+        occurred while processing it are saved.
+
+        Args:
+            item (ParsedItem): the parsed item returned by parsing the
+                crawled record.
+            spider (StatefulSpider): the current spider.
+
+        Returns:
+            (dict): the crawl result containing either the crawled record
+                or the errors that occurred during processing.
+        """
         self.count += 1
-
-        hep_record = self._post_enhance_item(item, spider)
-
-        self.results_data.append(hep_record)
-
-        return hep_record
+        item.record = item.to_hep(source=spider.source)
+        spider.logger.debug('Got post-enhanced hep record:\n%s' % pprint.pformat(item.record))
+        crawl_result = CrawlResult.from_parsed_item(item).to_dict()
+        self.results_data.append(crawl_result)
+        return crawl_result
 
     def _prepare_payload(self, spider):
         """Return payload for push."""
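
To make the change concrete, here is a sketch of what one entry of the pipeline's results_data now looks like (field values are hypothetical; the shape follows CrawlResult.to_dict() in hepcrawl/api.py above):

# One results_data entry after this change: a serialized CrawlResult
# (values hypothetical).
crawl_result = {
    'record': {'titles': [{'title': 'A sample record'}]},
    'file_name': 'desy_records.xml',
    'source_data': '<record>...</record>',
    'errors': [],  # filled from the item's exception/traceback, if any
}

# Before this commit, the same entry was just the enhanced record:
# {'titles': [{'title': 'A sample record'}]}

Downstream consumers of the pushed payload therefore read the record from the 'record' key and can check 'errors', rather than receiving a bare record.
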
104 changes: 57 additions & 47 deletions hepcrawl/spiders/desy_spider.py
@@ -10,8 +10,6 @@
 from __future__ import absolute_import, division, print_function
 
 import os
-import sys
-import traceback
 
 from flask.app import Flask
 from inspire_dojson import marcxml2record
@@ -211,7 +209,6 @@ def _get_full_uri(current_url, base_url, schema='ftp', hostname=None):
     def parse(self, response):
         """Parse a ``Desy`` XML file into a :class:`hepcrawl.utils.ParsedItem`.
         """
-
         self.logger.info('Got record from url/path: {0}'.format(response.url))
         self.logger.info('FTP enabled: {0}'.format(self.ftp_enabled))
         ftp_params = None
@@ -234,33 +231,18 @@ def parse(self, response):
         marcxml_records = self._get_marcxml_records(response.body)
         self.logger.info('Got %d marc xml records' % len(marcxml_records))
         self.logger.info('Getting hep records...')
-        hep_records = self._hep_records_from_marcxml(marcxml_records)
-        self.logger.info('Got %d hep records' % len(hep_records))
-
-        for hep_record in hep_records:
-            files_to_download = [
-                self._get_full_uri(
-                    current_url=document['url'],
-                    base_url=base_url,
-                    schema=url_schema,
-                    hostname=hostname,
-                )
-                for document in hep_record.get('documents', [])
-                if self._has_to_be_downloaded(document['url'])
-            ]
-
-            self.logger.info(
-                'Got the following attached documents to download: %s'
-                % files_to_download
-            )
-            parsed_item = ParsedItem(
-                record=hep_record,
-                file_urls=files_to_download,
-                ftp_params=ftp_params,
-                record_format='hep',
-            )
-            self.logger.info('Got item: %s' % parsed_item)
+        parsed_items = self._parsed_items_from_marcxml(
+            marcxml_records=marcxml_records,
+            base_url=base_url,
+            hostname=hostname,
+            url_schema=url_schema,
+            ftp_params=ftp_params,
+            url=response.url
+        )
+        self.logger.info('Got %d hep records' % len(parsed_items))
 
+        for parsed_item in parsed_items:
             yield parsed_item
 
     @staticmethod
@@ -277,24 +259,52 @@ def _get_marcxml_records(response_body):
 
         return [etree.tostring(item) for item in list_items]
 
-    def _hep_records_from_marcxml(self, marcxml_records):
-        def _create_json_record(xml_record):
-            app = Flask('hepcrawl')
-            app.config.update(
-                self.settings.getdict('MARC_TO_HEP_SETTINGS', {})
-            )
-            with app.app_context():
-                try:
-                    hep_record = marcxml2record(xml_record)
-                except Exception as e:
-                    return {'xml_record': xml_record, 'error': repr(e),
-                            'traceback': traceback.format_tb(sys.exc_info()[2])}
-
-            return hep_record
-
-        hep_records = []
-        for xml_record in marcxml_records:
-            json_record = _create_json_record(xml_record)
-            hep_records.append(json_record)
-
-        return hep_records
+    def _parsed_items_from_marcxml(
+        self,
+        marcxml_records,
+        base_url="",
+        hostname="",
+        url_schema=None,
+        ftp_params=None,
+        url=""
+    ):
+        app = Flask('hepcrawl')
+        app.config.update(self.settings.getdict('MARC_TO_HEP_SETTINGS', {}))
+        file_name = url.split('/')[-1]
+
+        with app.app_context():
+            parsed_items = []
+            for xml_record in marcxml_records:
+                try:
+                    record = marcxml2record(xml_record)
+                    parsed_item = ParsedItem(record=record, record_format='hep')
+                    parsed_item.ftp_params = ftp_params
+                    parsed_item.file_name = file_name
+
+                    files_to_download = [
+                        self._get_full_uri(
+                            current_url=document['url'],
+                            base_url=base_url,
+                            schema=url_schema,
+                            hostname=hostname,
+                        )
+                        for document in parsed_item.record.get('documents', [])
+                        if self._has_to_be_downloaded(document['url'])
+                    ]
+                    parsed_item.file_urls = files_to_download
+
+                    self.logger.info('Got the following attached documents to download: %s' % files_to_download)
+                    self.logger.info('Got item: %s' % parsed_item)
+
+                    parsed_items.append(parsed_item)
+                except Exception as e:
+                    error_parsed_item = ParsedItem.from_exception(
+                        record_format='hep',
+                        exception=repr(e),
+                        source_data=xml_record,
+                        file_name=file_name
+                    )
+                    parsed_items.append(error_parsed_item)
+
+            return parsed_items
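
For illustration, the error path end to end: a record that fails marcxml2record() becomes an error ParsedItem, which the pipeline then turns into a CrawlResult with a populated errors list. A sketch using a plain dict in place of hepcrawl.utils.ParsedItem (assumed here to be dict-like, with from_exception() filling the exception and traceback keys that CrawlResult.from_parsed_item reads):

from hepcrawl.api import CrawlResult

# Hypothetical error item, shaped as ParsedItem.from_exception() is
# assumed to produce it for a record that failed to parse.
failed_item = {
    'record': {},
    'record_format': 'hep',
    'file_name': 'desy_collection.xml',
    'source_data': '<record><datafield></record>',  # malformed input
    'exception': "XMLSyntaxError('mismatched tag')",
    'traceback': 'Traceback (most recent call last): ...',
}

result = CrawlResult.from_parsed_item(failed_item)
assert result.errors == [{
    'exception': "XMLSyntaxError('mismatched tag')",
    'traceback': 'Traceback (most recent call last): ...',
}]

This is why the spider appends error items instead of raising: the crawl keeps going, and each failure travels with its result rather than being lost.
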
2 changes: 0 additions & 2 deletions hepcrawl/testlib/scrapyd_coverage_runner.py
@@ -28,8 +28,6 @@ def save_coverage():
 
 
 if __name__ == '__main__':
-    print("\n--------------- CUSTOM SCRAPYD RUNNER ----------------\n")
-
     start_coverage()
     main()
     save_coverage()
52 changes: 1 addition & 51 deletions hepcrawl/tohep.py
@@ -23,13 +23,11 @@
 
 from __future__ import absolute_import, division, print_function
 
-import os
-import datetime
 import logging
+import os
 
 from inspire_schemas.api import LiteratureBuilder
 
-
 LOGGER = logging.getLogger(__name__)
@@ -147,54 +145,6 @@ def _normalize_hepcrawl_record(item, source):
     return item
 
 
-def item_to_hep(
-    item,
-    source,
-):
-    """Get an output ready hep formatted record from the given
-    :class:`hepcrawl.utils.ParsedItem`, whatever format its record might be.
-
-    Args:
-        item(hepcrawl.utils.ParsedItem): item to convert.
-        source(str): string identifying the source for this item (ex. 'arXiv').
-
-    Returns:
-        hepcrawl.utils.ParsedItem: the new item, with the internal record
-        formatted as a hep record.
-
-    Raises:
-        UnknownItemFormat: if the source item format is unknown.
-    """
-    builder = LiteratureBuilder(
-        source=source
-    )
-
-    builder.add_acquisition_source(
-        source=source,
-        method='hepcrawl',
-        date=datetime.datetime.now().isoformat(),
-        submission_number=os.environ.get('SCRAPY_JOB', ''),
-    )
-
-    item.record['acquisition_source'] = builder.record['acquisition_source']
-
-    if item.record_format == 'hep':
-        return hep_to_hep(
-            hep_record=item.record,
-            record_files=item.record_files,
-        )
-    elif item.record_format == 'hepcrawl':
-        record = _normalize_hepcrawl_record(
-            item=item.record,
-            source=source,
-        )
-        return hepcrawl_to_hep(dict(record))
-    else:
-        raise UnknownItemFormat(
-            'Unknown ParsedItem::{}'.format(item.record_format)
-        )
-
-
 def hep_to_hep(hep_record, record_files):
     """This is needed to be able to patch the ``documents`` in the record.
