@@ -6,6 +6,7 @@

import com.databricks.jdbc.api.impl.converters.ArrowToJavaObjectConverter;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.DecompressionUtil;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -272,7 +273,7 @@ void downloadData(IDatabricksHttpClient httpClient, CompressionCodec compression
HttpGet getRequest = new HttpGet(uriBuilder.build());
addHeaders(getRequest, chunkLink.getHttpHeaders());
// Retries are handled by the HTTP client, so we don't need to handle them here
response = httpClient.execute(getRequest, true);
response = httpClient.execute(getRequest, HTTPRequestType.CLOUD_FETCH, true);
checkHTTPError(response);
String decompressionContext =
String.format(
----------------------------------------
@@ -9,7 +9,6 @@
import com.databricks.jdbc.api.impl.VolumeOperationStatus;
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.DatabricksClientConfiguratorManager;
import com.databricks.jdbc.common.HttpClientType;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.common.util.StringUtil;
import com.databricks.jdbc.common.util.VolumeUtil;
@@ -63,8 +62,7 @@ public DBFSVolumeClient(IDatabricksConnectionContext connectionContext) {
this.workspaceClient = getWorkspaceClientFromConnectionContext(connectionContext);
this.apiClient = workspaceClient.apiClient();
this.databricksHttpClient =
DatabricksHttpClientFactory.getInstance()
.getClient(connectionContext, HttpClientType.VOLUME);
DatabricksHttpClientFactory.getInstance().getClient(connectionContext);
this.allowedVolumeIngestionPaths = connectionContext.getVolumeOperationAllowedPaths();
}

----------------------------------------
@@ -5,9 +5,13 @@
import static com.databricks.jdbc.common.util.StringUtil.escapeStringLiteral;

import com.databricks.jdbc.api.IDatabricksVolumeClient;
import com.databricks.jdbc.api.internal.IDatabricksConnectionInternal;
import com.databricks.jdbc.api.internal.IDatabricksResultSetInternal;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.StringUtil;
import com.databricks.jdbc.dbclient.IDatabricksClient;
import com.databricks.jdbc.dbclient.impl.thrift.DatabricksThriftServiceClient;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import java.io.InputStream;
@@ -33,6 +37,41 @@ public DatabricksUCVolumeClient(Connection connection) {
this.connection = connection;
}

/**
 * Helper that sets the HTTP request type for a volume operation. This only takes effect for
 * Thrift client connections; SDK clients use their own retry logic.
*/
private void setVolumeOperationRequestType(HTTPRequestType requestType) {
try {
IDatabricksConnectionInternal databricksConnection =
connection.unwrap(IDatabricksConnectionInternal.class);

// Check if unwrap returned null (can happen in tests or with non-Databricks connections)
if (databricksConnection == null) {
LOGGER.debug("Connection unwrap returned null - skipping HTTP config setup");
return;
}

IDatabricksClient client = databricksConnection.getSession().getDatabricksClient();

// Only set config for Thrift clients (SDK clients handle their own retry logic)
if (client instanceof DatabricksThriftServiceClient) {
DatabricksThriftServiceClient thriftClient = (DatabricksThriftServiceClient) client;
thriftClient.setHttpRequestType(requestType);
LOGGER.debug("Set volume operation HTTP request type: {} for Thrift client", requestType);
} else {
LOGGER.debug(
"SDK client detected - using SDK's built-in retry logic for volume operations");
}
} catch (Exception e) {
// Catch all exceptions (SQLException, NullPointerException, etc.)
// This ensures volume operations continue to work even if HTTP config setup fails
Review comment (Collaborator): How will the retry logic work in this case?

Reply (Author): It won't work in this case; the request type will be NOT_SET, which will be treated as non-idempotent.

LOGGER.debug(
"Could not set volume operation config (test environment or non-Databricks connection?): {}",
e.getMessage());
}
}
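
To ground the reviewer exchange above: a minimal sketch of how a retry policy could key off the request type. The class, method, and the idempotent set below are illustrative assumptions, not the driver's actual retry implementation; the only behavior taken from the thread is that NOT_SET is treated as non-idempotent and therefore never replayed.

```java
import com.databricks.jdbc.common.HTTPRequestType;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical retry policy keyed on request type; not the driver's actual code.
final class RequestTypeRetryPolicy {

  // Read-only operations assumed safe to replay (illustrative classification).
  private static final Set<HTTPRequestType> IDEMPOTENT =
      EnumSet.of(
          HTTPRequestType.CLOUD_FETCH,
          HTTPRequestType.VOLUME_LIST,
          HTTPRequestType.VOLUME_SHOW_VOLUMES,
          HTTPRequestType.VOLUME_GET);

  boolean shouldRetry(HTTPRequestType type, int attemptsSoFar, int maxAttempts) {
    // NOT_SET (and anything unclassified) is treated as non-idempotent per the
    // author's reply, so those requests are never replayed.
    return IDEMPOTENT.contains(type) && attemptsSoFar < maxAttempts;
  }
}
```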

private static String getVolumePath(String catalog, String schema, String volume) {
// Escape single quotes to prevent SQL injection
return escapeStringLiteral(String.format("/Volumes/%s/%s/%s/", catalog, schema, volume));
@@ -122,6 +161,7 @@ public boolean prefixExists(
String listFilesSQLQuery = createListQuery(catalog, schema, volume, folder);

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_LIST);
try (ResultSet resultSet = statement.executeQuery(listFilesSQLQuery)) {
LOGGER.debug("SQL query executed successfully");
boolean exists = false;
@@ -165,6 +205,7 @@ public boolean objectExists(
String listFilesSQLQuery = createListQuery(catalog, schema, volume, folder);

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_LIST);
try (ResultSet resultSet = statement.executeQuery(listFilesSQLQuery)) {
LOGGER.info("SQL query executed successfully");
boolean exists = false;
@@ -210,6 +251,7 @@ public boolean volumeExists(
String showVolumesSQLQuery = createShowVolumesQuery(catalog, schema);

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_SHOW_VOLUMES);
try (ResultSet resultSet = statement.executeQuery(showVolumesSQLQuery)) {
LOGGER.info("SQL query executed successfully");
boolean exists = false;
@@ -270,6 +312,7 @@ public List<String> listObjects(
String listFilesSQLQuery = createListQuery(catalog, schema, volume, folder);

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_LIST);
try (ResultSet resultSet = statement.executeQuery(listFilesSQLQuery)) {
LOGGER.info("SQL query executed successfully");
List<String> filenames = new ArrayList<>();
@@ -305,6 +348,7 @@ public boolean getObject(
boolean volumeOperationStatus = false;

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_GET);
try (ResultSet resultSet = statement.executeQuery(getObjectQuery)) {
LOGGER.info("GET query executed successfully");
if (resultSet.next()) {
@@ -334,6 +378,7 @@ public InputStreamEntity getObject(
String getObjectQuery = createGetObjectQueryForInputStream(catalog, schema, volume, objectPath);

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_GET);
IDatabricksStatementInternal databricksStatement =
statement.unwrap(IDatabricksStatementInternal.class);
databricksStatement.allowInputStreamForVolumeOperation(true);
@@ -374,6 +419,7 @@ public boolean putObject(
boolean isOperationSucceeded = false;

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_PUT);
try (ResultSet resultSet = statement.executeQuery(putObjectQuery)) {
LOGGER.info("PUT query executed successfully");
if (resultSet.next()) {
@@ -413,6 +459,7 @@ public boolean putObject(
boolean isOperationSucceeded = false;

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_PUT);
IDatabricksStatementInternal databricksStatement =
statement.unwrap(IDatabricksStatementInternal.class);
databricksStatement.allowInputStreamForVolumeOperation(true);
@@ -449,6 +496,7 @@ public boolean deleteObject(String catalog, String schema, String volume, String
boolean isOperationSucceeded = false;

try (Statement statement = connection.createStatement()) {
setVolumeOperationRequestType(HTTPRequestType.VOLUME_DELETE);
try (ResultSet resultSet = statement.executeQuery(deleteObjectQuery)) {
LOGGER.info("SQL query executed successfully");
if (resultSet.next()) {
----------------------------------------
@@ -1,6 +1,7 @@
package com.databricks.jdbc.api.impl.volume;

import com.databricks.jdbc.api.impl.VolumeOperationStatus;
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.HttpUtil;
import com.databricks.jdbc.common.util.VolumeUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -237,7 +238,7 @@ void executeGetOperation() {
try {
// We return the input stream directly to clients if they want to consume it as a stream
if (isAllowedInputStreamForVolumeOperation) {
responseStream = databricksHttpClient.execute(httpGet);
responseStream = databricksHttpClient.execute(httpGet, HTTPRequestType.VOLUME_GET);
if (!HttpUtil.isSuccessfulHttpResponse(responseStream)) {
status = VolumeOperationStatus.FAILED;
errorMessage =
@@ -270,7 +271,8 @@ void executeGetOperation() {
return;
}

try (CloseableHttpResponse response = databricksHttpClient.execute(httpGet)) {
try (CloseableHttpResponse response =
databricksHttpClient.execute(httpGet, HTTPRequestType.VOLUME_GET)) {
if (!HttpUtil.isSuccessfulHttpResponse(response)) {
LOGGER.error(
"Failed to fetch content from volume with error {%s} for local file {%s}",
@@ -340,7 +342,8 @@ void executePutOperation() {
}

// Execute the request
try (CloseableHttpResponse response = databricksHttpClient.execute(httpPut)) {
try (CloseableHttpResponse response =
databricksHttpClient.execute(httpPut, HTTPRequestType.VOLUME_PUT)) {
// Process the response
if (HttpUtil.isSuccessfulHttpResponse(response)) {
status = VolumeOperationStatus.SUCCEEDED;
@@ -387,7 +390,8 @@ private void executeDeleteOperation() {
// TODO: Implement AWS-specific logic if required
HttpDelete httpDelete = new HttpDelete(operationUrl);
headers.forEach(httpDelete::addHeader);
try (CloseableHttpResponse response = databricksHttpClient.execute(httpDelete)) {
try (CloseableHttpResponse response =
databricksHttpClient.execute(httpDelete, HTTPRequestType.VOLUME_DELETE)) {
if (HttpUtil.isSuccessfulHttpResponse(response)) {
status = VolumeOperationStatus.SUCCEEDED;
} else {
----------------------------------------
@@ -7,7 +7,6 @@
import com.databricks.jdbc.api.impl.VolumeOperationStatus;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.HttpClientType;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.common.util.VolumeUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -55,8 +54,7 @@ public VolumeOperationResult(
this.resultHandler = resultHandler;
this.statement = statement;
this.httpClient =
DatabricksHttpClientFactory.getInstance()
.getClient(session.getConnectionContext(), HttpClientType.VOLUME);
DatabricksHttpClientFactory.getInstance().getClient(session.getConnectionContext());
this.currentRowIndex = -1;
}

----------------------------------------
@@ -1,5 +1,6 @@
package com.databricks.jdbc.auth;

import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.exception.DatabricksHttpException;
@@ -137,7 +138,7 @@ private static Token retrieveToken(
HttpGet getRequest = new HttpGet(uriBuilder.build());
headers.forEach(getRequest::setHeader);
LOGGER.debug("Executing GET request to retrieve Azure MSI token");
HttpResponse response = hc.execute(getRequest);
HttpResponse response = hc.execute(getRequest, HTTPRequestType.AUTH);
OAuthResponse resp =
JsonUtil.getMapper().readValue(response.getEntity().getContent(), OAuthResponse.class);
LocalDateTime expiry = LocalDateTime.now().plus(resp.getExpiresIn(), ChronoUnit.SECONDS);
----------------------------------------
@@ -2,6 +2,7 @@

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.DatabricksJdbcConstants;
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -207,7 +208,7 @@ Token retrieveToken(
.collect(Collectors.toList()),
StandardCharsets.UTF_8));
headers.forEach(postRequest::setHeader);
HttpResponse response = hc.execute(postRequest);
HttpResponse response = hc.execute(postRequest, HTTPRequestType.AUTH);
OAuthResponse resp =
JsonUtil.getMapper().readValue(response.getEntity().getContent(), OAuthResponse.class);
return createToken(resp.getAccessToken(), resp.getTokenType());
----------------------------------------
@@ -2,6 +2,7 @@

import static com.nimbusds.jose.JWSAlgorithm.*;

import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -179,7 +180,7 @@ protected static Token retrieveToken(
.collect(Collectors.toList()),
StandardCharsets.UTF_8));
headers.forEach(postRequest::setHeader);
HttpResponse response = hc.execute(postRequest);
HttpResponse response = hc.execute(postRequest, HTTPRequestType.AUTH);
OAuthResponse resp =
JsonUtil.getMapper().readValue(response.getEntity().getContent(), OAuthResponse.class);
LocalDateTime expiry = LocalDateTime.now().plus(resp.getExpiresIn(), ChronoUnit.SECONDS);
----------------------------------------
21 changes: 21 additions & 0 deletions src/main/java/com/databricks/jdbc/common/HTTPRequestType.java
@@ -0,0 +1,21 @@
package com.databricks.jdbc.common;

public enum HTTPRequestType {
NOT_SET,
Review comment (Collaborator): HTTPRequestType should be UNKNOWN instead of NOT_SET (I guess the weird nomenclature is because you're actually using RequestType as state for the HTTP client).

THRIFT_OPEN_SESSION,
THRIFT_CLOSE_SESSION,
THRIFT_METADATA,
THRIFT_CLOSE_OPERATION,
THRIFT_CANCEL_OPERATION,
THRIFT_EXECUTE_STATEMENT,
THRIFT_FETCH_RESULTS,
CLOUD_FETCH,
VOLUME_LIST,
VOLUME_SHOW_VOLUMES,
VOLUME_GET,
VOLUME_PUT,
VOLUME_DELETE,
AUTH,
TELEMETRY_PUSH,
OTHER
Review comment (Collaborator): What's this used for?

};
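
Per the review comment above, the type effectively doubles as per-request state on the HTTP client (see the getCurrentRequestType/setCurrentRequestType accessors added to IDatabricksHttpClient later in this diff). Below is a sketch of that lifecycle with a hypothetical holder standing in for the real client; the reset-to-NOT_SET step is an assumption about how stale tags would be avoided.

```java
import com.databricks.jdbc.common.HTTPRequestType;

// Hypothetical stand-in for the client's request-type state (illustration only).
final class RequestTypeState {
  private HTTPRequestType current = HTTPRequestType.NOT_SET;

  void set(HTTPRequestType type) {
    current = type;
  }

  // Read and reset in one step so a stale tag cannot leak into the next request;
  // the NOT_SET fallback then gets the conservative non-idempotent treatment.
  HTTPRequestType consume() {
    HTTPRequestType type = current;
    current = HTTPRequestType.NOT_SET;
    return type;
  }
}
```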
15 changes: 0 additions & 15 deletions src/main/java/com/databricks/jdbc/common/HttpClientType.java

This file was deleted.

----------------------------------------
@@ -2,6 +2,7 @@

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.DatabricksClientConfiguratorManager;
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
@@ -92,7 +93,7 @@ private void refreshAllFeatureFlags() {
@VisibleForTesting
void fetchAndSetFlagsFromServer(IDatabricksHttpClient httpClient, HttpGet request)
throws DatabricksHttpException, IOException {
try (CloseableHttpResponse response = httpClient.execute(request)) {
try (CloseableHttpResponse response = httpClient.execute(request, HTTPRequestType.AUTH)) {
if (response.getStatusLine().getStatusCode() == 200) {
String responseBody = EntityUtils.toString(response.getEntity());
FeatureFlagsResponse featureFlagsResponse =
----------------------------------------
@@ -1,5 +1,6 @@
package com.databricks.jdbc.dbclient;

import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.exception.DatabricksHttpException;
import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;
@@ -11,13 +12,20 @@
/** Http client interface for executing http requests. */
public interface IDatabricksHttpClient {

// Gets the current HTTP request type
HTTPRequestType getCurrentRequestType();

// Sets the current HTTP request type
void setCurrentRequestType(HTTPRequestType requestType);

/**
* Executes the given http request and returns the response
*
* @param request underlying http request
* @return http response
*/
CloseableHttpResponse execute(HttpUriRequest request) throws DatabricksHttpException;
CloseableHttpResponse execute(HttpUriRequest request, HTTPRequestType requestType)
throws DatabricksHttpException;

/**
* Executes the given http request and returns the response
@@ -26,7 +34,8 @@ public interface IDatabricksHttpClient {
* @param supportGzipEncoding whether to support gzip encoding header
* @return http response
*/
CloseableHttpResponse execute(HttpUriRequest request, boolean supportGzipEncoding)
CloseableHttpResponse execute(
HttpUriRequest request, HTTPRequestType requestType, boolean supportGzipEncoding)
throws DatabricksHttpException;

/**
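
Taken together with the call-site changes above, a hedged caller-side sketch of the new execute overload. The URL is a placeholder, and the Apache HttpClient 4.x import paths are assumptions inferred from the types visible in this diff (e.g. the getStatusLine() usage in the feature-flag hunk).

```java
import com.databricks.jdbc.common.HTTPRequestType;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.exception.DatabricksHttpException;
import java.io.IOException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;

// Sketch of a call site migrated to the new signature (illustration only).
final class ExecuteWithRequestTypeExample {

  static int fetchStatus(IDatabricksHttpClient httpClient)
      throws DatabricksHttpException, IOException {
    HttpGet request = new HttpGet("https://host.example/api/2.0/fs/list"); // placeholder URL
    // The request type replaces the deleted per-client HttpClientType: retry
    // behavior is now decided per request rather than per client instance.
    try (CloseableHttpResponse response =
        httpClient.execute(request, HTTPRequestType.VOLUME_LIST)) {
      return response.getStatusLine().getStatusCode();
    }
  }
}
```

Tagging each request this way lets one shared client serve workloads with different retry semantics, which is what makes the deleted per-client HttpClientType distinction unnecessary.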