Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ckanext/datapusher_plus/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,25 @@ def after_upload(
the resource that was uploaded
"""
pass

def datastore_before_update(self, resource_id, existing_info, new_headers):
    """Hook invoked just before the datastore table for a resource is updated.

    Implementations may inspect (but are not required to act on) the
    incoming header definitions. The default implementation does nothing.

    :param resource_id: id of the resource whose datastore table will change.

    :param existing_info: The existing information in the datastore.
        Empty if the resource is new.
        Something like:
            {
                'Header 1': {'label': 'Header 1'},
                'Header 2': {'label': 'Header 2'},
                ...
            }

    :param new_headers: The new headers that are to be added to the datastore.
        Something like:
            [
                {'id': 'Header 1', 'type': 'text', 'info': {'label': 'Header 1'}},
                {'id': 'Header 2', 'type': 'numeric', 'info': {'label': 'Header 2'}},
                ...
            ]
    """
    pass
15 changes: 15 additions & 0 deletions ckanext/datapusher_plus/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@
from dateutil.parser import parse as parsedate

from rq import get_current_job
from ckan import plugins
import ckan.plugins.toolkit as tk


import ckanext.datapusher_plus.utils as utils
import ckanext.datapusher_plus.helpers as dph
# from ckanext.datapusher_plus.config import config
from ckanext.datapusher_plus import interfaces

log = logging.getLogger(__name__)


if locale.getdefaultlocale()[0]:
Expand Down Expand Up @@ -149,6 +153,7 @@ def send_resource_to_datastore(
"""
Stores records in CKAN datastore
"""
log.info(f"Sending data to datastore {resource_id}")

if resource_id:
# used to create the "main" resource
Expand Down Expand Up @@ -338,6 +343,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

logger.info(f"_push_to_datastore :: {task_id}")
# check if QSV_BIN and FILE_BIN exists
qsv_bin = tk.config.get("ckanext.datapusher_plus.qsv_bin")
qsv_path = Path(qsv_bin)
Expand Down Expand Up @@ -441,6 +447,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
if USE_PROXY:
kwargs["proxies"] = {"http": DOWNLOAD_PROXY, "https": DOWNLOAD_PROXY}
with requests.get(resource_url, **kwargs) as response:
logger.info(f"Response status {response.status_code} for {resource_url}")
response.raise_for_status()

cl = response.headers.get("content-length")
Expand Down Expand Up @@ -1008,6 +1015,14 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
info_dict = dict(label=original_header_dict.get(idx, "Unnamed Column"))
headers_dicts.append(dict(id=header["id"], type=header_type, info=info_dict))

# Notify plugins the datastore will be updated
for plugin in plugins.PluginImplementations(interfaces.IDataPusher):
plugin.datastore_before_update(
resource_id=resource_id,
existing_info=existing_info,
new_headers=headers_dicts,
)

# Maintain data dictionaries from matching column names
# if data dictionary already exists for this resource as
# we want to preserve the user's data dictionary curations
Expand Down
5 changes: 3 additions & 2 deletions ckanext/datapusher_plus/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ def get_dp_plus_user_apitoken():
if api_token:
return api_token

site_user = tk.get_action("get_site_user")({"ignore_auth": True}, {})
return site_user["apikey"]
raise Exception(
"No API token found. Please set the ckanext.datapusher_plus.api_token config option."
)


def check_response(
Expand Down
Loading