Skip to content

Commit 038dd4f

Browse files
committed
code to be fixed to work for zendesk source
1 parent 863a397 commit 038dd4f

File tree

3 files changed

+403
-138
lines changed

3 files changed

+403
-138
lines changed

docs/examples/docs/examples/qdrant_zendeskdocs/examples/qdrant_zendesk/code/qdrant.py

Lines changed: 124 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,151 @@
22
from dlt.destinations.qdrant import qdrant_adapter
33
from qdrant_client import QdrantClient
44

5-
from zendesk import zendesk_support
5+
# function source: https://dlthub.com/docs/examples/incremental_loading/#loading-code
6+
@dlt.source(max_table_nesting=2)
7+
def zendesk_support(
8+
credentials: Dict[str, str] = dlt.secrets.value,
9+
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
10+
end_date: Optional[TAnyDateTime] = None,
11+
):
12+
"""
13+
Retrieves data from Zendesk Support for tickets events.
614
7-
def main():
8-
# 1. Create a pipeline
15+
Args:
16+
credentials: Zendesk credentials (default: dlt.secrets.value)
17+
start_date: Start date for data extraction (default: 2000-01-01)
18+
end_date: End date for data extraction (default: None).
19+
If end time is not provided, the incremental loading will be
20+
enabled, and after the initial run, only new data will be retrieved.
21+
22+
Returns:
23+
DltResource.
24+
"""
25+
# Convert start_date and end_date to Pendulum datetime objects
26+
start_date_obj = ensure_pendulum_datetime(start_date)
27+
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
28+
29+
# Convert Pendulum datetime objects to Unix timestamps
30+
start_date_ts = start_date_obj.int_timestamp
31+
end_date_ts: Optional[int] = None
32+
if end_date_obj:
33+
end_date_ts = end_date_obj.int_timestamp
34+
35+
# Extract credentials from secrets dictionary
36+
auth = (credentials["email"], credentials["password"])
37+
subdomain = credentials["subdomain"]
38+
url = f"https://{subdomain}.zendesk.com"
39+
40+
# we use `append` write disposition, because objects in ticket_events endpoint are never updated
41+
# so we do not need to merge
42+
# we set primary_key so allow deduplication of events by the `incremental` below in the rare case
43+
# when two events have the same timestamp
44+
@dlt.resource(primary_key="id", write_disposition="append")
45+
def ticket_events(
46+
timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
47+
"timestamp",
48+
initial_value=start_date_ts,
49+
end_value=end_date_ts,
50+
allow_external_schedulers=True,
51+
),
52+
):
53+
# URL For ticket events
54+
# 'https://d3v-dlthub.zendesk.com/api/v2/incremental/ticket_events.json?start_time=946684800'
55+
event_pages = get_pages(
56+
url=url,
57+
endpoint="/api/v2/incremental/ticket_events.json",
58+
auth=auth,
59+
data_point_name="ticket_events",
60+
params={"start_time": timestamp.last_value},
61+
)
62+
for page in event_pages:
63+
yield page
64+
# stop loading when using end_value and end is reached.
65+
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
66+
if timestamp.end_out_of_range:
67+
return
68+
69+
return ticket_events
70+
71+
def get_pages(
72+
url: str,
73+
endpoint: str,
74+
auth: Tuple[str, str],
75+
data_point_name: str,
76+
params: Optional[Dict[str, Any]] = None,
77+
):
78+
"""
79+
Makes a request to a paginated endpoint and returns a generator of data items per page.
80+
81+
Args:
82+
url: The base URL.
83+
endpoint: The url to the endpoint, e.g. /api/v2/calls
84+
auth: Credentials for authentication.
85+
data_point_name: The key which data items are nested under in the response object (e.g. calls)
86+
params: Optional dict of query params to include in the request.
87+
88+
Returns:
89+
Generator of pages, each page is a list of dict data items.
90+
"""
91+
# update the page size to enable cursor pagination
92+
params = params or {}
93+
params["per_page"] = 1000
94+
headers = None
95+
96+
# make request and keep looping until there is no next page
97+
get_url = f"{url}{endpoint}"
98+
while get_url:
99+
response = client.get(
100+
get_url, headers=headers, auth=auth, params=params
101+
)
102+
response.raise_for_status()
103+
response_json = response.json()
104+
result = response_json[data_point_name]
105+
yield result
106+
107+
get_url = None
108+
# See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
109+
if not response_json["end_of_stream"]:
110+
get_url = response_json["next_page"]
111+
112+
113+
if __name__ == "__main__":
114+
115+
# create a pipeline with an appropriate name
9116
pipeline = dlt.pipeline(
10117
pipeline_name="qdrant_zendesk_pipeline",
11118
destination="qdrant",
12-
dataset_name="zendesk_data_tickets",
119+
dataset_name="zendesk_data",
13120
)
14121

15-
# 2. Initialize Zendesk source to get the ticket data
16-
zendesk_source = zendesk_support(load_all=False)
17-
tickets = zendesk_source.tickets
18-
19-
# 3. Run the dlt pipeline
20-
info = pipeline.run(
21-
# 4. Here we use a special function to tell Qdrant
22-
# which fields to embed
23-
qdrant_adapter(
24-
tickets,
25-
embed=["subject", "description"],
122+
# run the dlt pipeline and save info about the load process
123+
load_info = pipeline.run(
124+
# here we use a special function to tell Qdrant which fields to embed
125+
qdrant_adapter(
126+
zendesk_support(), # retrieve tickets data
127+
embed=["subject", "description"],
26128
)
27129
)
28130

29-
return info
30-
31-
if __name__ == "__main__":
32-
load_info = main()
33131
print(load_info)
34132

35133

36-
# running the Qdrant client
134+
# running the Qdrant client to connect to your Qdrant database
37135
qdrant_client = QdrantClient(
38136
url="https://your-qdrant-url",
39137
api_key="your-qdrant-api-key",
40138
)
41139

42-
# view Qdrant collections
140+
# view Qdrant collections you'll find your dataset here:
43141
print(qdrant_client.get_collections())
44142

143+
# query Qdrant with appropriate prompt
45144
response = qdrant_client.query(
46-
"zendesk_data_tickets", # collection/dataset name
145+
"zendesk_data_tickets", # collection/dataset name with the 'tickets' suffix -> tickets table
47146
query_text=["cancel", "cancel subscription"], # prompt to search
48147
limit=3 # limit the number of results to the nearest 3 embeddings
49148
)
50149

51-
print(response)
150+
print(response)
151+
152+
assert len(response) <= 3

docs/website/docs/examples/qdrant_zendesk/code/qdrant-snippets.py

Lines changed: 139 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,165 @@
22

33
def qdrant_snippet():
44
# @@@DLT_SNIPPET_START example
5-
# @@@DLT_SNIPPET_START main_code
5+
# @@@DLT_SNIPPET_START zendesk_conn
6+
from typing import Iterator, Optional, Dict, Any, Tuple
7+
68
import dlt
9+
from dlt.common import pendulum
10+
from dlt.common.time import ensure_pendulum_datetime
11+
from dlt.common.typing import TAnyDateTime
12+
from dlt.sources.helpers.requests import client
713
from dlt.destinations.qdrant import qdrant_adapter
14+
815
from qdrant_client import QdrantClient
916

10-
from zendesk import zendesk_support
17+
# function source: https://dlthub.com/docs/examples/incremental_loading/#loading-code
18+
@dlt.source(max_table_nesting=2)
19+
def zendesk_support(
20+
credentials: Dict[str, str] = dlt.secrets.value,
21+
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
22+
end_date: Optional[TAnyDateTime] = None,
23+
):
24+
"""
25+
Retrieves data from Zendesk Support for tickets events.
26+
27+
Args:
28+
credentials: Zendesk credentials (default: dlt.secrets.value)
29+
start_date: Start date for data extraction (default: 2000-01-01)
30+
end_date: End date for data extraction (default: None).
31+
If end time is not provided, the incremental loading will be
32+
enabled, and after the initial run, only new data will be retrieved.
33+
34+
Returns:
35+
DltResource.
36+
"""
37+
# Convert start_date and end_date to Pendulum datetime objects
38+
start_date_obj = ensure_pendulum_datetime(start_date)
39+
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
40+
41+
# Convert Pendulum datetime objects to Unix timestamps
42+
start_date_ts = start_date_obj.int_timestamp
43+
end_date_ts: Optional[int] = None
44+
if end_date_obj:
45+
end_date_ts = end_date_obj.int_timestamp
46+
47+
# Extract credentials from secrets dictionary
48+
auth = (credentials["email"], credentials["password"])
49+
subdomain = credentials["subdomain"]
50+
url = f"https://{subdomain}.zendesk.com"
51+
52+
# we use `append` write disposition, because objects in ticket_events endpoint are never updated
53+
# so we do not need to merge
54+
# we set primary_key so allow deduplication of events by the `incremental` below in the rare case
55+
# when two events have the same timestamp
56+
@dlt.resource(primary_key="id", write_disposition="append")
57+
def ticket_events(
58+
timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
59+
"timestamp",
60+
initial_value=start_date_ts,
61+
end_value=end_date_ts,
62+
allow_external_schedulers=True,
63+
),
64+
):
65+
# URL For ticket events
66+
# 'https://d3v-dlthub.zendesk.com/api/v2/incremental/ticket_events.json?start_time=946684800'
67+
event_pages = get_pages(
68+
url=url,
69+
endpoint="/api/v2/incremental/ticket_events.json",
70+
auth=auth,
71+
data_point_name="ticket_events",
72+
params={"start_time": timestamp.last_value},
73+
)
74+
for page in event_pages:
75+
yield page
76+
# stop loading when using end_value and end is reached.
77+
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
78+
if timestamp.end_out_of_range:
79+
return
80+
81+
return ticket_events
82+
83+
def get_pages(
84+
url: str,
85+
endpoint: str,
86+
auth: Tuple[str, str],
87+
data_point_name: str,
88+
params: Optional[Dict[str, Any]] = None,
89+
):
90+
"""
91+
Makes a request to a paginated endpoint and returns a generator of data items per page.
92+
93+
Args:
94+
url: The base URL.
95+
endpoint: The url to the endpoint, e.g. /api/v2/calls
96+
auth: Credentials for authentication.
97+
data_point_name: The key which data items are nested under in the response object (e.g. calls)
98+
params: Optional dict of query params to include in the request.
99+
100+
Returns:
101+
Generator of pages, each page is a list of dict data items.
102+
"""
103+
# update the page size to enable cursor pagination
104+
params = params or {}
105+
params["per_page"] = 1000
106+
headers = None
11107

12-
def main():
13-
# 1. Create a pipeline
108+
# make request and keep looping until there is no next page
109+
get_url = f"{url}{endpoint}"
110+
while get_url:
111+
response = client.get(
112+
get_url, headers=headers, auth=auth, params=params
113+
)
114+
response.raise_for_status()
115+
response_json = response.json()
116+
result = response_json[data_point_name]
117+
yield result
118+
119+
get_url = None
120+
# See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
121+
if not response_json["end_of_stream"]:
122+
get_url = response_json["next_page"]
123+
124+
# @@@DLT_SNIPPET_END zendesk_conn
125+
126+
if __name__ == "__main__":
127+
# @@@DLT_SNIPPET_START main_code
128+
129+
# create a pipeline with an appropriate name
14130
pipeline = dlt.pipeline(
15-
pipeline_name="qdrant_zendesk_pipeline",
131+
pipeline_name="qdrant_zendesk_pipeline_ATT",
16132
destination="qdrant",
17-
dataset_name="zendesk_data_tickets",
133+
dataset_name="zendesk_data",
18134
)
19135

20-
# 2. Initialize Zendesk source to get the ticket data
21-
zendesk_source = zendesk_support(load_all=False)
22-
tickets = zendesk_source.tickets
23-
24-
# 3. Run the dlt pipeline
25-
info = pipeline.run(
26-
# 4. Here we use a special function to tell Qdrant
27-
# which fields to embed
28-
qdrant_adapter(
29-
tickets,
30-
embed=["subject", "description"],
136+
# run the dlt pipeline and save info about the load process
137+
load_info = pipeline.run(
138+
# here we use a special function to tell Qdrant which fields to embed
139+
qdrant_adapter(
140+
zendesk_support(), # retrieve tickets data
141+
embed=["subject", "description"],
31142
)
32143
)
33144

34-
return info
35-
36-
if __name__ == "__main__":
37-
load_info = main()
38145
print(load_info)
39146

40147
# @@@DLT_SNIPPET_END main_code
41148

42149
# @@@DLT_SNIPPET_START declare_qdrant_client
43-
# running the Qdrant client
150+
# running the Qdrant client to connect to your Qdrant database
44151
qdrant_client = QdrantClient(
45-
url="https://your-qdrant-url",
46-
api_key="your-qdrant-api-key",
152+
url="https://5708cdff-94ce-4e2d-bc41-2dbf4d281244.europe-west3-0.gcp.cloud.qdrant.io",
153+
api_key="UtTVT2g5yYVj5syiYeEqm41Z90dE0B2c6CQs-GOP4bTOnj2IUZkdog",
47154
)
48-
# @@@DLT_SNIPPET_END declare_qdrant_client
49155

50-
# @@@DLT_SNIPPET_START view_collections
51-
# view Qdrant collections
156+
# view Qdrant collections you'll find your dataset here:
52157
print(qdrant_client.get_collections())
53-
# @@@DLT_SNIPPET_END view_collections
158+
# @@@DLT_SNIPPET_END declare_qdrant_client
54159

55160
# @@@DLT_SNIPPET_START get_response
161+
# query Qdrant with appropriate prompt
56162
response = qdrant_client.query(
57-
"zendesk_data_tickets", # collection/dataset name
163+
"zendesk_data_tickets", # collection/dataset name with the 'tickets' suffix -> tickets table
58164
query_text=["cancel", "cancel subscription"], # prompt to search
59165
limit=3 # limit the number of results to the nearest 3 embeddings
60166
)
@@ -63,6 +169,8 @@ def main():
63169
print(response)
64170

65171
# @@@DLT_REMOVE
66-
assert len(response) == 3
172+
assert len(response) <= 3
173+
174+
# @@@DLT_SNIPPET_END example
67175

68-
# @@@DLT_SNIPPET_END example
176+
qdrant_snippet()

0 commit comments

Comments
 (0)