Skip to content

Commit

Permalink
[dy] Add sensor templates for sql databases (mage-ai#2219)
Browse files Browse the repository at this point in the history
* [dy] Add mysql

* [dy] Formatting
  • Loading branch information
dy46 authored Mar 17, 2023
1 parent fc63466 commit 5f37710
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 4 deletions.
9 changes: 6 additions & 3 deletions mage_ai/data_preparation/shared/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def get_secret_value(name: str) -> str:
from mage_ai.orchestration.db.models import Secret
fernet = Fernet(get_encryption_key())

secret = Secret.query.filter(Secret.name == name).one_or_none()
if secret:
return fernet.decrypt(secret.value.encode('utf-8')).decode('utf-8')
try:
secret = Secret.query.filter(Secret.name == name).one_or_none()
if secret:
return fernet.decrypt(secret.value.encode('utf-8')).decode('utf-8')
except Exception:
print(f'WARNING: Could not find secret value for secret {name}')
32 changes: 32 additions & 0 deletions mage_ai/data_preparation/templates/sensors/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path

if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor


@sensor
def query_bigquery_and_check_condition(**kwargs) -> bool:
"""
Template code for checking the results of a BigQuery query.
Specify your configuration settings in 'io_config.yaml'.
Return: True if the sensor should complete, False if it should
keep waiting
"""

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

query = 'Your BigQuery query' # Specify your SQL query here

loader = BigQuery.with_config(ConfigFileLoader(config_path, config_profile))
df = loader.load(query)

# Add your checks here
if df.empty:
return False

return True
33 changes: 33 additions & 0 deletions mage_ai/data_preparation/templates/sensors/mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path

if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor


@sensor
def query_mysql_and_check_condition(**kwargs) -> bool:
"""
Template code for checking the results of a MySQL query.
Specify your configuration settings in 'io_config.yaml'.
Return: True if the sensor should complete, False if it should
keep waiting
"""

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

query = 'Your MySQL query' # Specify your SQL query here

with MySQL.with_config(
ConfigFileLoader(config_path, config_profile)) as loader:
df = loader.load(query)

# Add your checks here
if df.empty:
return False

return True
33 changes: 33 additions & 0 deletions mage_ai/data_preparation/templates/sensors/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor


@sensor
def query_postgres_and_check_condition(**kwargs) -> bool:
"""
Template code for checking the results of a Postgres query.
Specify your configuration settings in 'io_config.yaml'.
Return: True if the sensor should complete, False if it should
keep waiting
"""

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

query = 'Your Postgres query' # Specify your SQL query here

with Postgres.with_config(
ConfigFileLoader(config_path, config_profile)) as loader:
df = loader.load(query)

# Add your checks here
if df.empty:
return False

return True
33 changes: 33 additions & 0 deletions mage_ai/data_preparation/templates/sensors/redshift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path

if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor


@sensor
def query_redshift_and_check_condition(**kwargs) -> bool:
"""
Template code for checking the results of a Redshift query.
Specify your configuration settings in 'io_config.yaml'.
Return: True if the sensor should complete, False if it should
keep waiting
"""

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

query = 'Your Redshift query' # Specify your SQL query here

with Redshift.with_config(
ConfigFileLoader(config_path, config_profile)) as loader:
df = loader.load(query)

# Add your checks here
if df.empty:
return False

return True
3 changes: 2 additions & 1 deletion mage_ai/data_preparation/templates/sensors/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def check_condition(**kwargs) -> bool:
bucket_name = 'your_bucket_name'
s3_path = 'path/to/folder/or/file'

return S3.with_config(ConfigFileLoader(config_path, config_profile)).exists(
config_file_loader = ConfigFileLoader(config_path, config_profile)
return S3.with_config(config_file_loader).exists(
bucket_name, s3_path
)
33 changes: 33 additions & 0 deletions mage_ai/data_preparation/templates/sensors/snowflake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path

if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor


@sensor
def query_snowflake_and_check_condition(**kwargs) -> bool:
"""
Template code for checking the results of a Snowflake query.
Specify your configuration settings in 'io_config.yaml'.
Return: True if the sensor should complete, False if it should
keep waiting
"""

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

query = 'Your Snowflake query' # Specify your SQL query here

with Snowflake.with_config(
ConfigFileLoader(config_path, config_profile)) as loader:
df = loader.load(query)

# Add your checks here
if df.empty:
return False

return True
5 changes: 5 additions & 0 deletions mage_ai/frontend/interfaces/DataSourceType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ export const DATA_SOURCE_TYPES: { [blockType in BlockTypeEnum]?: DataSourceTypeE
],
[BlockTypeEnum.SENSOR]: [
DataSourceTypeEnum.GENERIC,
DataSourceTypeEnum.BIGQUERY,
DataSourceTypeEnum.MYSQL,
DataSourceTypeEnum.POSTGRES,
DataSourceTypeEnum.REDSHIFT,
DataSourceTypeEnum.S3,
DataSourceTypeEnum.SNOWFLAKE,
],
};

Expand Down

0 comments on commit 5f37710

Please sign in to comment.