Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): SageMaker jobs and models #2830

Merged
merged 71 commits into from
Jul 8, 2021
Merged
Changes from 1 commit
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
5fbcd31
Add get_model utils
kevinhu Jun 30, 2021
0c5aee0
Add job listing commands
kevinhu Jun 30, 2021
459a5fc
Init sagemaker processors
kevinhu Jun 30, 2021
98ca20b
Add process_training_job
kevinhu Jun 30, 2021
df90714
Add tuning jobs
kevinhu Jun 30, 2021
6eea1b3
Create SageMakerJob intermediate
kevinhu Jun 30, 2021
bf49145
Reorganize URN generators
kevinhu Jun 30, 2021
8763515
Construct arn-name translator
kevinhu Jul 1, 2021
233c00f
Refactor SageMaker job processors into class
kevinhu Jul 1, 2021
a089b2d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
4cb0e5a
Switch to tuple-indexed ARNs
kevinhu Jul 1, 2021
02d3437
Add input/outputs for process_transform_job
kevinhu Jul 1, 2021
6807382
Comment out unsupported aspects
kevinhu Jul 1, 2021
c282a6b
process_labeling_job datasets
kevinhu Jul 1, 2021
b05e66b
process_training_job datasets
kevinhu Jul 1, 2021
a51b79f
Init status enums
kevinhu Jul 1, 2021
386f46d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
d003991
Update models
kevinhu Jul 1, 2021
925b72f
Add unknown status enum
kevinhu Jul 1, 2021
854bf4b
Revise source report
kevinhu Jul 1, 2021
82d4425
Init job stubs
kevinhu Jul 2, 2021
b99025f
Init model stubs
kevinhu Jul 2, 2021
abaab8b
Add list-models stub
kevinhu Jul 2, 2021
5abd576
Set names and ARNs for job stubs
kevinhu Jul 2, 2021
77e9b9e
Add list-jobs stubs
kevinhu Jul 2, 2021
67442aa
Refactor stubbed job names and arns
kevinhu Jul 2, 2021
ad02711
Refactor job types
kevinhu Jul 2, 2021
8cd750e
Refactor job MCE constructor
kevinhu Jul 2, 2021
2cd0b85
Add feast and sagemaker dataplatforms
kevinhu Jul 2, 2021
77209d8
Furnish S3 paths in stubs
kevinhu Jul 2, 2021
bf63192
Add stubber responses
kevinhu Jul 2, 2021
856b251
Refactor and fix stub validation errors
kevinhu Jul 2, 2021
f570003
Setup model stubs
kevinhu Jul 2, 2021
a59ab89
Refactor feature group yielding
kevinhu Jul 2, 2021
2432357
Set up model ingestion
kevinhu Jul 2, 2021
1502e01
Parse in model creation times
kevinhu Jul 2, 2021
996c818
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 2, 2021
bfb46eb
Regenerate snapshots
kevinhu Jul 2, 2021
5449c7e
Move custom properties
kevinhu Jul 2, 2021
44b18fc
Generate model custom properties
kevinhu Jul 2, 2021
0fc259d
Fix job stubbing order
kevinhu Jul 2, 2021
f72d776
Working jobs ingestion
kevinhu Jul 2, 2021
7f30e86
Add custom properties
kevinhu Jul 2, 2021
3559015
Switch to sets for i/o jobs
kevinhu Jul 2, 2021
d204b76
Ingest input jobs
kevinhu Jul 2, 2021
ef8568b
Add job filtering options
kevinhu Jul 3, 2021
0ed1d73
Fix jobs filter and sort datasets
kevinhu Jul 3, 2021
e4797d0
Ingest datasets
kevinhu Jul 3, 2021
884631c
Ingest custom dataset properties
kevinhu Jul 3, 2021
d193e67
Typo fixes
kevinhu Jul 3, 2021
75f174a
Refactor reports
kevinhu Jul 6, 2021
8def468
Refactor out s3 URN constructor
kevinhu Jul 6, 2021
cba59c2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 6, 2021
27e921a
Remove unused
kevinhu Jul 6, 2021
ddb696a
Add env to models
kevinhu Jul 6, 2021
23c7541
Add umbrella flow and fix job envs
kevinhu Jul 6, 2021
bc80bff
Fix edge packaging stub
kevinhu Jul 6, 2021
24ab430
Set model sort order
kevinhu Jul 6, 2021
571e2b6
Comments for jobs
kevinhu Jul 6, 2021
e10592d
Fix time zones in stubs
kevinhu Jul 6, 2021
3584cfc
Create dataflow for each job
kevinhu Jul 6, 2021
4c44910
Set flows and migrate from azkaban enum
kevinhu Jul 7, 2021
8018022
Update rest sink test
kevinhu Jul 7, 2021
6675dd5
Set browse paths
kevinhu Jul 7, 2021
8e8eecd
Revert file to rest recipe
kevinhu Jul 7, 2021
d3b2e17
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 7, 2021
5331ba0
Browse paths for feature tables
kevinhu Jul 8, 2021
83fb6fd
Refactor make_s3_urn to aws_common
kevinhu Jul 8, 2021
76c7c7e
Add comment for deprecated azkaban types
kevinhu Jul 8, 2021
df56a61
Resolve merge conflict
kevinhu Jul 8, 2021
f98553b
Update schema_classes
kevinhu Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Comments for jobs
  • Loading branch information
kevinhu committed Jul 6, 2021
commit 571e2b6a6756efcb7a633ff09c2e0007c2000943
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
DataJobInfoClass,
DataFlowInfoClass,
DataFlowSnapshotClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataJobSnapshotClass,
DatasetPropertiesClass,
Expand All @@ -19,18 +19,31 @@

@dataclass
class SageMakerJobType:
# boto3 command to get list of jobs
list_command: str
# field in job listing response containing actual list
list_key: str
# field in job listing response element corresponding to job name
list_name_key: str
# field in job listing response element corresponding to job ARN
list_arn_key: str

# boto3 command to get job details
describe_command: str
# field in job description response corresponding to job name
describe_name_key: str
# field in job description response corresponding to job ARN
describe_arn_key: str
# field in job description response corresponding to job status
describe_status_key: str
# job-specific mapping from boto3 status strings to DataHub-native enum
status_map: Dict[str, str]

# name of function for processing job for ingestion
processor: str


# map from SageMaker job code to metadata on API access commands, fields, and processors
SAGEMAKER_JOB_TYPES = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! this is a pretty clean solution, all things considered

"auto_ml": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_auto_ml_jobs
Expand Down Expand Up @@ -212,6 +225,12 @@ def make_sagemaker_job_urn(arn: str, env: str) -> str:

@dataclass
class SageMakerJob:
"""
Intermediate job representation for storing result of initial ingestion from raw API response.

Produced by first-pass ingestion and basis for subsequent extraction.
"""

job_snapshot: DataJobSnapshotClass
input_datasets: Dict[str, Dict[str, Any]] = field(default_factory=dict)
output_datasets: Dict[str, Dict[str, Any]] = field(default_factory=dict)
Expand All @@ -222,10 +241,18 @@ class SageMakerJob:

@dataclass
class JobProcessor:
"""
Job ingestion module, called by top-level SageMaker ingestion handler.
"""

# boto3 SageMaker client
sagemaker_client: Any
env: str
report: SagemakerSourceReport
# config filter for specific job types to ingest (see metadata-ingestion README)
job_type_filter: Union[Dict[str, str], bool, None]

# translators between ARNs and job names (represented as tuples of (job_type, job_name))
arn_to_name: Dict[str, Tuple[str, str]] = field(default_factory=dict)
name_to_arn: Dict[Tuple[str, str], str] = field(default_factory=dict)

Expand Down Expand Up @@ -275,9 +302,13 @@ def get_all_jobs(

return jobs

def get_job_details(
self, job_name: str, describe_command: str, describe_name_key: str
) -> Dict[str, Any]:
def get_job_details(self, job_name: str, job_type: str) -> Dict[str, Any]:
"""
Get boto3 describe_<job> response
"""

describe_command = SAGEMAKER_JOB_TYPES[job_type].describe_command
describe_name_key = SAGEMAKER_JOB_TYPES[job_type].describe_name_key

return getattr(self.sagemaker_client, describe_command)(
**{describe_name_key: job_name}
Expand Down Expand Up @@ -327,11 +358,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
job_type = SAGEMAKER_JOB_TYPES[job["type"]]
job_name = job[job_type.list_name_key]

job_details = self.get_job_details(
job_name,
job_type.describe_command,
job_type.describe_name_key,
)
job_details = self.get_job_details(job_name, job["type"])

processed_job = getattr(self, job_type.processor)(job_details)
processed_jobs[processed_job.job_snapshot.urn] = processed_job
Expand Down Expand Up @@ -400,6 +427,9 @@ def create_common_job_snapshot(
job: Dict[str, Any],
job_type: str,
) -> DataJobSnapshotClass:
"""
General function for generating a job snapshot.
"""

job_type_info = SAGEMAKER_JOB_TYPES[job_type]

Expand Down Expand Up @@ -563,7 +593,7 @@ def process_edge_packaging_job(
"uri": output_s3_uri,
}

# "The name of the SageMaker Neo compilation job that is used to locate model artifacts that are being packaged."
# from docs: "The name of the SageMaker Neo compilation job that is used to locate model artifacts that are being packaged."
compilation_job_name: Optional[str] = job.get("CompilationJobName")

output_jobs = set()
Expand All @@ -584,7 +614,7 @@ def process_edge_packaging_job(
f"Unable to find ARN for compilation job {compilation_job_name} produced by edge packaging job {arn}",
)

# TODO: see if we can link models here
# TODO: see if we can link models here (will require adding some aspect to either jobs or models)
# model: Optional[str] = job.get("ModelName")
# model_version: Optional[str] = job.get("ModelVersion")

Expand Down Expand Up @@ -850,6 +880,7 @@ def process_training_job(self, job: Dict[str, Any]) -> SageMakerJob:

output_datasets = {}

# process all output datasets at once
for output_s3_uri in [
output_s3_uri,
checkpoint_s3_uri,
Expand Down