Skip to content

Commit 739ae1a

Browse files
authored
test: add test for adf foreach activity and xml expressions (#54)
* test: add test for adf foreach activity * test: add readme and screenshot of batch_job pipeline for ADF * fix: fixed git log for xml tests
1 parent de61b82 commit 739ae1a

File tree

8 files changed

+388
-4
lines changed

8 files changed

+388
-4
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Batch Job
2+
3+
This pipeline is an example of how a batch job can be triggered from an Azure Data Factory pipeline. It configures a set of variables, creates a storage container to be used by the batch job, triggers the job and monitors it. Once the job is complete, it moves the output files to another storage account and finally deletes the storage container.
4+
5+
![img.png](img.png)
6+
43.5 KB
Loading
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Copy Blobs
2+
3+
This is an example pipeline which lists all the blobs in a given container and copies these blobs to another container.
4+
5+
<img src="copy_blobs.png" alt="copy_blobs pipeline" />
6+
7+
The pipeline has two activities:
8+
9+
1. **List folders**: Web activity to list all blobs in a container that has a given prefix
10+
2. **For each activity**: Iterates over each item in the list returned above and executes the sub-activity on each item.
11+
12+
2.1. **Copy files to destination**: Copy activity which copies the blobs to a given destination.
22.6 KB
Loading
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
{
2+
"name": "copy_blobs",
3+
"properties": {
4+
"activities": [
5+
{
6+
"name": "List Folders",
7+
"type": "WebActivity",
8+
"dependsOn": [],
9+
"policy": {
10+
"timeout": "0.12:00:00",
11+
"retry": 0,
12+
"retryIntervalInSeconds": 30,
13+
"secureOutput": false,
14+
"secureInput": false
15+
},
16+
"userProperties": [],
17+
"typeProperties": {
18+
"url": {
19+
"value": "@concat('https://',pipeline().globalParameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')",
20+
"type": "Expression"
21+
},
22+
"method": "GET",
23+
"headers": {
24+
"x-ms-version": "2023-01-03"
25+
},
26+
"authentication": {
27+
"type": "MSI",
28+
"resource": "https://storage.azure.com"
29+
}
30+
}
31+
},
32+
{
33+
"name": "For Each SourceFolder",
34+
"type": "ForEach",
35+
"dependsOn": [
36+
{
37+
"activity": "List Folders",
38+
"dependencyConditions": [
39+
"Succeeded"
40+
]
41+
}
42+
],
43+
"userProperties": [],
44+
"typeProperties": {
45+
"items": {
46+
"value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')",
47+
"type": "Expression"
48+
},
49+
"activities": [
50+
{
51+
"name": "Copy files to Destination",
52+
"type": "Copy",
53+
"dependsOn": [],
54+
"policy": {
55+
"timeout": "0.12:00:00",
56+
"retry": 0,
57+
"retryIntervalInSeconds": 30,
58+
"secureOutput": false,
59+
"secureInput": false
60+
},
61+
"userProperties": [],
62+
"typeProperties": {
63+
"source": {
64+
"type": "BinarySource",
65+
"storeSettings": {
66+
"type": "AzureBlobStorageReadSettings",
67+
"recursive": true,
68+
"wildcardFolderPath": {
69+
"value": "@item()",
70+
"type": "Expression"
71+
},
72+
"deleteFilesAfterCompletion": false
73+
},
74+
"formatSettings": {
75+
"type": "BinaryReadSettings"
76+
}
77+
},
78+
"sink": {
79+
"type": "BinarySink",
80+
"storeSettings": {
81+
"type": "AzureBlobStorageWriteSettings"
82+
}
83+
},
84+
"enableStaging": false
85+
},
86+
"inputs": [
87+
{
88+
"referenceName": "DynamicBlobStorage",
89+
"type": "DatasetReference",
90+
"parameters": {
91+
"ServiceURI": {
92+
"value": "@concat('https://',pipeline().globalParameters.SourceStorageAccountName,'.blob.core.windows.net')",
93+
"type": "Expression"
94+
},
95+
"ContainerName": {
96+
"value": "@pipeline().parameters.SourceContainerName",
97+
"type": "Expression"
98+
},
99+
"FolderName": {
100+
"value": "@coalesce(null)",
101+
"type": "Expression"
102+
}
103+
}
104+
}
105+
],
106+
"outputs": [
107+
{
108+
"referenceName": "DynamicBlobStorage",
109+
"type": "DatasetReference",
110+
"parameters": {
111+
"ServiceURI": {
112+
"value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')",
113+
"type": "Expression"
114+
},
115+
"ContainerName": {
116+
"value": "@pipeline().parameters.SinkContainerName",
117+
"type": "Expression"
118+
},
119+
"FolderName": {
120+
"value": "@pipeline().parameters.SinkFolderName",
121+
"type": "Expression"
122+
}
123+
}
124+
}
125+
]
126+
}
127+
]
128+
}
129+
}
130+
],
131+
"parameters": {
132+
"SourceContainerName": {
133+
"type": "string"
134+
},
135+
"SourceFolderPrefix": {
136+
"type": "string"
137+
},
138+
"SinkStorageAccountName": {
139+
"type": "string"
140+
},
141+
"SinkContainerName": {
142+
"type": "string"
143+
},
144+
"SinkFolderName": {
145+
"type": "string"
146+
}
147+
},
148+
"folder": {
149+
"name": "batch"
150+
},
151+
"annotations": []
152+
}
153+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import pytest
2+
from data_factory_testing_framework.state import (
3+
RunParameter,
4+
RunParameterType,
5+
)
6+
from data_factory_testing_framework.state.dependency_condition import DependencyCondition
7+
from data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType
8+
9+
10+
def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None:
    """Evaluate the whole ``copy_blobs`` pipeline and check each yielded activity.

    The pipeline is loaded from the directory that contains this test module.
    The "List Folders" web activity is expected first; after an XML blob
    listing is set as its result, one "Copy files to Destination" activity is
    expected per ``BlobPrefix`` entry, in document order, and nothing after.
    """
    # Arrange
    test_framework = TestFramework(
        framework_type=TestFrameworkType.DataFactory, root_folder_path=request.fspath.dirname
    )
    pipeline = test_framework.repository.get_pipeline_by_name("copy_blobs")

    # Act
    activities = test_framework.evaluate_pipeline(
        pipeline=pipeline,
        parameters=[
            RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorageaccount"),
            RunParameter(RunParameterType.Pipeline, "SourceContainerName", "sourcecontainer"),
            RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "sourcefolder"),
            RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorageaccount"),
            RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
            RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
        ],
    )

    # Assert
    list_folder_activity = next(activities)
    assert list_folder_activity.name == "List Folders"
    assert (
        list_folder_activity.type_properties["url"].value
        == "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter"
    )
    assert list_folder_activity.type_properties["method"] == "GET"

    # The ForEach items expression reads this activity's output, so the result
    # has to be set before the iterator is advanced any further.
    list_folder_activity.set_result(
        result=DependencyCondition.SUCCEEDED,
        output={
            "Response": """
                <EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
                    <Prefix>testfolder</Prefix>
                    <Delimiter>$SourceBlobDelimiter</Delimiter>
                    <Blobs>
                        <BlobPrefix>
                            <Name>testfolder_1/$SourceBlobDelimiter</Name>
                        </BlobPrefix>
                        <BlobPrefix>
                            <Name>testfolder_2/$SourceBlobDelimiter</Name>
                        </BlobPrefix>
                    </Blobs>
                </EnumerationResults>
                """
        },
    )

    # One copy activity per <BlobPrefix><Name> entry of the response above.
    for expected_folder in (
        "testfolder_1/$SourceBlobDelimiter",
        "testfolder_2/$SourceBlobDelimiter",
    ):
        copy_activity = next(activities)
        assert copy_activity.name == "Copy files to Destination"
        assert copy_activity.type == "Copy"
        assert (
            copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].value
            == expected_folder
        )

    # The pipeline must not yield anything after the last copy activity.
    with pytest.raises(StopIteration):
        next(activities)
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import pytest
2+
from data_factory_testing_framework.models.activities.activity import Activity
3+
from data_factory_testing_framework.models.activities.for_each_activity import ForEachActivity
4+
from data_factory_testing_framework.models.pipeline import Pipeline
5+
from data_factory_testing_framework.state import (
6+
PipelineRunState,
7+
PipelineRunVariable,
8+
RunParameter,
9+
RunParameterType,
10+
)
11+
from data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType
12+
13+
14+
@pytest.fixture
def test_framework(request: pytest.FixtureRequest) -> TestFramework:
    """Provide a Data Factory ``TestFramework`` rooted at this test module's directory."""
    module_directory = request.fspath.dirname
    return TestFramework(
        root_folder_path=module_directory,
        framework_type=TestFrameworkType.DataFactory,
    )
20+
21+
22+
@pytest.fixture
def pipeline(test_framework: TestFramework) -> Pipeline:
    """Provide the ``copy_blobs`` pipeline looked up in the framework's repository."""
    repository = test_framework.repository
    return repository.get_pipeline_by_name("copy_blobs")
25+
26+
27+
def test_list_blobs(pipeline: Pipeline) -> None:
    """Evaluate the "List Folders" web activity alone and check its URL and HTTP method."""
    # Arrange
    run_variables = [
        PipelineRunVariable(name="SourceContainerName", default_value="source"),
    ]
    run_parameters = [
        RunParameter[str](RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
        RunParameter[str](
            RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
        ),
        RunParameter[str](RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"),
    ]
    list_folders = pipeline.get_activity_by_name("List Folders")
    run_state = PipelineRunState(variables=run_variables, parameters=run_parameters)

    # Act
    list_folders.evaluate(run_state)

    # Assert
    expected_url = "https://sourcestorage.blob.core.windows.net/container-8b6b545b-c583-4a06-adf7-19ff41370aba?restype=container&comp=list&prefix=testfolder&delimiter=$SourceBlobDelimiter"
    assert list_folders.name == "List Folders"
    assert list_folders.type_properties["url"].value == expected_url
    assert list_folders.type_properties["method"] == "GET"
53+
54+
55+
def test_for_each(pipeline: Pipeline) -> None:
    """Evaluate the ForEach activity against a recorded "List Folders" output.

    The items expression is expected to resolve to the ``Name`` text of every
    ``BlobPrefix`` element in the XML response, and the activity is expected
    to wrap exactly one child: the "Copy files to Destination" copy activity.
    """
    # Arrange
    run_variables = [
        PipelineRunVariable(name="SourceContainerName", default_value="source"),
    ]
    run_parameters = [
        RunParameter[str](RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
        RunParameter[str](
            RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
        ),
        RunParameter[str](RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"),
    ]
    for_each = pipeline.get_activity_by_name("For Each SourceFolder")
    run_state = PipelineRunState(variables=run_variables, parameters=run_parameters)

    # The items expression reads activity('List Folders').output.Response,
    # so that result has to be present in the state before evaluation.
    list_folders_output = {
        "Response": """
            <EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
                <Prefix>testfolder</Prefix>
                <Delimiter>$SourceBlobDelimiter</Delimiter>
                <Blobs>
                    <BlobPrefix>
                        <Name>testfolder_1/$SourceBlobDelimiter</Name>
                    </BlobPrefix>
                    <BlobPrefix>
                        <Name>testfolder_2/$SourceBlobDelimiter</Name>
                    </BlobPrefix>
                </Blobs>
            </EnumerationResults>
            """
    }
    run_state.add_activity_result(
        activity_name="List Folders",
        status="Succeeded",
        output=list_folders_output,
    )

    # Act
    for_each.evaluate(run_state)

    # Assert
    expected_items = [
        "testfolder_1/$SourceBlobDelimiter",
        "testfolder_2/$SourceBlobDelimiter",
    ]
    assert for_each.name == "For Each SourceFolder"
    assert for_each.type_properties["items"].value == expected_items

    child_activities = for_each.type_properties["activities"]
    assert len(child_activities) == 1
    assert child_activities[0]["name"] == "Copy files to Destination"
    assert child_activities[0]["type"] == "Copy"
103+
104+
105+
def _get_child_activity_by_name(foreach_activity: ForEachActivity, name: str) -> Activity:
    """Return the first child activity of ``foreach_activity`` named ``name``.

    Args:
        foreach_activity: Activity whose ``activities`` collection is searched.
        name: Exact activity name to look for.

    Raises:
        LookupError: If no child activity has that name. Raised explicitly so a
            typo in a test fails with a readable message instead of the bare,
            message-less ``StopIteration`` that ``next()`` would produce.
    """
    for child in foreach_activity.activities:
        if child.name == name:
            return child
    raise LookupError(f"ForEach activity has no child activity named {name!r}")
107+
108+
109+
@pytest.mark.parametrize(
    "wildcardfolderpath",
    [
        "testfolder_1/$SourceBlobDelimiter",
        "testfolder_2/$SourceBlobDelimiter",
    ],
)
def test_copy_blobs_activity(pipeline: Pipeline, wildcardfolderpath: str) -> None:
    """Evaluate the nested copy activity for one ForEach iteration item.

    The parametrized folder path is supplied as the state's iteration item and
    the copy source's ``wildcardFolderPath`` is expected to resolve to it.
    """
    # Arrange
    run_parameters = [
        RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
        RunParameter(
            RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
        ),
        RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorage"),
        RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
        RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
    ]
    for_each = pipeline.get_activity_by_name("For Each SourceFolder")
    copy_activity = _get_child_activity_by_name(for_each, "Copy files to Destination")
    run_state = PipelineRunState(
        parameters=run_parameters,
        iteration_item=wildcardfolderpath,
    )

    # Act
    copy_activity.evaluate(run_state)

    # Assert
    resolved_folder = copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].value
    assert copy_activity.name == "Copy files to Destination"
    assert resolved_folder == wildcardfolderpath

0 commit comments

Comments
 (0)