Skip to content

Commit

Permalink
Add backoff during DatabaseChunkedReader queries (#976)
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan authored Feb 27, 2024
1 parent 939d874 commit 453659e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class DatabaseChunkedReader implements Closeable {

private DatabaseChunkedReaderMetrics _metrics;

private long _statementExecutionDelayMs;
private long _lastStatementExecutionMs = 0;

/**
* Create a DatabaseChunkedReader instance
* @param props Configuration
Expand Down Expand Up @@ -123,6 +126,7 @@ public DatabaseChunkedReader(Properties props, Connection connection, String sou
_connection = connection;
_chunkedQueryManager = _databaseChunkedReaderConfig.getChunkedQueryManager();
_skipBadMessagesEnabled = _databaseChunkedReaderConfig.getShouldSkipBadMessage();
_statementExecutionDelayMs = _databaseChunkedReaderConfig.getStatementExecutionDelay();

if (StringUtils.isBlank(db)) {
_database = _connection.getMetaData().getUserName();
Expand Down Expand Up @@ -183,8 +187,18 @@ private void prepareChunkedQuery(PreparedStatement stmt, List<Object> keys) thro
}

private void executeChunkedQuery(PreparedStatement stmt) throws SQLException {
long executionDelayMs = System.currentTimeMillis() - _lastStatementExecutionMs;
if (executionDelayMs < _statementExecutionDelayMs) {
try {
Thread.sleep(Math.max(0, _statementExecutionDelayMs - executionDelayMs));
} catch (InterruptedException e) {
throw new DatastreamRuntimeException("Failed to Thread.sleep() before next statement execution", e);
}
}

long timeStart = System.currentTimeMillis();
_queryResultSet = stmt.executeQuery();
_lastStatementExecutionMs = System.currentTimeMillis();
_metrics.updateQueryExecutionDuration(System.currentTimeMillis() - timeStart);
_metrics.updateQueryExecutionRate();
}
Expand Down Expand Up @@ -344,6 +358,10 @@ public DatabaseRow poll() throws SQLException {
return row;
}

public long getLastStatementExecutionMs() {
return _lastStatementExecutionMs;
}

/**
* Only API that will not rethrow SQLException. Will swallow error and print an error log.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class DatabaseChunkedReaderConfig {
// cache the query results and only return fetchSize rows to the driver.
public static final String FETCH_SIZE = "fetchSize";
public static final String SKIP_BAD_MESSAGE = "skipBadMessage";
// Adds a delay between DB connection statement executions. This is useful to throttle Brooklin queries.
public static final String STATEMENT_EXECUTION_DELAY = "statementExecutionDelay";
// Max number of rows to fetch for each query. This will help the server limit the number of full row
// fetches that it has to do. For example in Oracle, a ROWNUM <= 1000 will add a stopKey constraint where the DB will
// only look for first 1000 matches that match the specified constraints and will do a full row fetch only for these.
Expand All @@ -38,12 +40,14 @@ public class DatabaseChunkedReaderConfig {
private static final int DEFAULT_FETCH_SIZE = 10000;
private static final long DEFAULT_ROW_COUNT_LIMIT = 50000;
private static final boolean DEFAULT_SKIP_BAD_MESSAGE = false;
private static final long DEFAULT_STATEMENT_EXECUTION_DELAY = 0;

private final int _queryTimeout;
private final int _fetchSize;
private final long _rowCountLimit;
private ChunkedQueryManager _chunkedQueryManager;
private final boolean _shouldSkipBadMessage;
private final long _statementExecutionDelay;

/**
* Constructor for DatabaseChunkedReaderConfig
Expand All @@ -61,6 +65,7 @@ public DatabaseChunkedReaderConfig(Properties properties) {
Validate.inclusiveBetween(0, Long.MAX_VALUE, _fetchSize);
Validate.inclusiveBetween(0, Long.MAX_VALUE, _fetchSize);
_shouldSkipBadMessage = verifiableProperties.getBoolean(SKIP_BAD_MESSAGE, DEFAULT_SKIP_BAD_MESSAGE);
_statementExecutionDelay = verifiableProperties.getLong(STATEMENT_EXECUTION_DELAY, DEFAULT_STATEMENT_EXECUTION_DELAY);

String queryManagerClass = verifiableProperties.getString(DATABASE_QUERY_MANAGER_CLASS_NAME);
if (StringUtils.isBlank(queryManagerClass)) {
Expand Down Expand Up @@ -92,4 +97,8 @@ public ChunkedQueryManager getChunkedQueryManager() {
public boolean getShouldSkipBadMessage() {
return _shouldSkipBadMessage;
}

public long getStatementExecutionDelay() {
return _statementExecutionDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ void testSkipBadMessages() throws SQLException, SchemaGenerationException {
reader.subscribe(Collections.singletonList(0), null);
for (DatabaseRow row = reader.poll(); row != null; row = reader.poll()) {
Assert.assertEquals(row, new DatabaseRow(Collections.singletonList(field)));
Assert.assertTrue(reader.getLastStatementExecutionMs() > 0);
count++;
}
Assert.assertEquals(2, count);
Expand Down

0 comments on commit 453659e

Please sign in to comment.