Docs/website/docs/examples/qdrant eg #775
Merged
Changes from all commits (29 commits):
- ad36202 qdrant zendesk example (hibajamal)
- 18e8bb5 pushing after rebase (hibajamal)
- 98c2fc3 edded all code snippets and sidebars.js (hibajamal)
- 4506e9b added all code snippets and sidebars.js (hibajamal)
- 3eeb33b added test and few code explanations (hibajamal)
- 9573291 code to be fixed to work for zendesk source (hibajamal)
- 64afe34 fixed code and qdrant creds (hibajamal)
- 6aa5a68 changes incorporated (hibajamal)
- 4afdc17 return back article (hibajamal)
- 218cff7 return package.json (hibajamal)
- 43a0a5a fixing destinations in all examples (hibajamal)
- 02421a9 fix __name__ with _remove (hibajamal)
- 02b644a add dependencies in index.md (hibajamal)
- 185f460 add dependencies in pyproject.toml (hibajamal)
- ffea520 add generated files (AstrakhantsevaAA)
- 2f9a8e9 pin python version, and reduce on top version from 3.13 to 3.12, upda… (AstrakhantsevaAA)
- a8f64af revert check-package.sh (AstrakhantsevaAA)
- cfd2fac revert poetry-deps.sh (AstrakhantsevaAA)
- 13a99f7 removed unused variables (hibajamal)
- bc268f9 regenerate script example (AstrakhantsevaAA)
- 1f0674a generate script example for connector_x_arrow, small fixes in code an… (AstrakhantsevaAA)
- c0131e1 added env variable for qdrant (hibajamal)
- 9f54373 fix deps and secrets.toml's (AstrakhantsevaAA)
- 0c89555 fix creds for qdrant (AstrakhantsevaAA)
- cd3f385 fix creds for zendesk (AstrakhantsevaAA)
- a1d3a28 fix source name for zendesk (AstrakhantsevaAA)
- 890164a fix qdrant creds path (AstrakhantsevaAA)
- e874cff revert stupid bash scripts (AstrakhantsevaAA)
- a20af27 no updated lock file (AstrakhantsevaAA)
New file, the generated `connector_x_arrow` example script (`@@ -0,0 +1,39 @@`):

```python
import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials


def read_sql_x(
    conn_str: ConnectionStringCredentials = dlt.secrets.value,
    query: str = dlt.config.value,
):
    yield cx.read_sql(
        conn_str.to_native_representation(),
        query,
        return_type="arrow2",
        protocol="binary",
    )


def genome_resource():
    # create genome resource with merge on `upid` primary key
    genome = dlt.resource(
        name="genome",
        write_disposition="merge",
        primary_key="upid",
        standalone=True,
    )(read_sql_x)(
        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
        "SELECT * FROM genome ORDER BY created LIMIT 1000",
    )
    # add incremental on created at
    genome.apply_hints(incremental=dlt.sources.incremental("created"))
    return genome


if __name__ == "__main__":
    pipeline = dlt.pipeline(destination="duckdb")
    genome = genome_resource()

    print(pipeline.run(genome))
    print(pipeline.last_trace.last_normalize_info)
    # NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
```
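The NOTE at the end refers to dlt's incremental loading: because `genome` carries an incremental hint on the `created` column, a second run resumes from the stored cursor and finds nothing new. A minimal sketch of that check, assuming the file above is importable as `load_arrow` (hypothetical module name):

```python
# sanity check of the incremental hint; `load_arrow` is a hypothetical
# module name for the example file shown above
import dlt
from load_arrow import genome_resource

pipeline = dlt.pipeline(destination="duckdb")
print(pipeline.run(genome_resource()))  # first run: loads up to 1000 rows
print(pipeline.run(genome_resource()))  # second run: 0 rows, `created` cursor already at its max
```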
File renamed without changes.
File renamed without changes.
New file (`@@ -0,0 +1,8 @@`), credentials template for the Qdrant destination and the Zendesk source:

```toml
[destination.qdrant.credentials]
location = ""
api_key = ""

[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
```
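For context, dlt resolves these TOML sections into the matching `dlt.secrets.value` arguments at runtime. The same values can also be supplied as environment variables using dlt's `__` section separator; a sketch with placeholder values (not part of the diff):

```python
# equivalent configuration via environment variables; "__" separates config
# sections, mirroring [destination.qdrant.credentials] etc. (all values are placeholders)
import os

os.environ["DESTINATION__QDRANT__CREDENTIALS__LOCATION"] = "https://your-cluster.qdrant.io"
os.environ["DESTINATION__QDRANT__CREDENTIALS__API_KEY"] = "your-api-key"
os.environ["SOURCES__ZENDESK__CREDENTIALS__SUBDOMAIN"] = "your-subdomain"
os.environ["SOURCES__ZENDESK__CREDENTIALS__EMAIL"] = "you@example.com"
os.environ["SOURCES__ZENDESK__CREDENTIALS__PASSWORD"] = "your-password"
```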
File renamed without changes.
New file (`@@ -0,0 +1,172 @@`), the Qdrant/Zendesk example pipeline:

```python
from typing import Optional, Dict, Any, Tuple

import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client
from dlt.destinations.qdrant import qdrant_adapter
from qdrant_client import QdrantClient

from dlt.common.configuration.inject import with_config


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
    credentials: Dict[str, str] = dlt.secrets.value,
    start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1),  # noqa: B008
    end_date: Optional[TAnyDateTime] = None,
):
    """
    Retrieves data from Zendesk Support for tickets events.

    Args:
        credentials: Zendesk credentials (default: dlt.secrets.value)
        start_date: Start date for data extraction (default: 2000-01-01)
        end_date: End date for data extraction (default: None).
            If end time is not provided, the incremental loading will be
            enabled, and after the initial run, only new data will be retrieved.

    Returns:
        DltResource.
    """
    # convert start_date and end_date to Pendulum datetime objects
    start_date_obj = ensure_pendulum_datetime(start_date)
    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None

    # extract credentials from secrets dictionary
    auth = (credentials["email"], credentials["password"])
    subdomain = credentials["subdomain"]
    url = f"https://{subdomain}.zendesk.com"

    # we use the `append` write disposition because objects in the tickets_data endpoint
    # are never updated, so we do not need to merge;
    # we set primary_key to allow deduplication of events by the `incremental` below
    # in the rare case when two events have the same timestamp
    @dlt.resource(primary_key="id", write_disposition="append")
    def tickets_data(
        updated_at: dlt.sources.incremental[pendulum.DateTime] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        )
    ):
        # URL for ticket events, e.g.
        # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets_data.json?start_time=946684800'
        event_pages = get_pages(
            url=url,
            endpoint="/api/v2/incremental/tickets",
            auth=auth,
            data_point_name="tickets",
            params={"start_time": updated_at.last_value.int_timestamp},
        )
        for page in event_pages:
            yield [_fix_date(ticket) for ticket in page]

            # stop loading when using end_value and the end is reached;
            # unfortunately, the Zendesk API has no "end_time" parameter, so we stop iterating ourselves
            if updated_at.end_out_of_range:
                return

    return tickets_data


# helper function to fix the datetime format
def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
    if not value:
        return None
    return ensure_pendulum_datetime(value)


# modify dates to return datetime objects instead
def _fix_date(ticket):
    ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
    ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
    ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
    return ticket


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
    url: str,
    endpoint: str,
    auth: Tuple[str, str],
    data_point_name: str,
    params: Optional[Dict[str, Any]] = None,
):
    """
    Makes a request to a paginated endpoint and returns a generator of data items per page.

    Args:
        url: The base URL.
        endpoint: The url to the endpoint, e.g. /api/v2/calls
        auth: Credentials for authentication.
        data_point_name: The key which data items are nested under in the response object (e.g. calls)
        params: Optional dict of query params to include in the request.

    Returns:
        Generator of pages, each page is a list of dict data items.
    """
    # update the page size to enable cursor pagination
    params = params or {}
    params["per_page"] = 1000
    headers = None

    # make requests and keep looping until there is no next page
    get_url = f"{url}{endpoint}"
    while get_url:
        response = client.get(get_url, headers=headers, auth=auth, params=params)
        response.raise_for_status()
        response_json = response.json()
        result = response_json[data_point_name]
        yield result

        get_url = None
        # See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
        if not response_json["end_of_stream"]:
            get_url = response_json["next_page"]


if __name__ == "__main__":
    # create a pipeline with an appropriate name
    pipeline = dlt.pipeline(
        pipeline_name="qdrant_zendesk_pipeline",
        destination="qdrant",
        dataset_name="zendesk_data",
    )

    # run the dlt pipeline and save info about the load process
    load_info = pipeline.run(
        # here we use a special function to tell Qdrant which fields to embed
        qdrant_adapter(
            zendesk_support(),  # retrieve tickets data
            embed=["subject", "description"],
        )
    )

    print(load_info)

    # create a Qdrant client to connect to your Qdrant database,
    # reading the credentials injected from the secrets file
    @with_config(sections=("destination", "credentials"))
    def get_qdrant_client(location=dlt.secrets.value, api_key=dlt.secrets.value):
        return QdrantClient(
            url=location,
            api_key=api_key,
        )

    qdrant_client = get_qdrant_client()

    # view Qdrant collections; you'll find your dataset here:
    print(qdrant_client.get_collections())

    # query Qdrant with a prompt: get tickets info close to "cancellation"
    response = qdrant_client.query(
        "zendesk_data_content",  # collection/dataset name with the 'content' suffix -> tickets content table
        query_text=["cancel", "cancel subscription"],  # prompt to search
        limit=3,  # limit the number of results to the nearest 3 embeddings
    )
```
docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py (55 changes: 33 additions & 22 deletions):
```diff
@@ -1,41 +1,52 @@
 def connector_x_snippet() -> None:
     # @@@DLT_SNIPPET_START example
     # @@@DLT_SNIPPET_START markdown_source

     import connectorx as cx

     import dlt
     from dlt.sources.credentials import ConnectionStringCredentials

     def read_sql_x(
         conn_str: ConnectionStringCredentials = dlt.secrets.value,
-        query: str = dlt.config.value
+        query: str = dlt.config.value,
     ):
-        yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary")
-
-    # create genome resource with merge on `upid` primary key
-    genome = dlt.resource(
-        name="genome",
-        write_disposition="merge",
-        primary_key="upid",
-        standalone=True
-    )(read_sql_x)(
-        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
-        "SELECT * FROM genome ORDER BY created LIMIT 1000"
-    )
-    # add incremental on created at
-    genome.apply_hints(incremental=dlt.sources.incremental("created"))
+        yield cx.read_sql(
+            conn_str.to_native_representation(),
+            query,
+            return_type="arrow2",
+            protocol="binary",
+        )
+
+    def genome_resource():
+        # create genome resource with merge on `upid` primary key
+        genome = dlt.resource(
+            name="genome",
+            write_disposition="merge",
+            primary_key="upid",
+            standalone=True,
+        )(read_sql_x)(
+            "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
+            "SELECT * FROM genome ORDER BY created LIMIT 1000",
+        )
+        # add incremental on created at
+        genome.apply_hints(incremental=dlt.sources.incremental("created"))
+        return genome

     # @@@DLT_SNIPPET_END markdown_source

     # @@@DLT_SNIPPET_START markdown_pipeline
-    __name__ = "__main__" # @@@DLT_REMOVE
+    __name__ = "__main__"  # @@@DLT_REMOVE
     if __name__ == "__main__":
         pipeline = dlt.pipeline(destination="duckdb")
+        genome = genome_resource()

         print(pipeline.run(genome))
         print(pipeline.last_trace.last_normalize_info)
-        # NOTE: run pipeline again to see that no more records got loaded thanks to incremental working
+        # NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
     # @@@DLT_SNIPPET_END markdown_pipeline

-    # check that stuff was loaded
-    row_counts = pipeline.last_trace.last_normalize_info.row_counts
-    assert row_counts["genome"] == 1000
+    # check that stuff was loaded # @@@DLT_REMOVE
+    row_counts = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
+    assert row_counts["genome"] == 1000 # @@@DLT_REMOVE

     # @@@DLT_SNIPPET_END example
```
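For readers unfamiliar with the marker comments: the docs tooling copies the code between `@@@DLT_SNIPPET_START`/`@@@DLT_SNIPPET_END` markers into the example's markdown page and drops any line tagged `@@@DLT_REMOVE`, so the `__name__ = "__main__"` assignment forces the guarded block to run under the snippet test while disappearing from the rendered docs, as do the row-count asserts. A hypothetical sketch of that filtering step (not dlt's actual implementation):

```python
def extract_snippet(source: str, name: str) -> str:
    """Collect lines between the named START/END markers, dropping @@@DLT_REMOVE lines."""
    out, keep = [], False
    for line in source.splitlines():
        if f"@@@DLT_SNIPPET_START {name}" in line:
            keep = True
        elif f"@@@DLT_SNIPPET_END {name}" in line:
            keep = False
        elif keep and "@@@DLT_REMOVE" not in line:
            out.append(line)
    return "\n".join(out)
```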