Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions geocoding_ban/custom-recipes/geocoding-ban/recipe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
{
// Meta data for display purposes
"meta" : {
"label": "Geocode with BAN",
"author": "Ministère de l’intérieur — EIG 2017",
"tags": ["geocoding", "dev"]
},

"kind" : "PYTHON",

"inputRoles" : [{
"name": "input",
"label": "Dataset to geocode",
"description": "what input A means",
"arity": "UNARY",
"required": true,
"acceptsDataset": true
}],
"selectableFromDataset": "input",
"outputRoles" : [{
"name": "output",
"label": "Output dataset",
"description": "what main output means",
"arity": "UNARY",
"required": true,
"acceptsDataset": true
}],

"params": [
{
"name": "server_address",
"label": "Server hosting Addok",
"type": "STRING",
"description": "Full url with http://. Do not include the path /search",
"mandatory": true,
"defaultValue": "http://api-adresse.data.gouv.fr"
},
{
"name": "columns",
"label": "Address columns",
"type": "COLUMNS",
"columnRole": "input",
"description": "Multiple columns will be concatenated",
"mandatory": true
},
{
"name": "post_code",
"label": "Column of the postcode",
"type": "COLUMN",
"columnRole": "input",
"description": "Optional: only results of that postcode will be returned",
"mandatory": false
},
{
"name": "city_code",
"label": "Column of the city code",
"type": "COLUMN",
"columnRole": "input",
"description": "Optional: only results of that city code will be return. Do not confuse it with postcodes (despite its ressemblance in France)",
"mandatory": false
},
{
"name": "lines_per_request",
"label": "Lines per request",
"type": "INT",
"defaultValue": 1000,
"mandatory": true,
"description": "Multiple lines per request saves time, but never exceed 8Mb per request"
},
{
"name": "concurent_requests",
"label": "Concurrent requests",
"type": "INT",
"defaultValue": 1,
"mandatory": true,
"description": "Use more than 1 only if you host your own instance. You might otherwise get banned"
},
{
"name": "http_proxy",
"label": "HTTP proxy",
"type": "STRING",
"mandatory": false,
"description": "If you don’t have an internet access and have to use a web proxy. Use the full form with http://"
},
{
"name": "timeout",
"label": "Timeout",
"type": "INT",
"defaultValue": 1000,
"mandatory": true,
"description": "Of the http request, in milliseconds"
},
{
"name": "prefix",
"label": "Result columns prefix",
"type": "STRING",
"defaultValue": "result_",
"mandatory": true,
"description": "Renames the columns returned by the geocoder"
},
{
"name": "error_col",
"label": "Column with error message",
"type": "STRING",
"mandatory": false,
"description": "Optional. The column name will be prefixed"
}
]
}
160 changes: 160 additions & 0 deletions geocoding_ban/custom-recipes/geocoding-ban/recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# coding=utf-8

u"""Geocoding plugin for Dataiku Science Studio (DSS)

It uses (by default) the geocoding server from: http://adresse.data.gouv.fr

It will also work on any private instance of https://github.com/addok/addok

This plugin was developped by the Ministère de l’Intérieur
in the context of the program Entrepreneurs d’Intérêt Général 2017
"""

from concurrent import futures
import dataiku
from dataiku.customrecipe import get_input_names_for_role
from dataiku.customrecipe import get_output_names_for_role
from dataiku.customrecipe import get_recipe_config
import itertools
import logging
import pandas as pd
import requests
import StringIO

# We read the addresses from the input dataset
# And write the coordinates in the output dataset
input_name = get_input_names_for_role('input')[0]
input_dataset = dataiku.Dataset(input_name)

output_name = get_output_names_for_role('output')[0]
output_dataset = dataiku.Dataset(output_name)

# All the variables when building a request
server_address = get_recipe_config()['server_address']
columns = get_recipe_config()['columns']
post_code = get_recipe_config().get('post_code', None)
city_code = get_recipe_config().get('city_code', None)
lines_per_request = int(get_recipe_config()['lines_per_request'])
concurent_requests = int(get_recipe_config()['concurent_requests'])
http_proxy = get_recipe_config().get('http_proxy', None)
timeout = int(get_recipe_config()['timeout'])
prefix = get_recipe_config().get('prefix', None)
error = get_recipe_config().get('error_col', None)
i = 0


def datas():
"""Returns the columns composing the address"""
result = {'columns': columns}
cols = list(columns)

if post_code:
result['postcode'] = post_code
cols.append(post_code)

if city_code:
result['citycode'] = city_code
cols.append(city_code)

return (result, cols)


def adresse_submit(df):
"""Does the actual request to the geocoding server"""
global i
verbosechunksize = 2000
string_io = StringIO.StringIO()
i += lines_per_request
if (i % verbosechunksize) == 0:
logging.info("geocoding chunk %r to %r", i-verbosechunksize, i)

data, cols = datas()
df[cols].to_csv(string_io, encoding="utf-8", index=False)

kwargs = {
'data': data,
'files': {'data': string_io.getvalue()},
'timeout': timeout,
'url': "{}/search/csv".format(server_address)
}

if http_proxy:
kwargs['proxies'] = {'http': http_proxy}

response = requests.post(**kwargs)

if error:
error_col = 'result_{}'.format(error)
else:
error_col = None

if response.status_code == 200:
content = StringIO.StringIO(response.content.decode('utf-8-sig'))
result = pd.read_csv(content, dtype=object)
if error_col:
result[error_col] = None
result = result.rename(columns={'longitude': 'result_longitude',
'latitude': 'result_latitude'})

# We only keep the new columns to avoid modifying the schema
diff = result.axes[1].difference(df.axes[1])

for new_column in diff:
if new_column[0:7] == "result_":
df[new_column.replace("result_", prefix)] = result[new_column]

else:
logging.warning("Chunk %r to %r: no valid response",
i-lines_per_request, i)
df['result_score'] = -1
if error_col:
df["{}{}".format(prefix, error)] = "HTTP Status: {}".format(response.status_code)

return df


def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
args = [iter(iterable)] * n
return itertools.izip_longest(*args, fillvalue=fillvalue)


# We make a first run with a sample to have a valid schema to build a writer
small = input_dataset.get_dataframe(sampling='head',
limit=1,
infer_with_pandas=False)

initial_index = small.axes[1]
geocoded = adresse_submit(small)
output_index = geocoded.axes[1]

if '{}longitude'.format(prefix) not in output_index:
raise Exception('Geocoding failed: unable to make a sample request')

schema = input_dataset.read_schema()

floats = [prefix + column for column in ['longitude', 'latitude', 'score']]
for column in output_index.difference(initial_index):
if column in floats:
schema.append({'name': column, 'type': 'float'})
else:
schema.append({'name': column, 'type': 'string'})

output_dataset.write_schema(schema)
writer = output_dataset.get_writer()

dataset_iter = input_dataset.iter_dataframes(chunksize=lines_per_request,
infer_with_pandas=False)

with futures.ThreadPoolExecutor(max_workers=concurent_requests) as executor:
for chunks in grouper(dataset_iter, 10 * concurent_requests):
j = 0
for s in executor.map(adresse_submit, chunks):
j += lines_per_request
try:
writer.write_dataframe(s)
except Exception as exc:
logging.warning("chunk %r to %r generated an exception: %r\n%r",
j-lines_per_request, j, exc, s)

writer.close()
29 changes: 29 additions & 0 deletions geocoding_ban/plugin.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This file is the descriptor for the DSS Plugin geocoding_BAN
{
// The identifier of the plugin.
// This must be globally unique, and only contain A-Za-z0-9_-
"id" : "geocoding-ban",

// Version. It is highly recommended to use Semantic Versioning
"version" : "0.0.2",

// Meta data for display purposes
"meta" : {
// Name of this plugin that appears in the interface.
"label": "French geocoding (BAN)",

"description": "Uses the French BAN database to geocode addresses",
"author": "Ministère de l’Intérieur — EIG 2017",

// The icon of a plugin must be one of the FontAwesome 3.1 icons
"icon": "icon-map-marker",

"licenseInfo" : "Apache Software License",

// URL where the user can learn more about the plugin
"url": "https://adresse.data.gouv.fr",

// List of tags for filtering the list of plugins
"tags": ["geocoding", "ban"]
}
}
5 changes: 5 additions & 0 deletions geocoding_ban/requirements.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"python" : [
{"name":"futures"}
]
}