Skip to content

Commit

Permalink
Switch Marketo from annotated_schema to metadata (#54)
Browse files Browse the repository at this point in the history
* Initial attempt at metadata conversion

* Fix refactor mistake, update tests

* Fix validate_state, add unittest for it

* Removed integration tests from circle until we have working creds
  • Loading branch information
cosimon authored Sep 9, 2019
1 parent 3e87460 commit f841e30
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 113 deletions.
16 changes: 15 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: circleci/python:3.5-jessie-browsers
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
steps:
- checkout
- run:
Expand All @@ -22,3 +22,17 @@ jobs:
command: |
source ~/.virtualenvs/tap-marketo/bin/activate
python -m unittest
# - add_ssh_keys
# - run:
# name: 'Integration Tests'
# command: |
# aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
# source dev_env.sh
# source /usr/local/share/virtualenvs/tap-tester/bin/activate
# run-a-test --tap=tap-marketo \
# --target=target-stitch \
# --orchestrator=stitch-orchestrator \
# --email=harrison+sandboxtest@stitchdata.com \
# --password=$SANDBOX_PASSWORD \
# --client-id=50 \
# tap_tester.suites.marketo
13 changes: 7 additions & 6 deletions tap_marketo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@

def validate_state(config, catalog, state):
for stream in catalog["streams"]:
if not stream["schema"].get("selected"):
# If a stream is deselected while it's the current stream, unset the
# current stream.
if stream["tap_stream_id"] == get_currently_syncing(state):
set_currently_syncing(state, None)
continue
for mdata in stream['metadata']:
if mdata['breadcrumb'] == () and mdata['metadata'].get('selected') != True:
# If a stream is deselected while it's the current stream, unset the
# current stream.
if stream["tap_stream_id"] == get_currently_syncing(state):
set_currently_syncing(state, None)
break

replication_key = determine_replication_key(stream['tap_stream_id'])
if not replication_key:
Expand Down
19 changes: 8 additions & 11 deletions tap_marketo/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ def get_schema_for_type(typ, breadcrumb, mdata, null=False):

if null:
rtn["type"] = [rtn["type"], "null"]
rtn["inclusion"] = "available"
mdata = metadata.write(mdata, breadcrumb, 'inclusion', 'available')

else:
rtn["inclusion"] = "automatic"
mdata = metadata.write(mdata, breadcrumb, 'inclusion', 'automatic')

return rtn, mdata
Expand All @@ -78,19 +76,20 @@ def get_activity_type_stream(activity):
mdata = metadata.new()

properties = {
"marketoGUID": {"type": ["null", "string"], "inclusion": "automatic"},
"leadId": {"type": ["null", "integer"], "inclusion": "automatic"},
"activityDate": {"type": ["null", "string"], "format": "date-time", "inclusion": "automatic"},
"activityTypeId": {"type": ["null", "integer"], "inclusion": "automatic"}
"marketoGUID": {"type": ["null", "string"]},
"leadId": {"type": ["null", "integer"]},
"activityDate": {"type": ["null", "string"], "format": "date-time"},
"activityTypeId": {"type": ["null", "integer"]}
}

for prop in properties:
mdata = metadata.write(mdata, ('properties', prop), 'inclusion', 'automatic')

if "primaryAttribute" in activity:
properties["primary_attribute_value"] = {"type": ["null", "string"], "inclusion": "automatic"}
properties["primary_attribute_name"] = {"type": ["null", "string"], "inclusion": "automatic"}
properties["primary_attribute_value_id"] = {"type": ["null", "string"], "inclusion": "automatic"}
properties["primary_attribute_value"] = {"type": ["null", "string"]}
properties["primary_attribute_name"] = {"type": ["null", "string"]}
properties["primary_attribute_value_id"] = {"type": ["null", "string"]}

mdata = metadata.write(mdata, ('properties', "primary_attribute_value"), 'inclusion', 'automatic')
mdata = metadata.write(mdata, ('properties', "primary_attribute_name"), 'inclusion', 'automatic')
mdata = metadata.write(mdata, ('properties', "primary_attribute_value_id"), 'inclusion', 'automatic')
Expand Down Expand Up @@ -120,7 +119,6 @@ def get_activity_type_stream(activity):
"schema": {
"type": "object",
"additionalProperties": False,
"inclusion": "available",
"properties": properties,
},
}
Expand Down Expand Up @@ -166,7 +164,6 @@ def discover_leads(client):
"schema": {
"type": "object",
"additionalProperties": False,
"inclusion": "available",
"properties": properties,
},
}
Expand Down
5 changes: 0 additions & 5 deletions tap_marketo/schemas/activity_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
"schema": {
"type": "object",
"additionalProperties": false,
"inclusion": "automatic",
"properties": {
"id": {
"type": "integer",
"inclusion": "automatic"
},
"name": {
"type": "string",
"inclusion": "automatic"
},
"description": {
"type": ["string", "null"],
"inclusion": "available"
},
"primaryAttribute": {
"type": ["object", "null"],
Expand All @@ -43,7 +39,6 @@
}
}
},
"inclusion": "unsupported"
}
}
}
Expand Down
11 changes: 0 additions & 11 deletions tap_marketo/schemas/campaigns.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,38 @@
"schema": {
"type": "object",
"additionalProperties": false,
"inclusion": "available",
"properties": {
"id": {
"type": "integer",
"inclusion": "automatic"
},
"createdAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"updatedAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"active": {
"type": ["boolean", "null"],
"inclusion": "available"
},
"description": {
"type": ["string", "null"],
"inclusion": "available"
},
"name": {
"type": "string",
"inclusion": "available"
},
"programId": {
"type": ["integer", "null"],
"inclusion": "available"
},
"programName": {
"type": ["string", "null"],
"inclusion": "available"
},
"type": {
"type": "string",
"inclusion": "available"
},
"workspaceName": {
"type": ["string", "null"],
"inclusion": "available"
}
}
}
Expand Down
70 changes: 31 additions & 39 deletions tap_marketo/schemas/lists.json
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
{
"tap_stream_id": "lists",
"stream": "lists",
"key_properties": ["id"],
"schema": {
"type": "object",
"additionalProperties": false,
"inclusion": "available",
"properties": {
"id": {
"type": "integer",
"inclusion": "automatic"
},
"name": {
"type": "string",
"inclusion": "automatic"
},
"createdAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"updatedAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"description": {
"type": ["string", "null"],
"inclusion": "available"
},
"programName": {
"type": ["string", "null"],
"inclusion": "available"
},
"workspaceName": {
"type": ["string", "null"],
"inclusion": "available"
}
}
"tap_stream_id": "lists",
"stream": "lists",
"key_properties": ["id"],
"schema": {
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"type": "integer",
},
"name": {
"type": "string",
},
"createdAt": {
"type": "string",
"format": "date-time",
},
"updatedAt": {
"type": "string",
"format": "date-time",
},
"description": {
"type": ["string", "null"],
},
"programName": {
"type": ["string", "null"],
},
"workspaceName": {
"type": ["string", "null"],
}
}
}
}
14 changes: 0 additions & 14 deletions tap_marketo/schemas/programs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,50 @@
"schema": {
"type": "object",
"additionalProperties": false,
"inclusion": "available",
"properties": {
"id": {
"type": "integer",
"inclusion": "automatic"
},
"createdAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"updatedAt": {
"type": "string",
"format": "date-time",
"inclusion": "automatic"
},
"name": {
"type": "string",
"inclusion": "available"
},
"description": {
"type": ["null", "string"],
"inclusion": "available"
},
"url": {
"type": ["null", "string"],
"inclusion": "available"
},
"type": {
"type": ["null", "string"],
"inclusion": "available"
},
"channel": {
"type": ["null", "string"],
"inclusion": "available"
},
"status": {
"type": ["null", "string"],
"inclusion": "available"
},
"workspace": {
"type": ["null", "string"],
"inclusion": "available"
},
"folder": {
"type": "object",
"properties": {
"type": {
"type": ["null", "string"],
"inclusion": "available"
},
"value": {
"type": ["null", "integer"],
"inclusion": "available"
},
"folderName": {
"type": ["null", "string"],
"inclusion": "available"
}
}
}
Expand Down
27 changes: 21 additions & 6 deletions tap_marketo/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,15 @@ def format_value(value, schema):

def format_values(stream, row):
    """Build the output record for ``row`` from the fields that should be emitted.

    A field is emitted when its metadata entry is marked ``selected`` or has
    ``inclusion`` equal to ``'automatic'``.

    Args:
        stream: Catalog stream entry. Reads ``stream['metadata']`` (a list of
            ``{'breadcrumb': ..., 'metadata': {...}}`` entries) and
            ``stream['schema']['properties']``.
        row: Mapping of field name to raw value. A field missing from ``row``
            is passed to ``format_value`` as ``None``.

    Returns:
        A dict mapping each emitted field name to
        ``format_value(row.get(field), schema)``.
    """
    # The stream-level metadata entry has an empty breadcrumb and names no
    # field. Indexing it with [-1] raises IndexError whenever that entry is
    # marked selected, so entries with an empty breadcrumb are ignored here.
    available_fields = {
        entry['breadcrumb'][-1]
        for entry in stream['metadata']
        if entry['breadcrumb']
        and (entry['metadata'].get('selected')
             or entry['metadata'].get('inclusion') == 'automatic')
    }

    return {
        field: format_value(row.get(field), schema)
        for field, schema in stream["schema"]["properties"].items()
        if field in available_fields
    }


Expand Down Expand Up @@ -158,8 +163,11 @@ def get_or_create_export_for_leads(client, state, stream, export_start):

# Create the new export and store the id and end date in state.
# Does not start the export (must POST to the "enqueue" endpoint).
fields = [f for f, s in stream["schema"]["properties"].items()
if s.get("selected") or (s.get("inclusion") == "automatic")]
fields = []
for entry in stream['metadata']:
if entry['metadata'].get('selected') or entry['metadata'].get('inclusion') == 'automatic':
fields.append(entry['breadcrumb'][-1])

export_id = client.create_export("leads", fields, query)
state = update_state_with_export_info(
state, stream, export_id=export_id, export_end=export_end.isoformat())
Expand Down Expand Up @@ -445,7 +453,14 @@ def sync(client, catalog, config, state):

for stream in catalog["streams"]:
# Skip unselected streams.
if not stream["schema"].get("selected"):
mdata = stream['metadata']

try:
stream_selected = [entry for entry in mdata if entry['breadcrumb'] == []][0]
except:
raise RuntimeError('Bad catalog: Expected metadata entry for stream')

if not (stream_selected and stream_selected['metadata']['selected']):
singer.log_info("%s: not selected", stream["tap_stream_id"])
continue

Expand Down
Loading

0 comments on commit f841e30

Please sign in to comment.