Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,50 @@ build-all:
test: build-tests
cd $(BUILD_DIR) && ctest --output-on-failure

# Run only tests related to modified files (based on git status)
#
# Builds a combined --gtest_filter from ALL matching categories so that, e.g.,
# touching both jobs and compute sources runs both test groups (the previous
# elif chain only ran the first match). Falls back to the full suite when no
# known category matches or nothing relevant is modified.
.PHONY: test-new
test-new: build-tests
	@echo "Running tests for modified files..."
	@MODIFIED_FILES=$$(git status --short | grep -E '^\s*M.*\.(cpp|h)' | awk '{print $$2}' | grep -E 'test|jobs|compute|unity' || true); \
	if [ -z "$$MODIFIED_FILES" ]; then \
		echo "No modified test files detected. Running all tests..."; \
		cd $(BUILD_DIR)/tests && ./unit_tests; \
	else \
		echo "Modified files: $$MODIFIED_FILES"; \
		FILTER=''; \
		if echo "$$MODIFIED_FILES" | grep -q "jobs"; then FILTER="$$FILTER:*Job*"; fi; \
		if echo "$$MODIFIED_FILES" | grep -q "compute"; then FILTER="$$FILTER:*Compute*:*Cluster*"; fi; \
		if echo "$$MODIFIED_FILES" | grep -q "unity"; then FILTER="$$FILTER:*UnityCatalog*"; fi; \
		if [ -z "$$FILTER" ]; then \
			echo "Running all tests for safety..."; \
			cd $(BUILD_DIR)/tests && ./unit_tests; \
		else \
			FILTER=$${FILTER#:}; \
			echo "Running tests matching: $$FILTER"; \
			cd $(BUILD_DIR)/tests && ./unit_tests --gtest_filter="$$FILTER"; \
		fi \
	fi

# Run tests with custom filter
# Usage: make test-filter FILTER='JobsApiTest.*'
# FILTER is passed verbatim to googletest's --gtest_filter, which accepts
# ':'-separated glob patterns (e.g. 'JobsApiTest.*' or '*Cancel*').
# The recipe fails fast with usage examples when FILTER is not provided.
.PHONY: test-filter
test-filter: build-tests
@if [ -z "$(FILTER)" ]; then \
echo "Usage: make test-filter FILTER='pattern'"; \
echo "Example: make test-filter FILTER='JobsApiTest.*'"; \
echo "Example: make test-filter FILTER='*Cancel*'"; \
exit 1; \
fi
@echo "Running tests matching filter: $(FILTER)"
cd $(BUILD_DIR)/tests && ./unit_tests --gtest_filter='$(FILTER)'

# List all available tests
# Prints googletest's suite/test hierarchy; useful for discovering exact test
# names to pass to 'make test-filter'. Does not execute any tests.
.PHONY: test-list
test-list: build-tests
@echo "Available test cases:"
cd $(BUILD_DIR)/tests && ./unit_tests --gtest_list_tests

# Clean build artifacts
.PHONY: clean
clean:
Expand Down Expand Up @@ -166,7 +210,10 @@ help:
@echo " make clean - Remove build artifacts"
@echo ""
@echo "Testing:"
@echo " make test - Run tests"
@echo " make test - Run all tests"
@echo " make test-new - Run tests for modified files (smart)"
@echo " make test-filter FILTER='pattern' - Run tests matching pattern"
@echo " make test-list - List all available test cases"
@echo " make benchmark - Run connection pooling benchmark"
@echo ""
@echo "Documentation:"
Expand Down
20 changes: 20 additions & 0 deletions include/databricks/jobs/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ class Jobs {
*/
uint64_t run_now(uint64_t job_id, const std::map<std::string, std::string>& notebook_params = {});

/**
 * @brief Cancel a running or pending job run
 *
 * Issues POST /jobs/runs/cancel with the run id in the request body.
 *
 * @param run_id The unique identifier of the job run to cancel
 * @return true if the cancellation request was successful
 * @throws std::runtime_error if the API request fails
 */
bool cancel_run(uint64_t run_id);

/**
 * @brief Get the output of a completed job run
 *
 * Issues GET /jobs/runs/get-output with run_id as a query parameter.
 *
 * @param run_id The unique identifier of the job run whose output to fetch
 * @return RunOutput object containing execution output and logs
 * @throws std::runtime_error if the API request fails, or if the run is not found
 *
 * @note Only completed runs have an output. Running jobs will return an error
 */
RunOutput get_run_output(uint64_t run_id);

private:
class Impl;
std::unique_ptr<Impl> pimpl_;
Expand Down
19 changes: 19 additions & 0 deletions include/databricks/jobs/jobs_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,23 @@ struct JobRun {
static JobRun from_json(const std::string& json_str);
};

/**
 * @brief Represents a run output object for a Databricks job run
 *
 * This struct contains the key fields from a Databricks job run output,
 * including logs, errors, and metadata. Populated by from_json() from the
 * body of a /jobs/runs/get-output response; fields absent from the JSON
 * are left as empty strings.
 */
struct RunOutput {
    std::string notebook_output; ///< Output from notebook tasks (JSON)
    std::string sql_output;      ///< Output from SQL tasks
    std::string logs;            ///< Execution logs
    std::string error;           ///< Error message if run failed
    std::string error_trace;     ///< Stack trace if available
    std::map<std::string, std::string> metadata; ///< Additional output metadata; from_json() stores the raw JSON blob under key "raw"

    /**
     * @brief Parse RunOutput from JSON string
     *
     * @param json_str JSON body of a /jobs/runs/get-output response
     * @throws std::runtime_error if json_str is not valid JSON
     */
    static RunOutput from_json(const std::string& json_str);
};

} // namespace databricks
57 changes: 56 additions & 1 deletion src/jobs/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ std::string build_query_string(const std::map<std::string, std::string>& params)
} // namespace

// ============================================================================
// Job and JobRun JSON Parsing
// JSON Parsing Helper Functions
// ============================================================================

Job Job::from_json(const std::string& json_str) {
Expand Down Expand Up @@ -101,6 +101,27 @@ JobRun JobRun::from_json(const std::string& json_str) {
}
}

/**
 * @brief Parse a RunOutput from a /jobs/runs/get-output JSON response body.
 *
 * @param json_str Raw JSON response body
 * @return Populated RunOutput; fields missing from the JSON are empty strings
 * @throws std::runtime_error if json_str is not valid JSON
 */
RunOutput RunOutput::from_json(const std::string& json_str) {
    try {
        auto j = json::parse(json_str);
        RunOutput run_output;

        // The API may return some of these fields as nested JSON objects
        // (e.g. "notebook_output": {"result": ...}) rather than plain strings.
        // j.value(key, "") throws json::type_error for non-string values, so
        // serialize those instead of failing the whole parse; plain strings
        // pass through unchanged and absent/null keys become "".
        auto get_string = [&j](const char* key) -> std::string {
            if (!j.contains(key)) {
                return "";
            }
            const auto& field = j[key];
            if (field.is_null()) {
                return "";
            }
            return field.is_string() ? field.get<std::string>() : field.dump();
        };

        run_output.notebook_output = get_string("notebook_output");
        run_output.sql_output = get_string("sql_output");
        run_output.logs = get_string("logs");
        run_output.error = get_string("error");
        run_output.error_trace = get_string("error_trace");

        // Metadata has no fixed schema; keep the raw JSON under a single key.
        if (j.contains("metadata")) {
            run_output.metadata["raw"] = j["metadata"].dump();
        }

        return run_output;
    } catch (const json::exception& e) {
        throw std::runtime_error("Failed to parse RunOutput JSON: " + std::string(e.what()));
    }
}

// ============================================================================
// Jobs Constructor and Destructor
// ============================================================================
Expand Down Expand Up @@ -192,6 +213,40 @@ uint64_t Jobs::run_now(uint64_t job_id, const std::map<std::string, std::string>
}
}

bool Jobs::cancel_run(uint64_t run_id) {
internal::get_logger()->info("Cancelling run for run_id=" + std::to_string(run_id));

// Build request body
json body_json;
body_json["run_id"] = run_id;
std::string body = body_json.dump();

internal::get_logger()->debug("Request body: " + body);

// Make API request
auto response = pimpl_->http_client_->post("/jobs/runs/cancel", body);
pimpl_->http_client_->check_response(response, "cancelJob");
internal::get_logger()->info("Successfully cancelled run for run_id=" + std::to_string(run_id));

return true;
}

/**
 * @brief Fetch the output of a job run via GET /jobs/runs/get-output.
 *
 * @param run_id Identifier of the run whose output to retrieve
 * @return RunOutput parsed from the response body
 * @throws std::runtime_error if the API call fails (via check_response)
 *         or the response body is not valid JSON (via RunOutput::from_json)
 */
RunOutput Jobs::get_run_output(uint64_t run_id) {
    internal::get_logger()->info("Retrieving the output for run_id=" + std::to_string(run_id));

    // The get-output endpoint takes run_id as a query parameter.
    const std::map<std::string, std::string> params{{"run_id", std::to_string(run_id)}};
    const std::string endpoint = "/jobs/runs/get-output" + build_query_string(params);

    auto response = pimpl_->http_client_->get(endpoint);
    pimpl_->http_client_->check_response(response, "getRunOutput");
    internal::get_logger()->debug("Job run output response: " + response.body);

    return RunOutput::from_json(response.body);
}

// ============================================================================
// Private Helper Methods
// ============================================================================
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/jobs/test_jobs.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
// Copyright (c) 2025 Calvin Min
// SPDX-License-Identifier: MIT
#include "../../mocks/mock_http_client.h"

#include <databricks/core/config.h>
#include <databricks/jobs/jobs.h>
#include <gtest/gtest.h>

using databricks::test::MockHttpClient;
using ::testing::_;
using ::testing::Return;

// Test fixture for Jobs tests
class JobsTest : public ::testing::Test {
protected:
Expand All @@ -16,6 +22,18 @@ class JobsTest : public ::testing::Test {
}
};

// Test fixture for Jobs API tests with mocks
class JobsApiTest : public ::testing::Test {
protected:
    // Shared auth config for tests; individual tests construct Jobs with a
    // MockHttpClient, so these values never reach a real network connection.
    databricks::AuthConfig auth;

    void SetUp() override {
        auth.host = "https://test.databricks.com";
        auth.set_token("test_token");
        auth.timeout_seconds = 30;
    }
};

// Test: Jobs client construction
TEST_F(JobsTest, ConstructorCreatesValidClient) {
ASSERT_NO_THROW({ databricks::Jobs jobs(auth); });
Expand Down Expand Up @@ -227,3 +245,48 @@ TEST_F(JobsTest, MultipleClientsCanCoexist) {
// Both should coexist without issues
});
}

// Test: Cancel Run
// Verifies cancel_run() POSTs to the cancel endpoint, validates the response
// exactly once via check_response, and reports success to the caller.
TEST_F(JobsApiTest, CancelRunReturnsTrueAndCallsApi) {
    auto client = std::make_shared<MockHttpClient>();

    EXPECT_CALL(*client, check_response(_, "cancelJob")).Times(1);
    EXPECT_CALL(*client, post("/jobs/runs/cancel", _))
        .WillOnce(Return(MockHttpClient::success_response(R"({"result":"OK"})")));

    databricks::Jobs jobs(client);
    EXPECT_TRUE(jobs.cancel_run(42));
}

// Test: Get Run Output of Completed Run
// Verifies get_run_output() issues a GET with run_id as a query parameter and
// surfaces the notebook output from the response body.
TEST_F(JobsApiTest, GetRunOutputCompleted) {
    auto client = std::make_shared<MockHttpClient>();

    EXPECT_CALL(*client, check_response(_, "getRunOutput")).Times(1);
    EXPECT_CALL(*client, get("/jobs/runs/get-output?run_id=123"))
        .WillOnce(Return(MockHttpClient::success_response(R"({"notebook_output":"success"} )")));

    databricks::Jobs jobs(client);
    const auto output = jobs.get_run_output(123);
    EXPECT_EQ(output.notebook_output, "success");
}

// Test: Get Run Output of a Failed Run
// The mocked response is a successful HTTP exchange whose payload carries an
// "error" field; both the error and the notebook output must be parsed out.
TEST_F(JobsApiTest, GetRunOutputFailedRun) {
    auto client = std::make_shared<MockHttpClient>();

    EXPECT_CALL(*client, check_response(_, "getRunOutput")).Times(1);
    EXPECT_CALL(*client, get("/jobs/runs/get-output?run_id=123"))
        .WillOnce(Return(MockHttpClient::success_response(R"({"error":"error message","notebook_output":"failed"})")));

    databricks::Jobs jobs(client);
    const auto output = jobs.get_run_output(123);
    EXPECT_EQ(output.error, "error message");
    EXPECT_EQ(output.notebook_output, "failed");
}