[GH-1340] Logging added to source ns #475

Merged · 4 commits · Jul 13, 2021
Changes from 2 commits
2 changes: 1 addition & 1 deletion api/resources/log4j2.xml
@@ -5,7 +5,7 @@ Its syntax is given by https://logging.apache.org/log4j/2.x/manual/configuration
-->
<Configuration status="WARN">
<Properties>
<Property name="pattern">%d{hh:mm:ss} %-5level %logger{36} - %msg%n</Property>
<Property name="pattern">%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n</Property>
</Properties>
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
1 change: 0 additions & 1 deletion api/src/wfl/environment.clj
@@ -69,5 +69,4 @@
(defn getenv
"Lookup the value of the environment variable specified by `name`."
[name]
(log/debugf "Reading environment variable %s" name)
(or (@testing name) (__getenv name)))
4 changes: 2 additions & 2 deletions api/src/wfl/jdbc.clj
@@ -134,9 +134,9 @@
[binding & body]
`(let [id# (rand-int 10000)
init# ~(second binding)]
(log/info "JDBC transaction" id# "started to" (format-db init#))
Member:
What's trace logging?

Contributor Author:
TL;DR - In Log4j, TRACE sits two levels below INFO, providing finer-grained information than DEBUG.

Log4j's log level hierarchy includes the following:

…
INFO
DEBUG
TRACE
…

But @rfricke-asymmetrik just pinged me to say that Log4j will be going away completely in his work, and with that we'll also see a change in available log levels. We will lose TRACE but gain NOTICE:

…
DEBUG | (100) Debug or trace information.
INFO | (200) Routine information, such as ongoing status or performance.
NOTICE | (300) Normal but significant events, such as start up, shut down, or a configuration change.
…

My recommendation there will be to audit the categorization of existing Log4j TRACE, DEBUG, and INFO logs to determine the appropriate new level.

For background on why I downgraded this two levels: the latest development has us spinning up transactions more frequently rather than passing around existing transactions, so the volume of these logs obscured useful information without adding value. I stopped short of removing them completely.
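
As a rough sketch of what that audit could produce (hypothetical only; the keywords and mapping below are placeholders, not a decided categorization):

;; Hypothetical first pass at mapping existing Log4j levels onto
;; Stackdriver severities; each call site would still be reviewed individually.
(def ^:private log4j->stackdriver
  {:trace :debug     ; TRACE has no direct equivalent; fold into DEBUG (100)
   :debug :debug     ; DEBUG (100)
   :info  :info      ; INFO (200), or :notice (300) for lifecycle events
   :warn  :warning
   :error :error})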

Member:
Ah - @rfricke-asymmetrik maybe we can restore trace? That now seems useful. Sorry - I know I asked you to do that.

Member:
Also I completely agree this information isn't very helpful. Downgrading was the right decision. Maybe we can make reading environment variables traces too!?

Contributor Author:
> Also I completely agree this information isn't very helpful. Downgrading was the right decision. Maybe we can make reading environment variables traces too!?

Jinx :) can do.

Contributor Author (@okotsopoulos, Jul 13, 2021):
> Ah - @rfricke-asymmetrik maybe we can restore trace? That now seems useful. Sorry - I know I asked you to do that.

I actually don't mind trace going away as long as we commit to thoughtfully recategorizing existing logs at or below INFO. Stackdriver's distinction between INFO and NOTICE is more instructive than "relatively how noisy do you want everything to be" haha

Contributor:
Trace is not an option provided by Google Logging. So even if we were to have a trace macro in the log namespace, it would just reference a different severity level like NOTICE or DEBUG.
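
A minimal sketch of that idea (hypothetical; it assumes the new log namespace exposes a debug helper to forward to):

;; Hypothetical trace macro that simply logs at DEBUG severity,
;; since Google Logging has no TRACE level of its own.
(defmacro trace
  "Log `args` at DEBUG severity in lieu of a native TRACE level."
  [& args]
  `(debug ~@args))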

Member (@ehigham, Jul 13, 2021):
Right! I mean, I'm happy with DEBUG being noisy because we'd want a detailed record of what's going on when we turn that on. I can see the value of an additional level of verbosity. I don't really care what it's called though - trace, verbose, notice, susan, etc.

Member:
@rfricke-asymmetrik let's use NOTICE then :)

Contributor:
That being said, there is a Trace field that can be added to the JSON log, so you can associate logs with a specific trace resource id from the Google Trace API.
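
For illustration (hypothetical project and trace ids), a structured entry could carry that field like so before being serialized to JSON:

;; Hypothetical structured log entry; the logging.googleapis.com/trace key
;; is what associates the entry with a Google Trace resource.
{:severity "DEBUG"
 :message  "JDBC transaction 1234 started"
 :logging.googleapis.com/trace
 "projects/my-project/traces/0123456789abcdef0123456789abcdef"}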

(log/trace "JDBC transaction" id# "started to" (format-db init#))
(let [exe# (jdbc/with-db-transaction [~(first binding) init#] ~@body)]
(log/info "JDBC SQL transaction" id# "ended")
(log/trace "JDBC SQL transaction" id# "ended")
exe#)))

(defmacro prepare-statement
5 changes: 3 additions & 2 deletions api/src/wfl/server.clj
@@ -89,6 +89,7 @@
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [{:keys [watchers] :as workload}
(workloads/load-workload-for-id tx id)]
(log/infof "Updating workload %s" uuid)
(try
(workloads/update-workload! tx workload)
(catch UserException e
@@ -99,11 +100,11 @@
(try
(do-update! workload)
(catch Throwable t
(log/error "Failed to update workload %s" uuid)
(log/errorf "Failed to update workload %s" uuid)
(log/error t))))
(update-workloads []
(try
(log/info "updating workloads")
(log/info "Finding workloads to update...")
(run! try-update
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(jdbc/query tx "SELECT id,uuid FROM workload
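For context on the errorf change above: with clojure.tools.logging, log/error joins its arguments print-style and leaves the %s placeholder in the message, while log/errorf runs them through format. A minimal illustration (hypothetical uuid):

;; (log/error  "Failed to update workload %s" "1234-abcd")
;; logs: Failed to update workload %s 1234-abcd
;; (log/errorf "Failed to update workload %s" "1234-abcd")
;; logs: Failed to update workload 1234-abcd
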
77 changes: 49 additions & 28 deletions api/src/wfl/source.clj
@@ -1,15 +1,15 @@
(ns wfl.source
(:require [clojure.edn :as edn]
[clojure.spec.alpha :as s]
[clojure.set :as set]
[clojure.tools.logging.readable :as log]
[wfl.api.workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.service.datarepo :as datarepo]
[wfl.service.postgres :as postgres]
[wfl.stage :as stage]
[wfl.util :as util :refer [utc-now]]
[wfl.module.all :as all])
(:require [clojure.edn :as edn]
[clojure.spec.alpha :as s]
[clojure.set :as set]
[clojure.tools.logging :as log]
[wfl.api.workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.service.datarepo :as datarepo]
[wfl.service.postgres :as postgres]
[wfl.stage :as stage :refer [log-prefix]]
[wfl.util :as util :refer [utc-now]]
[wfl.module.all :as all])
(:import [clojure.lang ExceptionInfo]
[java.sql Timestamp]
[java.time OffsetDateTime ZoneId]
@@ -149,11 +149,14 @@

(defn ^:private find-new-rows
"Find new rows in TDR by querying the dataset between the `interval`."
[{:keys [dataset table column] :as _source} interval]
(-> dataset
(datarepo/query-table-between table column interval [:datarepo_row_id])
:rows
flatten))
[{:keys [dataset table column] :as source}
[start end :as interval]]
(do (log/debugf "%s Looking for rows in %s.%s between [%s, %s]..."
(log-prefix source) (:name dataset) table start end)
(-> dataset
(datarepo/query-table-between table column interval [:datarepo_row_id])
:rows
flatten)))

(defn ^:private go-create-snapshot!
"Create snapshot in TDR from `dataset` body, `table` and `row-ids` then
@@ -218,7 +221,9 @@
"Write the shards and corresponding snapshot creation jobs from
`shards->snapshot-jobs` into source `details` table, with the frozen `now`.
Also initialize all jobs statuses to running."
[{:keys [last_checked details] :as _source} now shards->snapshot-jobs]
[{:keys [last_checked details] :as source}
now
shards->snapshot-jobs]
(letfn [(make-record [[shard id]]
{:snapshot_creation_job_id id
:snapshot_creation_job_status "running"
@@ -228,7 +233,9 @@
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(->> shards->snapshot-jobs
(map make-record)
(jdbc/insert-multi! tx details)))))
(jdbc/insert-multi! tx details)))
(log/debugf "%s Snapshot creation jobs written."
(log-prefix source))))

(defn ^:private update-last-checked
"Update the `last_checked` field in source table with
@@ -249,21 +256,35 @@

(defn ^:private find-and-snapshot-new-rows
"Create and enqueue snapshots from new rows in the `source` dataset."
[{:keys [last_checked] :as source} utc-now]
(let [shards->jobs (->> [(timestamp-to-offsetdatetime last_checked) utc-now]
(mapv #(.format % bigquery-datetime-format))
(find-new-rows source)
[{:keys [dataset table last_checked] :as source}
utc-now]
(let [[start end :as interval]
(->> [(timestamp-to-offsetdatetime last_checked) utc-now]
(mapv #(.format % bigquery-datetime-format)))
shards->jobs (->> (find-new-rows source interval)
(create-snapshots source utc-now))]
(when (seq shards->jobs)
(write-snapshots-creation-jobs source utc-now shards->jobs)
(update-last-checked source utc-now))))
(if (seq shards->jobs)
(do (log/infof "%s Snapshots created from new rows in %s.%s."
(log-prefix source) (:name dataset) table)
(write-snapshots-creation-jobs source utc-now shards->jobs)
(update-last-checked source utc-now))
(log/debugf "%s No rows in %s.%s between [%s, %s]."
(log-prefix source) (:name dataset) table start end))))

(defn ^:private update-pending-snapshot-jobs
"Update the status of TDR snapshot jobs that are still 'running'."
[source]
(->> (get-pending-tdr-jobs source)
(map #(update % 1 check-tdr-job))
(run! #(write-snapshot-id source %))))
(log/debugf "%s Looking for running snapshot jobs to update..."
(log-prefix source))
(let [pending-tdr-jobs (get-pending-tdr-jobs source)]
(if (seq pending-tdr-jobs)
(do (->> pending-tdr-jobs
(map #(update % 1 check-tdr-job))
(run! #(write-snapshot-id source %)))
(log/debugf "%s Running snapshot jobs updated."
(log-prefix source)))
(log/debugf "%s No running snapshot jobs to update."
(log-prefix source)))))

(defn ^:private update-tdr-source
"Check for new data in TDR from `source`, create new snapshots,
9 changes: 7 additions & 2 deletions api/src/wfl/stage.clj
@@ -1,6 +1,6 @@
(ns wfl.stage
"An interface for operations on a queue-based pipeline processing stage,
e.g. source, executor, or sink."
"Interface and methods for operations on a queue-based
pipeline processing stage, e.g. source, executor, or sink."
(:require [wfl.util :as util])
(:import [wfl.util UserException]))

@@ -28,3 +28,8 @@
(defmulti done?
"Test if the processing `stage` is complete and will not process any more data."
:type)

(defn log-prefix
"Prefix string for `stage` logs indicating the `type` (table) and row `id`."
[{:keys [type id] :as _stage}]
(format "[%s id=%s]" type id))