How do I clean up older data from the database? #12047
-
After using |
-
-
Will this trigger the relevant cleanup to take place as well (like |
-
If I want to keep the `ASSET_MATERIALIZATION` and `ASSET_OBSERVATION` events for the UI, is there a way to keep them?
-
Is there a clean way to also remove the associated folders on disk? I have a disk that's slowly getting filled up with the contents of intermediate results of runs.
-
Just for inspiration, we are currently cleaning up the disk by blindly deleting storage older than 30 days.
No guarantee this is a good idea; I can only confirm this hasn't broken our production instance (yet).
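An age-based cleanup like this could be sketched as follows. This is not the script used above, just a minimal illustration that assumes storage is laid out as one top-level entry per run under a single root directory. The helper name `delete_old_storage`, its parameters, and the example path are hypothetical.

```python
import shutil
import time
from pathlib import Path


def delete_old_storage(storage_root, max_age_days=30, dry_run=True, now=None):
    """Delete top-level entries under storage_root whose mtime is older than max_age_days.

    Returns the list of paths that were (or, in dry-run mode, would be) deleted.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 24 * 60 * 60
    removed = []
    for entry in sorted(Path(storage_root).iterdir()):
        # lstat() judges a symlink by its own mtime rather than its target's.
        if entry.lstat().st_mtime >= cutoff:
            continue  # recent enough, keep it
        removed.append(entry)
        if dry_run:
            continue  # only report, do not touch the disk
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)
        else:
            entry.unlink()
    return removed
```

Running `delete_old_storage("/path/to/storage")` first (dry run is the default) lists what would go, and passing `dry_run=False` actually deletes. Note that a directory's mtime only changes when entries are added or removed directly inside it, so this is a rough age check, and it knows nothing about whether a run is still referenced in the database.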
-
In Dagster+, runs can be deleted via an authorized call to the GraphQL API using the `deleteRun` mutation:

```python
import os

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

# Define the endpoint URL and token
org_name = "your-org-name"
base_url = f"https://{org_name}.dagster.cloud/"
# String name for actual deployments; branch deployments use a string ID of their deploymentId
deployment_name = "prod"
url = base_url + deployment_name + "/graphql"
# A user token generated from the Organization Settings page in Dagster+.
# Note: use a user token, not an agent token.
token = os.getenv("DAGSTER_CLOUD_USER_TOKEN")

# Define the transport with the endpoint URL and any headers if needed
transport = RequestsHTTPTransport(
    url=url,
    headers={
        "Dagster-Cloud-Api-Token": token,
    },
    use_json=True,
    timeout=60,
)

# Instantiate the client
client = Client(transport=transport)

# Define the GraphQL mutation
delete_run_mutation = gql("""
mutation DeleteRun($runId: String!) {
  deleteRun(runId: $runId) {
    __typename
    ... on DeletePipelineRunSuccess {
      runId
    }
    ... on RunNotFoundError {
      runId
    }
    ... on PythonError {
      message
      stack
    }
  }
}
""")

# Define the query variables
query_variables = {
    "runId": "your-run-id"  # replace with the actual run ID you want to delete
}

# Execute the mutation
try:
    result = client.execute(delete_run_mutation, variable_values=query_variables)
    print(result)
except Exception as e:
    print(f"An error occurred: {e}")
```
One option is to use the Python APIs against the `DagsterInstance` to query for older runs and delete them. This is a destructive operation that will remove the events, tags, and run record from the database. This will remove Dagster's understanding that this run ever occurred, which can be particularly impactful to partitioned jobs and assets. Perform this operation with great care.

An example script would look something like this.
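The script itself is missing from this copy of the thread, so what follows is a hedged sketch of the approach described rather than the original. It assumes `DagsterInstance.get()`, `instance.get_run_records(...)`, `RunsFilter(created_before=...)`, and `instance.delete_run(run_id)` behave as in recent Dagster releases; attribute names such as `record.dagster_run` have changed between versions, so check against the version you run. The helper `delete_old_runs`, the wrapper `main`, the 7-day cutoff, and the batch size are illustrative choices.

```python
import datetime


def delete_old_runs(instance, runs_filter, batch_size=10):
    """Delete every run matching runs_filter, oldest first, in small batches.

    Returns the number of runs deleted.
    """
    deleted = 0
    while True:
        records = instance.get_run_records(
            filters=runs_filter,
            limit=batch_size,  # limit how many are fetched at a time
            ascending=True,    # start from the oldest
        )
        if not records:
            return deleted
        for record in records:
            # Removes the run record together with its events and tags.
            instance.delete_run(record.dagster_run.run_id)
            deleted += 1


def main(max_age_days=7):
    # Imported here so the helper above can be exercised without Dagster installed.
    from dagster import DagsterInstance, RunsFilter

    cutoff = datetime.datetime.now() - datetime.timedelta(days=max_age_days)
    instance = DagsterInstance.get()  # needs DAGSTER_HOME to be set
    count = delete_old_runs(instance, RunsFilter(created_before=cutoff))
    print(f"Deleted {count} runs created before {cutoff}")
```

Nothing runs until `main()` is called. Fetching in small batches keeps memory use flat and lets you interrupt the script partway through, and since deletion is irreversible it is worth trying it against a copy of the database first.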