Skip to content

Commit

Permalink
fixed multiple bugs and cleaned up scheduler code
Browse files Browse the repository at this point in the history
  • Loading branch information
shirleycohen committed May 21, 2021
1 parent 6dc7edd commit 6788a5b
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 134 deletions.
12 changes: 6 additions & 6 deletions BigQueryUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def insert_row(self, table_id, data_asset, tagged_values):
else:
row[tagged_value['field_id']]= tagged_value['field_value']

print('row: ' + str(row))
#print('row: ' + str(row))

row_to_insert = [row,]

Expand All @@ -131,11 +131,11 @@ def insert_row(self, table_id, data_asset, tagged_values):
def copy_tag(self, table_name, table_fields, tagged_table, tagged_column, tagged_values):

print("*** inside BigQueryUtils.copy_tag() ***")
print("table_name: " + table_name)
print("table_fields: " + str(table_fields))
print("tagged_table: " + tagged_table)
print("tagged_column: " + tagged_column)
print("tagged_values: " + str(tagged_values))
#print("table_name: " + table_name)
#print("table_fields: " + str(table_fields))
#print("tagged_table: " + tagged_table)
#print("tagged_column: " + tagged_column)
#print("tagged_values: " + str(tagged_values))

exists, table_id, settings = self.table_exists(table_name)

Expand Down
4 changes: 4 additions & 0 deletions DataCatalogUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ def create_update_dynamic_tags(self, fields, included_uris, excluded_uris, tag_u
print('table_name: ' + table_name)
query_str = query_expression.replace('$table', table_name)

# table not in query expression (e.g. select 'string')
if table_index == -1:
query_str = query_expression

# run resulting query in BQ
print('query_str: ' + query_str)
rows = bq_client.query(query_str).result()
Expand Down
22 changes: 16 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,51 @@
5. Click Save and you're almost done!
6. Open Tag Engine and go to Report Settings. Add your project to the project_ids field so that it gets included in the Coverage Report.

### Step 3: To run Tag Engine:
### Step 3: Deploy Tag Engine:
```
export REPO=https://github.com/GoogleCloudPlatform/datacatalog-tag-engine.git
git clone $REPO
gcloud app deploy
gcloud app browse
```

### Step 4: Create App Engine Task Queue
### Step 4: Create App Engine Task Queue:
#### Task queue is used to refresh dynamic tags
```
gcloud config set project $PROJECT_ID
gcloud tasks queues create tag-engine
gcloud tasks queues update tag-engine --max-attempts=3
```

### Step 5: Create cron jobs through Cloud Scheduler
### Step 5: Create cron jobs through Cloud Scheduler:
#### Cron jobs are used to refresh dynamic tags and to run tag propagation
```
gcloud scheduler jobs create app-engine run-ready-jobs --schedule='every 60 minutes' --relative-url "/run_ready_jobs"
gcloud scheduler jobs create app-engine clear-stale-jobs --schedule='every 30 minutes' --relative-url "/clear_stale_jobs"
gcloud scheduler jobs create app-engine run-propagation --schedule='every 60 minutes' --relative-url "/run_propagation"
```

### Step 6: Deploy Zeta cloud function
### Step 6: Deploy Zeta cloud function:
#### Cloud function is used to parse BQ view definitions when running tag propagation
```
cd tag-engine/zeta
gcloud functions deploy zeta --trigger-http --entry-point com.google.cloud.sa.tagengine.service.zeta.ZetaSqlParserFunction \
--runtime java11 --memory 1GB --allow-unauthenticated
```

Open constants.py and set the ZETA_URL variable to your cloud function trigger URL:
### Step 7: Set config variables:

Open `tagengine.ini` and set the `TASK_QUEUE` and `ZETA_URL` variables. The `TASK_QUEUE` variable should be set to your fully qualified Tag Engine task queue and the `ZETA_URL` variable should be set to your zeta cloud function.

For example:

`TASK_QUEUE = 'projects/tag-engine-283315/locations/us-east1/queues/tag-engine'`
`ZETA_URL = 'https://us-central1-tag-engine-283315.cloudfunctions.net/zeta'`


### Step 8: To run Tag Engine:

`gcloud app browse`

### To clean up the task queue and cron jobs:
```
gcloud tasks queues delete tag-engine
Expand Down
120 changes: 69 additions & 51 deletions Resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def find_resources(self, uris):
dataset = self.bq_client.get_dataset(dataset_id)

table_expression = split_path[5]
print("table_expression: " + table_expression)

path_length = len(split_path)
print("path_length: " + str(path_length))
Expand All @@ -106,80 +107,97 @@ def find_resources(self, uris):
tag_type = constants.BQ_COLUMN_TAG


if tag_type == constants.BQ_TABLE_TAG:
if table_expression == "*":
print("list all tables in dataset")
tables = list(self.bq_client.list_tables(dataset))

if table_expression == "*":
print("list all tables in dataset")
tables = list(self.bq_client.list_tables(dataset))
for table in tables:

print("full_table_id: " + str(table.full_table_id))
resources.add(table.full_table_id)

elif "*" in table_expression:
print("table expression contains wildcard")
table_substring = table_expression.replace("*", "")

tables = list(self.bq_client.list_tables(dataset))

for table in tables:
print("full_table_id: " + str(table.full_table_id))
formatted_resource = self.format_table_resource(table.full_table_id)
resources.add(formatted_resource)
for table in tables:
if table_substring in table.full_table_id:

elif "*" in table_expression:
print("table expression contains wildcard")
table_substring = table_expression.replace("*", "")
print("full_table_id: " + str(table.full_table_id))
resources.add(table.full_table_id)

tables = list(self.bq_client.list_tables(dataset))

for table in tables:
if table_substring in table.full_table_id:
print("full_table_id: " + str(table.full_table_id))
formatted_resource = self.format_table_resource(table.full_table_id)
resources.add(formatted_resource)
else:
print("table expression == table name")

table_id = dataset_id + "." + table_expression

print('table_id: ' + table_id)

try:
table = self.bq_client.get_table(table_id)

else:
print("table expression == table name")
table_id = dataset_id + "." + table_expression
print("full_table_id: " + table.full_table_id)
resources.add(table.full_table_id)

try:
table = self.bq_client.get_table(table_id)

print("full_table_id: " + table.full_table_id)
formatted_resource = self.format_table_resource(table.full_table_id)
resources.add(formatted_resource)

except NotFound:
print("NotFound: table " + table_expression + " not found.")

except NotFound:
print("NotFound: table " + table_id + " not found.")

if tag_type == constants.BQ_COLUMN_TAG:
print("tagging a column")

column_exists = False
column = split_path[6]
print("column: " + column)

try:
table_id = dataset_id + "." + table_expression
print("table_id: " + table_id)

table = self.bq_client.get_table(table_id)
schema = table.schema
#print("table schema: " + str(table.schema))

for schema_field in schema:
if schema_field.name == column:
column_exists = True
break

column_resources = set()

for table_id in resources:

print('table_id: ' + table_id)

try:

table = self.bq_client.get_table(table_id.replace(':', '.'))

schema = table.schema
print("table schema: " + str(table.schema))

for schema_field in schema:
if schema_field.name == column:
column_exists = True
break

except:
print("NotFound: table " + table_id + " not found.")


if column_exists == True:
print("column exists")
table_resource = self.format_table_resource(table.full_table_id)
table_resource = self.format_table_resource(table_id)
table_column_resource = table_resource + "/column/" + column
print("table_column_resource: " + table_column_resource)
resources.add(table_column_resource)
column_resources.add(table_column_resource)
else:
print('Error: column ' + column + ' not found in table ' + table_id)
return None

except NotFound:
print("NotFound: table " + table_expression + " not found.")

return resources
return column_resources


if tag_type == constants.BQ_TABLE_TAG:
table_resources = set()

for table in resources:
formatted_table = self.format_table_resource(table)
table_resources.add(formatted_table)

return table_resources

if __name__ == '__main__':
res = Resources(project_id='tag-engine-283315');
included_uris='bigquery/project/tag-engine-283315/dataset/covid/Employees*, bigquery/project/tag-engine-283315/dataset/covid/Employee_RTO/emp_id, bigquery/project/tag-engine-283315/dataset/views/*'
included_uris='bigquery/project/data-lake-290221/dataset/hr/FTE_*'
excluded_uris=None
resources = res.get_resources(included_uris, excluded_uris)
print('resources: ' + str(resources))
2 changes: 1 addition & 1 deletion TagEngineUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ def write_dynamic_tag(self, config_status, fields, included_uris, excluded_uris,
'included_uris': included_uris,
'excluded_uris': excluded_uris,
'template_uuid': template_uuid,
'refresh_frequency': refresh_frequency,
'refresh_frequency': delta,
'tag_export': tag_export,
'scheduling_status': 'READY',
'next_run': next_run,
Expand Down
Loading

0 comments on commit 6788a5b

Please sign in to comment.