diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a74a3a880bf9..4b61ce153f21 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -51,17 +51,20 @@ import java.sql.SQLWarning; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** - * HiveStatement. - * + * The object used for executing a static SQL statement and returning the + * results it produces. */ public class HiveStatement implements java.sql.Statement { - public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName()); + + private static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class); public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled."; private static final int DEFAULT_FETCH_SIZE = @@ -71,10 +74,10 @@ public class HiveStatement implements java.sql.Statement { private TCLIService.Iface client; private TOperationHandle stmtHandle = null; private final TSessionHandle sessHandle; - Map sessConf = new HashMap(); + Map sessConf = new HashMap<>(); private int fetchSize; private final int defaultFetchSize; - private boolean isScrollableResultset = false; + private final boolean isScrollableResultset; private boolean isOperationComplete = false; private boolean closeOnResultSetCompletion = false; /** @@ -118,15 +121,9 @@ public class HiveStatement implements java.sql.Statement { */ private boolean isLogBeingGenerated = true; - /** - * Keep this state so we can know whether the statement is submitted to HS2 and start execution - * successfully. - */ - private boolean isExecuteStatementFailed = false; - private int queryTimeout = 0; - private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP; + private Optional inPlaceUpdateStream; public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { @@ -146,25 +143,14 @@ public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessi this.isScrollableResultset = isScrollableResultset; this.defaultFetchSize = defaultFetchSize; this.fetchSize = (initFetchSize == 0) ? defaultFetchSize : initFetchSize; + this.inPlaceUpdateStream = Optional.empty(); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#addBatch(java.lang.String) - */ - @Override public void addBatch(String sql) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#cancel() - */ - @Override public void cancel() throws SQLException { checkConnection("cancel"); @@ -181,28 +167,16 @@ public void cancel() throws SQLException { } catch (SQLException e) { throw e; } catch (Exception e) { - throw new SQLException(e.toString(), "08S01", e); + throw new SQLException("Failed to cancel statement", "08S01", e); } isCancelled = true; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#clearBatch() - */ - @Override public void clearBatch() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#clearWarnings() - */ - @Override public void clearWarnings() throws SQLException { warningChain = null; @@ -223,14 +197,13 @@ private void closeStatementIfNeeded() throws SQLException { } catch (SQLException e) { throw e; } catch (Exception e) { - throw new SQLException(e.toString(), "08S01", e); + throw new SQLException("Failed to close statement", "08S01", e); } } void closeClientOperation() throws SQLException { closeStatementIfNeeded(); isQueryClosed = true; - isExecuteStatementFailed = false; stmtHandle = null; } @@ -241,11 +214,6 @@ void closeOnResultSetCompletion() throws SQLException { } } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#close() - */ @Override public void close() throws SQLException { if (isClosed) { @@ -260,17 +228,11 @@ public void close() throws SQLException { isClosed = true; } - // JDK 1.7 + @Override public void closeOnCompletion() throws SQLException { closeOnResultSetCompletion = true; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#execute(java.lang.String) - */ - @Override public boolean execute(String sql) throws SQLException { runAsyncOnServer(sql); @@ -335,15 +297,12 @@ private void runAsyncOnServer(String sql) throws SQLException { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); - isExecuteStatementFailed = false; } catch (SQLException eS) { - isExecuteStatementFailed = true; isLogBeingGenerated = false; throw eS; } catch (Exception ex) { - isExecuteStatementFailed = true; isLogBeingGenerated = false; - throw new SQLException(ex.toString(), "08S01", ex); + throw new SQLException("Failed to run async statement", "08S01", ex); } } @@ -356,12 +315,12 @@ private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); TGetOperationStatusResp statusResp = null; - while(statusResp == null || !statusResp.isSetHasResultSet()) { + while (statusResp == null || !statusResp.isSetHasResultSet()) { try { statusResp = client.GetOperationStatus(statusReq); } catch (TException e) { isLogBeingGenerated = false; - throw new SQLException(e.toString(), "08S01", e); + throw new SQLException("Failed to wait for result set status", "08S01", e); } } @@ -369,17 +328,16 @@ private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { } TGetOperationStatusResp waitForOperationToComplete() throws SQLException { - TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); - boolean shouldGetProgressUpdate = inPlaceUpdateStream != InPlaceUpdateStream.NO_OP; - statusReq.setGetProgressUpdate(shouldGetProgressUpdate); - if (!shouldGetProgressUpdate) { - /** - * progress bar is completed if there is nothing we want to request in the first place. - */ - inPlaceUpdateStream.getEventNotifier().progressBarCompleted(); - } TGetOperationStatusResp statusResp = null; + final TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + statusReq.setGetProgressUpdate(inPlaceUpdateStream.isPresent()); + + // Progress bar is completed if there is nothing to request + if (inPlaceUpdateStream.isPresent()) { + inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted(); + } + // Poll on the operation status, till the operation is complete do { try { @@ -388,8 +346,8 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ statusResp = client.GetOperationStatus(statusReq); - if(!isOperationComplete) { - inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); + if (!isOperationComplete && inPlaceUpdateStream.isPresent()) { + inPlaceUpdateStream.get().update(statusResp.getProgressUpdateResponse()); } Utils.verifySuccessWithInfo(statusResp.getStatus()); if (statusResp.isSetOperationState()) { @@ -401,12 +359,10 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { break; case CANCELED_STATE: // 01000 -> warning - String errMsg = statusResp.getErrorMessage(); - if (errMsg != null && !errMsg.isEmpty()) { - throw new SQLException(QUERY_CANCELLED_MESSAGE + " " + errMsg, "01000"); - } else { - throw new SQLException(QUERY_CANCELLED_MESSAGE, "01000"); - } + final String errMsg = statusResp.getErrorMessage(); + final String fullErrMsg = + (errMsg == null || errMsg.isEmpty()) ? QUERY_CANCELLED_MESSAGE : QUERY_CANCELLED_MESSAGE + " " + errMsg; + throw new SQLException(fullErrMsg, "01000"); case TIMEDOUT_STATE: throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); case ERROR_STATE: @@ -426,20 +382,20 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { throw e; } catch (Exception e) { isLogBeingGenerated = false; - throw new SQLException(e.toString(), "08S01", e); + throw new SQLException("Failed to wait for operation to complete", "08S01", e); } } while (!isOperationComplete); - /* - we set progress bar to be completed when hive query execution has completed - */ - inPlaceUpdateStream.getEventNotifier().progressBarCompleted(); + // set progress bar to be completed when hive query execution has completed + if (inPlaceUpdateStream.isPresent()) { + inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted(); + } return statusResp; } private void checkConnection(String action) throws SQLException { if (isClosed) { - throw new SQLException("Can't " + action + " after statement has been closed"); + throw new SQLException("Cannot " + action + " after statement has been closed"); } } @@ -452,269 +408,130 @@ private void reInitState() throws SQLException { isCancelled = false; isQueryClosed = false; isLogBeingGenerated = true; - isExecuteStatementFailed = false; isOperationComplete = false; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#execute(java.lang.String, int) - */ - @Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#execute(java.lang.String, int[]) - */ - @Override public boolean execute(String sql, int[] columnIndexes) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#execute(java.lang.String, java.lang.String[]) - */ - @Override public boolean execute(String sql, String[] columnNames) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeBatch() - */ - @Override public int[] executeBatch() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeQuery(java.lang.String) - */ - @Override public ResultSet executeQuery(String sql) throws SQLException { if (!execute(sql)) { - throw new SQLException("The query did not generate a result set!"); + throw new SQLException("The query did not generate a result set"); } return resultSet; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeUpdate(java.lang.String) - */ - @Override public int executeUpdate(String sql) throws SQLException { execute(sql); return getUpdateCount(); - //return getLargeUpdateCount(); - not currently implemented... wrong type } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeUpdate(java.lang.String, int) - */ - @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeUpdate(java.lang.String, int[]) - */ - @Override public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#executeUpdate(java.lang.String, java.lang.String[]) - */ - @Override public int executeUpdate(String sql, String[] columnNames) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getConnection() - */ - @Override public Connection getConnection() throws SQLException { checkConnection("getConnection"); return this.connection; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getFetchDirection() - */ - @Override public int getFetchDirection() throws SQLException { checkConnection("getFetchDirection"); return ResultSet.FETCH_FORWARD; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getFetchSize() - */ - @Override public int getFetchSize() throws SQLException { checkConnection("getFetchSize"); return fetchSize; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getGeneratedKeys() - */ - @Override public ResultSet getGeneratedKeys() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getMaxFieldSize() - */ - @Override public int getMaxFieldSize() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getMaxRows() - */ - @Override public int getMaxRows() throws SQLException { checkConnection("getMaxRows"); return maxRows; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getMoreResults() - */ - @Override public boolean getMoreResults() throws SQLException { return false; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getMoreResults(int) - */ - @Override public boolean getMoreResults(int current) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getQueryTimeout() - */ - @Override public int getQueryTimeout() throws SQLException { checkConnection("getQueryTimeout"); - return 0; + return this.queryTimeout; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getResultSet() - */ - @Override public ResultSet getResultSet() throws SQLException { checkConnection("getResultSet"); return resultSet; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getResultSetConcurrency() - */ - @Override public int getResultSetConcurrency() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getResultSetHoldability() - */ - @Override public int getResultSetHoldability() throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getResultSetType() - */ - @Override public int getResultSetType() throws SQLException { checkConnection("getResultSetType"); return ResultSet.TYPE_FORWARD_ONLY; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getUpdateCount() - */ @Override public int getUpdateCount() throws SQLException { checkConnection("getUpdateCount"); @@ -723,74 +540,44 @@ public int getUpdateCount() throws SQLException { * client might end up using executeAsync and then call this to check if the query run is * finished. */ - long numModifiedRows = -1; + long numModifiedRows = -1L; TGetOperationStatusResp resp = waitForOperationToComplete(); if (resp != null) { numModifiedRows = resp.getNumModifiedRows(); } - if (numModifiedRows == -1 || numModifiedRows > Integer.MAX_VALUE) { - LOG.warn("Number of rows is greater than Integer.MAX_VALUE"); + if (numModifiedRows == -1L || numModifiedRows > Integer.MAX_VALUE) { + LOG.warn("Invalid number of updated rows: {}", numModifiedRows); return -1; } return (int) numModifiedRows; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#getWarnings() - */ - @Override public SQLWarning getWarnings() throws SQLException { checkConnection("getWarnings"); return warningChain; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#isClosed() - */ - @Override public boolean isClosed() throws SQLException { return isClosed; } - // JDK 1.7 + @Override public boolean isCloseOnCompletion() throws SQLException { return closeOnResultSetCompletion; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#isPoolable() - */ - @Override public boolean isPoolable() throws SQLException { return false; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setCursorName(java.lang.String) - */ - @Override public void setCursorName(String name) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setEscapeProcessing(boolean) - */ - @Override public void setEscapeProcessing(boolean enable) throws SQLException { if (enable) { @@ -798,26 +585,14 @@ public void setEscapeProcessing(boolean enable) throws SQLException { } } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setFetchDirection(int) - */ - @Override public void setFetchDirection(int direction) throws SQLException { checkConnection("setFetchDirection"); if (direction != ResultSet.FETCH_FORWARD) { - throw new SQLException("Not supported direction " + direction); + throw new SQLException("Not supported direction: " + direction); } } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setFetchSize(int) - */ - @Override public void setFetchSize(int rows) throws SQLException { checkConnection("setFetchSize"); @@ -830,71 +605,35 @@ public void setFetchSize(int rows) throws SQLException { } } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setMaxFieldSize(int) - */ - @Override public void setMaxFieldSize(int max) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setMaxRows(int) - */ - @Override public void setMaxRows(int max) throws SQLException { checkConnection("setMaxRows"); if (max < 0) { - throw new SQLException("max must be >= 0"); + throw new SQLException("Maximum number of rows must be >= 0"); } maxRows = max; } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setPoolable(boolean) - */ - @Override public void setPoolable(boolean poolable) throws SQLException { throw new SQLFeatureNotSupportedException("Method not supported"); } - /* - * (non-Javadoc) - * - * @see java.sql.Statement#setQueryTimeout(int) - */ - @Override public void setQueryTimeout(int seconds) throws SQLException { this.queryTimeout = seconds; } - /* - * (non-Javadoc) - * - * @see java.sql.Wrapper#isWrapperFor(java.lang.Class) - */ - @Override public boolean isWrapperFor(Class iface) throws SQLException { return false; } - /* - * (non-Javadoc) - * - * @see java.sql.Wrapper#unwrap(java.lang.Class) - */ - @Override public T unwrap(Class iface) throws SQLException { throw new SQLException("Cannot unwrap to " + iface); @@ -944,7 +683,6 @@ public List getQueryLog(boolean incremental, int fetchSize) "statement has been closed or cancelled."); } - List logs = new ArrayList(); TFetchResultsResp tFetchResultsResp = null; try { if (stmtHandle != null) { @@ -958,36 +696,30 @@ public List getQueryLog(boolean incremental, int fetchSize) throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " + "statement has been closed or cancelled."); } else { - return logs; + return Collections.emptyList(); } } } catch (SQLException e) { throw e; - } catch (TException e) { - throw new SQLException("Error when getting query log: " + e, e); } catch (Exception e) { - throw new SQLException("Error when getting query log: " + e, e); + throw new SQLException("Error when getting query log", e); } + final List logs = new ArrayList<>(); try { - RowSet rowSet; - rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); + final RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); for (Object[] row : rowSet) { logs.add(String.valueOf(row[0])); } } catch (TException e) { - throw new SQLException("Error building result set for query log: " + e, e); + throw new SQLException("Error building result set for query log", e); } - return logs; + return Collections.unmodifiableList(logs); } private TFetchOrientation getFetchOrientation(boolean incremental) { - if (incremental) { - return TFetchOrientation.FETCH_NEXT; - } else { - return TFetchOrientation.FETCH_FIRST; - } + return (incremental) ? TFetchOrientation.FETCH_NEXT : TFetchOrientation.FETCH_FIRST; } /** @@ -1006,17 +738,18 @@ public String getYarnATSGuid() { } /** - * This is only used by the beeline client to set the stream on which in place progress updates - * are to be shown + * This is only used by the beeline client to set the stream on which in place + * progress updates are to be shown. */ public void setInPlaceUpdateStream(InPlaceUpdateStream stream) { - this.inPlaceUpdateStream = stream; + this.inPlaceUpdateStream = Optional.ofNullable(stream); } /** - * Returns the Query ID if it is running. - * This method is a public API for usage outside of Hive, although it is not part of the - * interface java.sql.Statement. + * Returns the Query ID if it is running. This method is a public API for + * usage outside of Hive, although it is not part of the interface + * java.sql.Statement. + * * @return Valid query ID if it is running else returns NULL. * @throws SQLException If any internal failures. */ @@ -1030,7 +763,7 @@ public String getQueryId() throws SQLException { return null; } try { - String queryId = client.GetQueryId(new TGetQueryIdReq(stmtHandleTmp)).getQueryId(); + final String queryId = client.GetQueryId(new TGetQueryIdReq(stmtHandleTmp)).getQueryId(); // queryId can be empty string if query was already closed. Need to return null in such case. return StringUtils.isBlank(queryId) ? null : queryId; diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java index cb5fb8221426..310c4969d268 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java +++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java @@ -24,20 +24,6 @@ public interface InPlaceUpdateStream { void update(TProgressUpdateResp response); - InPlaceUpdateStream NO_OP = new InPlaceUpdateStream() { - private final EventNotifier eventNotifier = new EventNotifier(); - @Override - public void update(TProgressUpdateResp response) { - - } - - @Override - public EventNotifier getEventNotifier() { - return eventNotifier; - } - - }; - EventNotifier getEventNotifier(); class EventNotifier {