diff --git a/config/cron_udp.hjson b/config/cron_udp.hjson index 2d1fb157..fd3beadd 100644 --- a/config/cron_udp.hjson +++ b/config/cron_udp.hjson @@ -5,18 +5,18 @@ // and it wouldn’t reflect when the data was actually dumped from canvas. // More info on UDP's batch-ingest DAG process can be found here: https://resources.unizin.org/display/UDP/Batch-ingest+application ''' - select 'canvasdatadate' as pkey, min(dag_run) as pvalue from report.publish_info pi2 + SELECT * FROM EXTERNAL_QUERY("us.context_store", "select 'canvasdatadate' as pkey, min(dag_run) as pvalue from report.publish_info pi2"); ''', "user" : ''' select ( - cast(%(canvas_data_id_increment)s as bigint) + cast(@canvas_data_id_increment as bigint) + cast(p2.lms_ext_id as bigint) ) as user_id, case - when pe.email_address is not null then lower(split_part(pe.email_address , '@', 1)) + WHEN pe.email_address IS NOT NULL THEN LOWER(REGEXP_EXTRACT(pe.email_address, r'^([^@]+)')) else p2.sis_ext_id end as sis_name, cast(co.lms_int_id as bigint) as course_id, cg.le_current_score as current_grade, @@ -27,22 +27,22 @@ when cse.role = 'Teacher' then 'TeacherEnrollment' else '' end as enrollment_type - from entity.course_section_enrollment cse - left join entity.course_section cs + from context_store_entity.course_section_enrollment cse + left join context_store_entity.course_section cs on cse.course_section_id = cs.course_section_id - left join keymap.course_offering co + left join context_store_keymap.course_offering co on cs.le_current_course_offering_id = co.id - left join entity.person p + left join context_store_entity.person p on cse.person_id = p.person_id - left join keymap.person p2 + left join context_store_keymap.person p2 on p.person_id = p2.id - left join entity.person_email pe + left join context_store_entity.person_email pe on p.person_id = pe.person_id - left join entity.course_grade cg + left join context_store_entity.course_grade cg on cse.course_section_id = cg.course_section_id and cse.person_id = cg.person_id where - co.lms_int_id = ANY(%(course_ids)s) - and cse.role = ANY(ARRAY['Student', 'Teacher', 'TeachingAssistant']::text[]) + co.lms_int_id IN UNNEST(@course_ids) + and cse.role IN UNNEST(ARRAY['Student', 'Teacher', 'TeachingAssistant']) and cse.role_status = 'Enrolled' and cse.enrollment_status = 'Active' order by user_id @@ -51,28 +51,31 @@ ''' with assignment_details as ( select la.due_date, title, la.course_offering_id, la.learner_activity_id, la.points_possible, la.learner_activity_group_id - from entity.learner_activity la, keymap.course_offering co + from context_store_entity.learner_activity la, context_store_keymap.course_offering co where la.visibility = 'everyone' and la.status = 'published' and la.course_offering_id = co.id - and co.lms_int_id = ANY(%(course_ids)s) + and co.lms_int_id IN UNNEST(@course_ids) ), assignment_grp as ( select lg.* - from entity.learner_activity_group lg, keymap.course_offering co + from context_store_entity.learner_activity_group lg, context_store_keymap.course_offering co where lg.status = 'available' and lg.course_offering_id = co.id - and co.lms_int_id = ANY(%(course_ids)s) + and co.lms_int_id IN UNNEST(@course_ids) ), assign_more as ( select distinct(a.learner_activity_group_id), da.group_points from assignment_details a - join ( - select learner_activity_group_id, sum(points_possible) as group_points - from assignment_details - group by learner_activity_group_id - ) as da - on a.learner_activity_group_id = da.learner_activity_group_id + JOIN UNNEST(( + SELECT 
ARRAY_AGG(STRUCT(learner_activity_group_id, group_points)) + FROM ( + select learner_activity_group_id, sum(points_possible) as group_points + from assignment_details + group by learner_activity_group_id + ) + )) as da + on a.learner_activity_group_id = da.learner_activity_group_id ), grp_full as ( select a.group_points, b.learner_activity_group_id from assign_more a @@ -81,7 +84,7 @@ ), assign_rules as ( select distinct ad.learner_activity_group_id, agr.drop_lowest_amount as drop_lowest, agr.drop_highest_amount as drop_highest from grp_full ad - join entity.learner_activity_group agr + join context_store_entity.learner_activity_group agr on ad.learner_activity_group_id = agr.learner_activity_group_id ), assignment_grp_points as ( select ag.*, am.group_points AS group_points, ar.drop_lowest as drop_lowest, ar.drop_highest as drop_highest @@ -90,16 +93,16 @@ join assign_rules ar on ag.learner_activity_group_id = ar.learner_activity_group_id ) select - cast(lag_km.lms_int_id as BIGINT) as id, - cast(co_km.lms_int_id as BIGINT) as course_id, - cast(agp.group_weight as float) as weight, + cast(lag_km.lms_int_id as INT64) as id, + cast(co_km.lms_int_id as INT64) as course_id, + cast(agp.group_weight as FLOAT64) as weight, agp.name as name, agp.group_points as group_points, agp.drop_lowest as drop_lowest, agp.drop_highest as drop_highest from assignment_grp_points agp, - keymap.course_offering co_km, - keymap.learner_activity_group lag_km + context_store_keymap.course_offering co_km, + context_store_keymap.learner_activity_group lag_km where agp.course_offering_id = co_km.id and agp.learner_activity_group_id = lag_km.id order by id @@ -109,22 +112,22 @@ with assignment_info as ( select - la.due_date AT TIME ZONE 'UTC' as due_date, + la.due_date as due_date, la.title as name, - cast(co.lms_int_id as BIGINT) as course_id, - cast(la_km.lms_int_id as BIGINT) as id, + cast(co.lms_int_id as INT64) as course_id, + cast(la_km.lms_int_id as INT64) as id, la.points_possible as points_possible, - cast(lag_km.lms_int_id as BIGINT) as assignment_group_id + cast(lag_km.lms_int_id as INT64) as assignment_group_id from - entity.learner_activity la, - keymap.course_offering co, - keymap.learner_activity la_km, - keymap.learner_activity_group lag_km + context_store_entity.learner_activity la, + context_store_keymap.course_offering co, + context_store_keymap.learner_activity la_km, + context_store_keymap.learner_activity_group lag_km where la.visibility = 'everyone' and la.status = 'published' and la.course_offering_id = co.id - and co.lms_int_id = ANY(%(course_ids)s) + and co.lms_int_id IN UNNEST(@course_ids) and la.learner_activity_id = la_km.id and la.learner_activity_group_id = lag_km.id ) @@ -142,24 +145,24 @@ cast(0 as boolean) end as consider_weight from - entity.learner_activity_group lag, - keymap.course_offering co_km + context_store_entity.learner_activity_group lag, + context_store_keymap.course_offering co_km where lag.course_offering_id = co_km.id - and co_km.lms_int_id = ANY(%(course_ids)s) + and co_km.lms_int_id IN UNNEST(@course_ids) group by co_km.lms_int_id ''', "term": ''' select - cast(ka.lms_int_id as BIGINT) as id, - cast(ka.lms_ext_id as BIGINT) as canvas_id, + cast(ka.lms_int_id as INT64) as id, + cast(ka.lms_ext_id as INT64) as canvas_id, a.name as name, - a.le_term_begin_date::timestamp without time zone as date_start, - a.le_term_end_date::timestamp without time zone as date_end + a.le_term_begin_date as date_start, + a.le_term_end_date as date_end from - entity.academic_term as a - 
left join keymap.academic_term as ka on ka.id = a.academic_term_id + context_store_entity.academic_term as a + left join context_store_keymap.academic_term as ka on ka.id = a.academic_term_id where ka.lms_ext_id is not null order by id @@ -170,18 +173,18 @@ "course": ''' SELECT - cast(co2.lms_int_id as BIGINT) as id, - cast(co2.lms_ext_id as BIGINT) as canvas_id, - cast(at2.lms_int_id as BIGINT) as enrollment_term_id, + cast(co2.lms_int_id as INT64) as id, + cast(co2.lms_ext_id as INT64) as canvas_id, + cast(at2.lms_int_id as INT64) as enrollment_term_id, co.le_code as name, - co.le_start_date::timestamp without time zone as start_at, - co.le_end_date::timestamp without time zone as conclude_at + TIMESTAMP(co.le_start_date) as start_at, + TIMESTAMP(co.le_end_date) as conclude_at FROM - entity.course_offering co - LEFT OUTER JOIN entity.academic_term at1 on (co.academic_term_id = at1.academic_term_id), - keymap.course_offering co2, - keymap.academic_term at2 - WHERE co2.lms_int_id = ANY(%(course_ids)s) + context_store_entity.course_offering co + LEFT OUTER JOIN context_store_entity.academic_term at1 on (co.academic_term_id = at1.academic_term_id), + context_store_keymap.course_offering co2, + context_store_keymap.academic_term at2 + WHERE co2.lms_int_id IN UNNEST(@course_ids) and co.course_offering_id = co2.id and at1.academic_term_id = at2.id ''', @@ -191,29 +194,28 @@ cast(f_km.lms_int_id as BIGINT) as id, f.status as file_state, f.display_name as display_name - from entity.file f, keymap.file f_km, keymap.course_offering co_km + from context_store_entity.file f, context_store_keymap.file f_km, context_store_keymap.course_offering co_km where f.course_offering_id = co_km.id and f.file_id = f_km.id - and co_km.lms_int_id = ANY(%(course_ids)s) + and co_km.lms_int_id IN UNNEST(@course_ids) order by id ''', "submission": ''' - create temporary table all_assign_sub as ( with enrollment as ( select distinct cse.person_id as user_id - from entity.course_section_enrollment cse - left join entity.course_section cs + from context_store_entity.course_section_enrollment cse + left join context_store_entity.course_section cs on cse.course_section_id = cs.course_section_id - left join keymap.course_offering co + left join context_store_keymap.course_offering co on cs.le_current_course_offering_id = co.id where - co.lms_int_id = ANY(:course_ids) + co.lms_int_id in UNNEST(@course_ids) and cse.role_status ='Enrolled' - and cse."role" = 'Student' + and cse.role = 'Student' and cse.enrollment_status = 'Active' ), submission as @@ -222,7 +224,7 @@ la.status, la.visibility, la2.lms_int_id as assignment_id, - cast(co.lms_int_id as BIGINT) as course_id, + co.lms_int_id as course_id, la.title as assignment_title, lar.published_score as published_score, lar.response_date as submitted_at, @@ -232,22 +234,24 @@ la.title as title, lar.learner_activity_result_id as learner_activity_result_id, lar.person_id as short_user_id, - cast(lar2.lms_int_id as BIGINT) as submission_id, - (cast(:canvas_data_id_increment as bigint) + cast(p.lms_ext_id as bigint)) as canvas_user_id - from entity.learner_activity_result lar + lar2.lms_int_id as submission_id, + CAST(@canvas_data_id_increment AS INT64) + CAST(p.lms_ext_id AS INT64) as canvas_user_id + from context_store_entity.learner_activity_result lar join enrollment on lar.person_id= enrollment.user_id join enrollment e on lar.person_id = e.user_id - join keymap.learner_activity_result lar2 on lar.learner_activity_result_id = lar2.id - left join entity.learner_activity la on 
lar.learner_activity_id = la.learner_activity_id - left join keymap.learner_activity la2 on la.learner_activity_id = la2.id - left join keymap.course_offering co on co.id = la.course_offering_id - join keymap.person p on p.id = lar.person_id + join context_store_keymap.learner_activity_result lar2 on lar.learner_activity_result_id = lar2.id + left join context_store_entity.learner_activity la on lar.learner_activity_id = la.learner_activity_id + left join context_store_keymap.learner_activity la2 on la.learner_activity_id = la2.id + left join context_store_keymap.course_offering co on co.id = la.course_offering_id + join context_store_keymap.person p on p.id = lar.person_id where - co.lms_int_id = ANY(:course_ids) + co.lms_int_id in UNNEST(@course_ids) and la.status = 'published' - ) + ), + all_assign_sub as + ( select - cast(submission_id as BIGINT) AS id, + submission_id AS id, assignment_id AS assignment_id, course_id, canvas_user_id, @@ -264,22 +268,20 @@ submitted_at AS submitted_at, graded_at AS graded_date, grade_posted - from - submission + from + submission + order by assignment_id ) - ''', - "submission_with_avg_score": - ''' select - f.id::bigint, - f.assignment_id::bigint assignment_id, + f.id, + CAST(f.assignment_id AS INT64) AS assignment_id, f.course_id, - f.canvas_user_id::bigint as user_id, - f.score::float, + CAST(f.canvas_user_id AS INT64) AS user_id, + CAST(f.score AS FLOAT64) AS score, f.submitted_at, f.graded_date, f.grade_posted, - cast(f1.avg_score as float) as avg_score + CAST(f1.avg_score AS FLOAT64) AS avg_score from all_assign_sub f join ( diff --git a/config/env_sample.hjson b/config/env_sample.hjson index f1c48abf..f367b8db 100644 --- a/config/env_sample.hjson +++ b/config/env_sample.hjson @@ -185,26 +185,8 @@ # By default this is empty and no views are disabled # options are as described in course_view_options table column names [\"show_assignment_planning\", \"show_grade_distribution\"] "VIEWS_DISABLED": "", - # Data Warehouse configuration - # Uncomment these variables and fill them in if you're using cron to load - # from a data warehouse. These are optional - # Database engine driver - "DATA_WAREHOUSE": { - "ENGINE": "django.db.backends.postgresql", - # database name - "NAME": "", - # database user - "USER": "", - # database password - "PASSWORD": "", - # database host - "HOST": "", - # database port - "PORT": 5432, - # Enable/Disable Unizin Date Warehouse specific features/data - "IS_UNIZIN": true - }, - # Learning Record Store configuration + # Data Warehoue and Learning Record Store configuration + # The warehouse and LRS are combined now in the same data source "LRS": { # LRS database engine driver (use `google.cloud.bigquery` for bigquery). 
no other LRS settings needed "ENGINE": "google.cloud.bigquery", diff --git a/dashboard/common/db_util.py b/dashboard/common/db_util.py index 36d0d2d1..d0316844 100644 --- a/dashboard/common/db_util.py +++ b/dashboard/common/db_util.py @@ -1,6 +1,6 @@ # Some utility functions used by other classes in this project import logging -from datetime import datetime +import datetime from typing import Dict, List, Literal, TypedDict, Union from urllib.parse import quote_plus @@ -21,7 +21,7 @@ class DjangoDBParams(TypedDict): - ENGINE: Literal['django.db.backends.mysql', 'django.db.backends.postgresql'] + ENGINE: Literal['django.db.backends.mysql'] NAME: str USER: str PASSWORD: str @@ -37,7 +37,7 @@ def create_sqlalchemy_engine(db_params: DjangoDBParams) -> Engine: if new_db_params['ENGINE'] == (BACKENDS_PATH + 'mysql'): return create_engine(f'mysql+mysqldb://{core_string}?charset=utf8mb4') else: - return create_engine('postgresql+psycopg://' + core_string) + raise Exception("Only mysql is supported") def canvas_id_to_incremented_id(canvas_id): @@ -171,7 +171,7 @@ def get_user_courses_info(username: str, course_id: Union[int, None] = None) -> return enrollments -def get_last_cronjob_run() -> Union[datetime, None]: +def get_last_cronjob_run() -> Union[datetime.datetime, None]: try: c = CronJobLog.objects.filter(is_success=1).latest('end_time') end_time = c.end_time @@ -181,10 +181,7 @@ def get_last_cronjob_run() -> Union[datetime, None]: return None -def get_canvas_data_date() -> Union[datetime, None]: - if not settings.DATABASES.get('DATA_WAREHOUSE', {}).get('IS_UNIZIN'): - return get_last_cronjob_run() - +def get_canvas_data_date() -> Union[datetime.datetime, None]: try: with django.db.connection.cursor() as cursor: cursor.execute("SELECT pvalue from unizin_metadata where pkey = 'canvasdatadate'") @@ -194,4 +191,4 @@ def get_canvas_data_date() -> Union[datetime, None]: return date except Exception: logger.info("Value could not be found from metadata", exc_info = True) - return None + return None \ No newline at end of file diff --git a/dashboard/cron.py b/dashboard/cron.py index 867b64b6..a4aab726 100644 --- a/dashboard/cron.py +++ b/dashboard/cron.py @@ -1,7 +1,7 @@ from datetime import datetime import logging from collections import namedtuple -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union from zoneinfo import ZoneInfo import hjson @@ -23,100 +23,126 @@ logger = logging.getLogger(__name__) -engine = db_util.create_sqlalchemy_engine(settings.DATABASES['default']) -data_warehouse_engine = db_util.create_sqlalchemy_engine(settings.DATABASES['DATA_WAREHOUSE']) +# cron job to populate course and user tables +class DashboardCronJob(CronJobBase): -# Set up queries array from configuration file -CRON_QUERY_FILE = settings.CRON_QUERY_FILE -logger.info(CRON_QUERY_FILE) -try: - with open(CRON_QUERY_FILE) as cron_query_file: - queries = hjson.load(cron_query_file) -except FileNotFoundError: - logger.error( - f'Cannot find cron queries file "{CRON_QUERY_FILE}".') - queries = dict() + schedule = Schedule(run_at_times=settings.RUN_AT_TIMES) + code = 'dashboard.DashboardCronJob' # a unique code -# Split a list into *size* shorter pieces + def setup_queries(self): + # Set up queries array from configuration file + CRON_QUERY_FILE = settings.CRON_QUERY_FILE + logger.info(CRON_QUERY_FILE) + try: + with open(CRON_QUERY_FILE) as cron_query_file: + self.queries = hjson.load(cron_query_file) + except FileNotFoundError: + logger.error(f'Cannot find cron queries file 
"{CRON_QUERY_FILE}".') + def setup_bigquery(self): + # Instantiates a client + self.bigquery_client = bigquery.Client() -def split_list(a_list: list, size: int = 20): - return [a_list[i:i + size] for i in range(0, len(a_list), size)] + # BQ Total Bytes Billed to report to status + self.total_bytes_billed = 0 -# the util function + def __init__(self) -> None: + """Constructor to be used to declare valid_locked_course_ids instance variable.""" + super().__init__() + self.myla_engine = db_util.create_sqlalchemy_engine(settings.DATABASES['default']) + self.setup_bigquery() + self.setup_queries() + self.valid_locked_course_ids: List[str] + # Split a list into *size* shorter pieces + def split_list(self, a_list: list, size: int = 20): + return [a_list[i:i + size] for i in range(0, len(a_list), size)] -def util_function(sql_string, mysql_table, param_object=None, table_identifier=None): - logger.debug(f'sql={sql_string}') - logger.debug(f'table={mysql_table} param_object={param_object} table_identifier={table_identifier}') - df = pd.read_sql(sql_string, data_warehouse_engine, params=param_object) - # drop duplicates - df = df.drop_duplicates(keep='first') + # This util_function is used to run a query against the context store and insert the result into a MySQL table + def util_function(self, sql_string, mysql_table, bq_job_config:Optional[bigquery.QueryJobConfig]=None, table_identifier=None): + logger.debug(f'sql={sql_string}') + logger.debug(f'table={mysql_table} params={bq_job_config} table_identifier={table_identifier}') - logger.debug(" table: " + mysql_table + " insert size: " + str(df.shape[0])) + df = self.execute_bq_query(sql_string, bq_job_config).to_dataframe() - # write to MySQL - try: - df.to_sql(con=engine, name=mysql_table, if_exists='append', index=False) - except Exception as e: - logger.exception(f"Error running to_sql on table {mysql_table}") - raise + # drop duplicates + df = df.drop_duplicates(keep='first') - # returns the row size of dataframe - return f"{str(df.shape[0])} {mysql_table} : {param_object}\n" + logger.debug(" table: " + mysql_table + " insert size: " + str(df.shape[0])) + # write to MySQL + try: + df.to_sql(con=self.myla_engine, name=mysql_table, if_exists='append', index=False) + except Exception as e: + logger.exception(f"Error running to_sql on table {mysql_table}") + raise -# execute database query -def execute_db_query(query: str, params: Dict = None) -> ResultProxy: - with engine.begin() as connection: - connection.detach() - if params: - return connection.execute(text(query), params) - else: - return connection.execute(text(query)) - - -# remove all records inside the specified table -def delete_all_records_in_table(table_name: str, where_clause: str = "", where_params: Dict = None): - # delete all records in the table first, can have an optional where clause - result_proxy = execute_db_query(f"delete from {table_name} {where_clause}", where_params) - return(f"\n{result_proxy.rowcount} rows deleted from {table_name}\n") - - -def soft_update_datetime_field( - model_inst: models.Model, - field_name: str, - warehouse_field_value: Union[datetime, None], -) -> List[str]: - """ - Uses Django ORM to update DateTime field of model instance if the field value is null and the warehouse data is non-null. 
- """ - model_name: str = model_inst.__class__.__name__ - current_field_value: Union[datetime, None] = getattr(model_inst, field_name) - # Skipping update if the field already has a value, provided by a previous cron run or administrator - if current_field_value is not None: - logger.info( - f'Skipped update of {field_name} for {model_name} instance ({model_inst.id}); existing value was found') - else: - if warehouse_field_value: - warehouse_field_value = warehouse_field_value.replace(tzinfo=ZoneInfo('UTC')) - setattr(model_inst, field_name, warehouse_field_value) - logger.info(f'Updated {field_name} for {model_name} instance ({model_inst.id})') - return [field_name] - return [] + # returns the row size of dataframe + return f"{str(df.shape[0])} {mysql_table}\n" -# cron job to populate course and user tables -class DashboardCronJob(CronJobBase): + # Execute a query against the bigquery database - schedule = Schedule(run_at_times=settings.RUN_AT_TIMES) - code = 'dashboard.DashboardCronJob' # a unique code + def execute_bq_query(self, query: str, bq_job_config: Optional[bigquery.QueryJobConfig] = None): + # Remove the newlines from the query + query = query.replace("\n", " ") - def __init__(self) -> None: - """Constructor to be used to declare valid_locked_course_ids instance variable.""" - super().__init__() - self.valid_locked_course_ids: List[str] + if bq_job_config: + try: + # Convert to bq schema object + query_job = self.bigquery_client.query(query, job_config=bq_job_config) + query_job_result = query_job.result() + + self.total_bytes_billed += query_job.total_bytes_billed + logger.debug(f"This job had {query_job.total_bytes_billed} bytes. Total: {self.total_bytes_billed}") + return query_job_result + except Exception as e: + logger.error(f"Error ({str(e)}) in setting up schema for query {query}.") + raise Exception(e) + else: + query_job = self.bigquery_client.query(query) + query_job_result = query_job.result() + self.total_bytes_billed += query_job.total_bytes_billed + logger.debug(f"This job had {query_job.total_bytes_billed} bytes. Total: {self.total_bytes_billed}") + return query_job_result + + # Execute a query against the MyLA database + def execute_myla_query(self, query: str, params: Optional[Dict] = None) -> ResultProxy: + with self.myla_engine.begin() as connection: + connection.detach() + if params: + return connection.execute(text(query), params) + else: + return connection.execute(text(query)) + + # remove all records inside the specified table + def execute_myla_delete_query(self, query: str, params: Optional[Dict[str,str]] = None) -> str: + # delete all records in the table first, can have an optional where clause + result_proxy = self.execute_myla_query(query, params) + return(f"\n{result_proxy.rowcount} rows deleted from {query}\n") + + def soft_update_datetime_field( + self, + model_inst: models.Model, + field_name: str, + warehouse_field_value: Union[datetime, None], + ) -> List[str]: + """ + Uses Django ORM to update DateTime field of model instance if the field value is null and the warehouse data is non-null. 
+ """ + model_name: str = model_inst.__class__.__name__ + current_field_value: Union[datetime, None] = getattr(model_inst, field_name) + # Skipping update if the field already has a value, provided by a previous cron run or administrator + if current_field_value is not None: + logger.info( + f'Skipped update of {field_name} for {model_name} instance ({model_inst.id}); existing value was found') + else: + if warehouse_field_value: + setattr(model_inst, field_name, warehouse_field_value) + logger.info(f'Updated {field_name} for {model_name} instance ({model_inst.id})') + return [field_name] + return [] # verify whether course ids are valid def verify_course_ids(self): @@ -125,7 +151,14 @@ def verify_course_ids(self): logger.debug("in checking course") supported_courses = Course.objects.get_supported_courses() course_ids = [str(x) for x in supported_courses.values_list('id', flat=True)] - courses_data = pd.read_sql(queries['course'], data_warehouse_engine, params={'course_ids': course_ids}) + + courses_data = self.execute_bq_query( + self.queries['course'], + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', course_ids), + ]) + ) + courses_data = courses_data.to_dataframe() # error out when course id is invalid, otherwise add DataFrame to list for course_id, data_last_updated in supported_courses: if course_id not in list(courses_data['id']): @@ -145,8 +178,7 @@ def verify_course_ids(self): CourseVerification = namedtuple("CourseVerification", ["invalid_course_ids", "course_data"]) return CourseVerification(invalid_course_id_list, courses_data) - # update USER records from DATA_WAREHOUSE - + # Update the user table with the data from the data warehouse def update_user(self): # cron status @@ -155,19 +187,21 @@ def update_user(self): logger.info("in update with data warehouse user") # delete all records in the table first - status += delete_all_records_in_table("user") + status += self.execute_myla_delete_query("DELETE FROM user") # select all student registered for the course - status += util_function( - queries['user'], - 'user', - {'course_ids': self.valid_locked_course_ids, - 'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT - }) + status += self.util_function( + self.queries['user'], + 'user', + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64', settings.CANVAS_DATA_ID_INCREMENT), + ]) + ) return status - # update unizin metadata from DATA_WAREHOUSE + # update unizin metadata from data in the data warehouse def update_unizin_metadata(self): @@ -177,14 +211,14 @@ def update_unizin_metadata(self): logger.debug("in update unizin metadata") # delete all records in the table first - status += delete_all_records_in_table("unizin_metadata") + status += self.execute_myla_delete_query("DELETE FROM unizin_metadata") # select all student registered for the course - metadata_sql = queries['metadata'] + metadata_sql = self.queries['metadata'] logger.debug(metadata_sql) - status += util_function(metadata_sql, 'unizin_metadata') + status += self.util_function(metadata_sql, 'unizin_metadata') return status @@ -198,9 +232,13 @@ def update_canvas_resource(self): # Select all the files for these courses # convert int array to str array - df_attach = pd.read_sql(queries['resource'], - data_warehouse_engine, - params={'course_ids': self.valid_locked_course_ids }) + df_attach = self.execute_bq_query( + 
self.queries['resource'], + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + ]) + ).to_dataframe() + logger.debug(df_attach) # Update these back again based on the dataframe # Remove any rows where file_state is not available! @@ -223,22 +261,15 @@ def update_resource_access(self): # return string with concatenated SQL insert result return_string = "" - if settings.LRS_IS_BIGQUERY: - # Instantiates a client - bigquery_client = bigquery.Client() - - # BQ Total Bytes Billed to report to status - total_bytes_billed = 0 - data_last_updated = Course.objects.filter(id__in=self.valid_locked_course_ids).get_data_earliest_date() logger.info(f"Deleting all records in resource_access after {data_last_updated}") - status += delete_all_records_in_table("resource_access", f"WHERE access_time > :data_last_updated", {'data_last_updated': data_last_updated }) + status += self.execute_myla_delete_query("DELETE FROM resource_access WHERE access_time > :data_last_updated", {'data_last_updated': data_last_updated }) # loop through multiple course ids, 20 at a time # (This is set by the CRON_BQ_IN_LIMIT from settings) - for data_warehouse_course_ids in split_list(self.valid_locked_course_ids, settings.CRON_BQ_IN_LIMIT): + for data_warehouse_course_ids in self.split_list(self.valid_locked_course_ids, settings.CRON_BQ_IN_LIMIT): # query to retrieve all file access events for one course # There is no catch if this query fails, event_store.events needs to exist final_query = [] @@ -279,11 +310,11 @@ def update_resource_access(self): job_config.query_parameters = query_params # Location must match that of the dataset(s) referenced in the query. - bq_job = bigquery_client.query(final_query, location='US', job_config=job_config) + bq_job = self.bigquery_client.query(final_query, location='US', job_config=job_config) # This is the call that could result in an exception - resource_access_df: pd.DataFrame = bq_job.result().to_dataframe() - total_bytes_billed += bq_job.total_bytes_billed - logger.debug(total_bytes_billed) + resource_access_df: pd.DataFrame = bq_job.to_dataframe() + self.total_bytes_billed += bq_job.total_bytes_billed + logger.debug(self.total_bytes_billed) else: query_params = { 'course_ids': data_warehouse_course_ids, @@ -326,7 +357,7 @@ def update_resource_access(self): 'select sis_name as user_login_name,' 'cast(user_id as char) as user_id_str ' f'from user where sis_name in ({login_names})', - engine) + self.myla_engine) logger.debug(f'user_id_df:\n' f'{user_id_df}\n' @@ -396,7 +427,7 @@ def update_resource_access(self): student_enrollment_type = User.EnrollmentType.STUDENT student_enrollment_df = pd.read_sql( 'select user_id, course_id from user where enrollment_type= %s', - engine, params=[(str(student_enrollment_type),)]) + self.myla_engine, params=[(str(student_enrollment_type),)]) resource_access_df = pd.merge( resource_access_df, student_enrollment_df, on=['user_id', 'course_id'], @@ -406,7 +437,7 @@ def update_resource_access(self): # First, update resource table try: dtype = {'resource_id': types.VARCHAR(255)} - pangres.upsert(con=engine, df=resource_df, + pangres.upsert(con=self.myla_engine, df=resource_df, table_name='resource', if_row_exists='update', create_schema=False, add_new_columns=False, dtype=dtype) @@ -416,7 +447,7 @@ def update_resource_access(self): # Next, update resource_access table try: - resource_access_df.to_sql(con=engine, name='resource_access', + 
resource_access_df.to_sql(con=self.myla_engine, name='resource_access', if_exists='append', index=False) except Exception as e: logger.exception('Error running to_sql on table ' @@ -428,12 +459,6 @@ def update_resource_access(self): map(repr, data_warehouse_course_ids)) + ']\n' logger.info(return_string) - if settings.LRS_IS_BIGQUERY: - total_tbytes_billed = total_bytes_billed / 1024 / 1024 / 1024 / 1024 - # $5 per TB as of Feb 2019 https://cloud.google.com/bigquery/pricing - total_tbytes_price = round(5 * total_tbytes_billed, 2) - status += (f'TBytes billed for BQ: {total_tbytes_billed} = ' - f'${total_tbytes_price}\n') return status def update_groups(self): @@ -443,16 +468,20 @@ def update_groups(self): logger.info("update_groups(): ") # delete all records in assignment_group table - status += delete_all_records_in_table("assignment_groups") + status += self.execute_myla_delete_query("DELETE FROM assignment_groups") # update groups # Loading the assignment groups inforamtion along with weight/points associated ith arn assignment logger.debug("update_assignment_groups(): ") # loop through multiple course ids - status += util_function(queries['assignment_groups'], - 'assignment_groups', - {'course_ids': self.valid_locked_course_ids}) + status += self.util_function( + self.queries['assignment_groups'], + 'assignment_groups', + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + ]), + ) return status @@ -463,12 +492,16 @@ def update_assignment(self): logger.info("update_assignment(): ") # delete all records in assignment table - status += delete_all_records_in_table("assignment") + status += self.execute_myla_delete_query("DELETE FROM assignment") # loop through multiple course ids - status += util_function(queries['assignment'], - 'assignment', - {'course_ids': self.valid_locked_course_ids}) + status += self.util_function( + self.queries['assignment'], + 'assignment', + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + ]), + ) return status @@ -479,34 +512,23 @@ def submission(self): logger.info("update_submission(): ") - # delete all records in resource_access table - status += delete_all_records_in_table("submission") + # delete all records in submission table + status += self.execute_myla_delete_query("DELETE FROM submission") # loop through multiple course ids # filter out not released grades (submission_dim.posted_at date is not null) and partial grades (submission_dim.workflow_state != 'graded') - query_params = { - 'course_ids': self.valid_locked_course_ids, - 'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT, - } - Session = sessionmaker(bind=data_warehouse_engine) - try: - # Create a session - with Session() as session: - # Execute the first query to create the temporary table - session.execute(text(queries['submission']).bindparams(**query_params)) + bq_job_config = bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64', settings.CANVAS_DATA_ID_INCREMENT), + ]) - # Execute the second query using the temporary table - result = session.execute(text(queries['submission_with_avg_score'])) - df = pd.DataFrame(result.fetchall(), columns=result.keys()) - df = df.drop_duplicates(keep='first') - df.to_sql(con=engine, name='submission', if_exists='append', index=False) + df = 
self.execute_bq_query(self.queries['submission'], bq_job_config).to_dataframe() + df = df.drop_duplicates(keep='first') + df.to_sql(con=self.myla_engine, name='submission', if_exists='append', index=False) - except Exception as e: - logger.exception('Error running sql on table submission', str(e)) - raise - status+=f"{str(df.shape[0])} submission: {query_params}\n" + status+=f"{str(df.shape[0])} submission\n" - # returns the row size of dataframe + # returns the row size of dataframe return status def weight_consideration(self): @@ -517,13 +539,16 @@ def weight_consideration(self): logger.info("weight_consideration()") # delete all records in assignment_weight_consideration table - status += delete_all_records_in_table("assignment_weight_consideration") + status += self.execute_myla_delete_query("DELETE FROM assignment_weight_consideration") # loop through multiple course ids - status += util_function(queries['assignment_weight'], - 'assignment_weight_consideration', - {'course_ids': self.valid_locked_course_ids }, - 'weight') + status += self.util_function( + self.queries['assignment_weight'], + 'assignment_weight_consideration', + bigquery.QueryJobConfig(query_parameters=[ + bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,), + ]), + 'weight') logger.debug(status + "\n\n") @@ -536,9 +561,9 @@ def update_term(self) -> str: status: str = '' logger.info('update_term()') - term_sql: str = queries['term'] + term_sql: str = self.queries['term'] logger.debug(term_sql) - warehouse_term_df: pd.DataFrame = pd.read_sql(term_sql, data_warehouse_engine) + warehouse_term_df: pd.DataFrame = self.execute_bq_query(term_sql).to_dataframe() existing_terms_ids: List[int] = [term.id for term in list(AcademicTerms.objects.all())] new_term_ids: List[int] = [int(id) for id in warehouse_term_df['id'].to_list() if id not in existing_terms_ids] @@ -548,7 +573,7 @@ def update_term(self) -> str: else: new_term_df: pd.DataFrame = warehouse_term_df.loc[warehouse_term_df['id'].isin(new_term_ids)] try: - new_term_df.to_sql(con=engine, name='academic_terms', if_exists='append', index=False) + new_term_df.to_sql(con=self.myla_engine, name='academic_terms', if_exists='append', index=False) term_message: str = f'Added {len(new_term_df)} new records to academic_terms table: {new_term_ids}' logger.info(term_message) status += term_message + '\n' @@ -587,15 +612,15 @@ def update_course(self, warehouse_courses_data: pd.DataFrame) -> str: updated_fields.append('term') warehouse_date_start: Union[datetime, None] = ( - warehouse_course_dict['start_at'].to_pydatetime() if pd.notna( + warehouse_course_dict['start_at'] if pd.notna( warehouse_course_dict['start_at']) else None ) - updated_fields += soft_update_datetime_field(course, 'date_start', warehouse_date_start) + updated_fields += self.soft_update_datetime_field(course, 'date_start', warehouse_date_start) warehouse_date_end: Union[datetime, None] = ( - warehouse_course_dict['conclude_at'].to_pydatetime() if pd.notna( + warehouse_course_dict['conclude_at'] if pd.notna( warehouse_course_dict['conclude_at']) else None ) - updated_fields += soft_update_datetime_field(course, 'date_end', warehouse_date_end) + updated_fields += self.soft_update_datetime_field(course, 'date_end', warehouse_date_end) if updated_fields: course.save() @@ -645,7 +670,6 @@ def do(self) -> str: status += self.update_assignment() status += self.submission() status += self.weight_consideration() - logger.info("** resources") if 'show_resources_accessed' not in 
settings.VIEWS_DISABLED: try: @@ -656,9 +680,8 @@ def do(self) -> str: status += str(e) exception_in_run = True - if settings.DATABASES.get('DATA_WAREHOUSE', {}).get('IS_UNIZIN'): - logger.info("** informational") - status += self.update_unizin_metadata() + logger.info("** informational") + status += self.update_unizin_metadata() all_str_course_ids = set( str(x) for x in Course.objects.get_supported_courses().values_list('id', flat=True) @@ -676,8 +699,14 @@ def do(self) -> str: else: logger.warn("data_last_updated not updated because of an Exception during this run") - status += "End cron: " + str(datetime.now()) + "\n" - logger.info("************ total status=" + status + "\n") + if settings.LRS_IS_BIGQUERY: + total_tbytes_billed = self.total_bytes_billed / 1024 / 1024 / 1024 / 1024 + # $6.25 per TB as of Feb 2024 https://cloud.google.com/bigquery/pricing + total_tbytes_price = round(6.25 * total_tbytes_billed, 2) + status += (f'TBytes billed for BQ: {total_tbytes_billed} = ' + f'${total_tbytes_price}\n') + status += "End cron: " + str(datetime.now()) + "\n" + logger.info("************ total status=" + status + "\n") return status diff --git a/dashboard/settings.py b/dashboard/settings.py index 4eb34406..99acf547 100644 --- a/dashboard/settings.py +++ b/dashboard/settings.py @@ -235,20 +235,7 @@ def apply_env_overrides(env: Dict[str, Any], environ: os._Environ) -> Dict[str, }, }, **ENV.get('MYSQL', {}) - }, - 'DATA_WAREHOUSE': { - **{ - 'ENGINE': 'django.db.backends.postgresql', - 'NAME': '', - 'USER': '', - 'PASSWORD': '', - 'HOST': '', - 'PORT': 5432, - 'OPTIONS': {}, - 'IS_UNIZIN': True - }, - **ENV.get('DATA_WAREHOUSE', {}) - }, + } } # optionally set LRS data source LRS_IS_BIGQUERY = ENV.get('LRS', {}).get('ENGINE', 'google.cloud.bigquery') == 'google.cloud.bigquery' @@ -365,7 +352,7 @@ def apply_env_overrides(env: Dict[str, Any], environ: os._Environ) -> Dict[str, # IMPORT LOCAL ENV # ===================== try: - from settings_local import * + from settings_local import * # type: ignore except ImportError: pass @@ -465,6 +452,6 @@ def apply_env_overrides(env: Dict[str, Any], environ: os._Environ) -> Dict[str, # IMPORT LOCAL ENV # ===================== try: - from settings_local import * + from settings_local import * #type: ignore except ImportError: pass diff --git a/requirements.txt b/requirements.txt index 61a1355b..8ab0509d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,9 +32,8 @@ pandas==2.1.2 pangres==4.2.1 SQLAlchemy==2.0.23 -psycopg==3.1.12 mysqlclient==2.2.4 -google-cloud-bigquery[pandas]==3.13.0 +google-cloud-bigquery[pandas]==3.19.0 debugpy==1.8.0 jsonschema==4.19.2
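
The new "metadata" query wraps the original UDP SQL in EXTERNAL_QUERY("us.context_store", "..."), a BigQuery federated query: the quoted inner statement runs on the external connection named us.context_store and its rows come back as an ordinary BigQuery result. A minimal sketch of reading that single key/value row from Python; the connection id is taken from the patch, and the client is assumed to pick up Application Default Credentials and a billing project.

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes Application Default Credentials

    # Federated query: the inner statement executes on the "us.context_store"
    # connection; BigQuery returns its result set like any other query.
    metadata_sql = (
        'SELECT * FROM EXTERNAL_QUERY("us.context_store", '
        '"select \'canvasdatadate\' as pkey, min(dag_run) as pvalue from report.publish_info pi2")'
    )

    rows = list(client.query(metadata_sql).result())
    if rows:
        # Expect one row: pkey = 'canvasdatadate', pvalue = the earliest publish DAG run.
        print(rows[0]['pkey'], rows[0]['pvalue'])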
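
Throughout cron_udp.hjson the psycopg placeholders (%(course_ids)s filtered with ANY(...)) become BigQuery named parameters (@course_ids, @canvas_data_id_increment) filtered with IN UNNEST(...), and cron.py binds them with ArrayQueryParameter / ScalarQueryParameter on a QueryJobConfig. A minimal sketch of that binding pattern, assuming the context_store_keymap dataset is visible to the client's default project; the course ids and the increment value below are illustrative placeholders, not real data.

    from google.cloud import bigquery

    client = bigquery.Client()

    sql = """
        select
            cast(@canvas_data_id_increment as INT64) + cast(co.lms_ext_id as INT64) as canvas_id,
            cast(co.lms_int_id as INT64) as id
        from context_store_keymap.course_offering co
        where co.lms_int_id IN UNNEST(@course_ids)
    """

    job_config = bigquery.QueryJobConfig(query_parameters=[
        # Array parameter backing the IN UNNEST(@course_ids) predicate; ids are bound
        # as strings, matching how the cron job passes valid_locked_course_ids.
        bigquery.ArrayQueryParameter('course_ids', 'STRING', ['111111', '222222']),
        # Scalar parameter backing @canvas_data_id_increment (settings.CANVAS_DATA_ID_INCREMENT).
        bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64', 17700000000000000),
    ])

    df = client.query(sql, job_config=job_config).result().to_dataframe()
    print(df.head())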
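
In the "user" query, split_part(pe.email_address, '@', 1) becomes LOWER(REGEXP_EXTRACT(pe.email_address, r'^([^@]+)')). A quick check with Python's re module that the pattern captures the local part (everything before the first @), matching the old Postgres behaviour; the sample addresses are made up.

    import re

    # Same pattern as the BigQuery expression: capture everything before the first '@'.
    LOCAL_PART = re.compile(r'^([^@]+)')

    for addr in ['Jane.Doe@umich.edu', 'student+lab@example.org']:
        match = LOCAL_PART.match(addr)
        print(match.group(1).lower() if match else None)
    # jane.doe
    # student+lab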
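
The assign_more CTE in the "assignment_groups" query replaces a plain derived-table join with JOIN UNNEST((SELECT ARRAY_AGG(STRUCT(...)) FROM ...)) AS da. The shape of that construct, reduced to literal data so it can be run on its own against BigQuery; the three sample rows are invented.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Aggregate the grouped totals into an array of structs, then unnest it and
    # join back, the same shape as the rewritten assign_more CTE.
    sql = """
    WITH assignment_details AS (
      SELECT 1 AS learner_activity_group_id, 10.0 AS points_possible UNION ALL
      SELECT 1, 15.0 UNION ALL
      SELECT 2, 20.0
    )
    SELECT DISTINCT a.learner_activity_group_id, da.group_points
    FROM assignment_details a
    JOIN UNNEST((
      SELECT ARRAY_AGG(STRUCT(learner_activity_group_id, group_points))
      FROM (
        SELECT learner_activity_group_id, SUM(points_possible) AS group_points
        FROM assignment_details
        GROUP BY learner_activity_group_id
      )
    )) AS da
    ON a.learner_activity_group_id = da.learner_activity_group_id
    """

    for row in client.query(sql).result():
        print(row.learner_activity_group_id, row.group_points)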
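
The cron job now keeps one SQLAlchemy engine for the MyLA MySQL database and a BigQuery client for the context store; util_function() runs the warehouse query on BigQuery, drops duplicates, and appends the frame into MySQL. A standalone sketch of that flow; the MySQL URL is a placeholder for the engine that db_util.create_sqlalchemy_engine() builds from settings.DATABASES['default'].

    from typing import Optional

    from google.cloud import bigquery
    from sqlalchemy import create_engine

    bq_client = bigquery.Client()
    # Placeholder URL; the real engine comes from settings.DATABASES['default'].
    myla_engine = create_engine('mysql+mysqldb://user:password@localhost/student_dashboard?charset=utf8mb4')

    def load_table(sql: str, mysql_table: str,
                   job_config: Optional[bigquery.QueryJobConfig] = None) -> str:
        # Run the context-store query on BigQuery and pull the result into pandas.
        df = bq_client.query(sql, job_config=job_config).result().to_dataframe()
        # De-duplicate before loading, mirroring util_function().
        df = df.drop_duplicates(keep='first')
        # Append into the MyLA table; the cron job deletes stale rows beforehand.
        df.to_sql(con=myla_engine, name=mysql_table, if_exists='append', index=False)
        return f"{df.shape[0]} {mysql_table}\n"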
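
delete_all_records_in_table() gives way to execute_myla_delete_query(), which takes the full DELETE statement plus optional bound parameters, as in the resource_access cutoff delete. A sketch of that call path with SQLAlchemy 2.0; the URL and the cutoff value are placeholders.

    from sqlalchemy import create_engine, text

    engine = create_engine('mysql+mysqldb://user:password@localhost/student_dashboard?charset=utf8mb4')

    with engine.begin() as connection:  # begin() commits the transaction on exit
        result = connection.execute(
            text("DELETE FROM resource_access WHERE access_time > :data_last_updated"),
            {'data_last_updated': '2024-01-01 00:00:00'},
        )
        print(f"{result.rowcount} rows deleted from resource_access")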
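
update_course() no longer calls .to_pydatetime(), and soft_update_datetime_field() no longer forces tzinfo=ZoneInfo('UTC'); this appears to rely on BigQuery TIMESTAMP columns arriving from to_dataframe() as timezone-aware UTC values. A small check of that behaviour, assuming working credentials; the literal timestamp is arbitrary.

    from google.cloud import bigquery

    client = bigquery.Client()

    # TIMESTAMP columns come back tz-aware (UTC); DATETIME columns come back naive.
    df = client.query(
        "SELECT TIMESTAMP '2024-02-01 00:00:00+00' AS start_at"
    ).result().to_dataframe()

    value = df.loc[0, 'start_at']
    print(type(value), value.tzinfo)  # pandas Timestamp with tzinfo=UTC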
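
do() now reports the run's accumulated total_bytes_billed as an on-demand cost line at $6.25 per TiB. The same arithmetic as a small helper, with an invented byte count as the example.

    def bq_cost_estimate(total_bytes_billed: int, price_per_tib: float = 6.25) -> float:
        """On-demand cost estimate matching the status line appended in DashboardCronJob.do()."""
        tib_billed = total_bytes_billed / 1024 / 1024 / 1024 / 1024
        return round(price_per_tib * tib_billed, 2)

    # Example: 250 GiB billed over a run is about 0.244 TiB, roughly $1.53.
    print(bq_cost_estimate(250 * 1024 ** 3))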