Optimize GatherElements further, add threshold for parallelizing Scaler. #5579

Merged · 3 commits · Oct 24, 2020
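Both kernels touched by this PR follow the same pattern: express the per-element work as a functor, run it with a plain loop for small inputs, and hand it to the thread pool only once the element count crosses a fixed threshold. Below is a minimal sketch of that dispatch; it is illustrative only, assumes the core/platform/threadpool.h header path, and reuses the TryBatchParallelFor(ttp, total, fn, 0) call form shown in the diff.

#include <cstddef>
#include <functional>
#include "core/platform/threadpool.h"  // assumed header path for concurrency::ThreadPool

namespace {
// Same arbitrary cutoff as the PR; small inputs are not worth the scheduling overhead.
constexpr std::ptrdiff_t kParallelizationThreshold = 10 * 1000;

// Run `fn` once per element; parallelize only when the input is large enough.
void ConditionalBatchCall(onnxruntime::concurrency::ThreadPool* ttp, std::ptrdiff_t total,
                          const std::function<void(std::ptrdiff_t)>& fn) {
  if (total < kParallelizationThreshold) {
    for (std::ptrdiff_t i = 0; i < total; ++i) fn(i);  // serial fast path
  } else {
    onnxruntime::concurrency::ThreadPool::TryBatchParallelFor(ttp, total, fn, 0);
  }
}
}  // namespace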
38 changes: 19 additions & 19 deletions onnxruntime/core/providers/cpu/ml/scaler.cc
@@ -60,6 +60,8 @@ ONNX_CPU_OPERATOR_TYPED_ML_KERNEL(
KernelDefBuilder().TypeConstraint("T", DataTypeImpl::GetTensorType<int32_t>()).MayInplace(0, 0),
ScalerOp<int32_t>);

static constexpr int kParallelizationThreshold = 10 * 1000;

template <typename T>
ScalerOp<T>::ScalerOp(const OpKernelInfo& info) : OpKernel(info),
scale_(info.GetAttrsOrDefault<float>("scale")),
@@ -84,29 +86,27 @@ common::Status ScalerOp<T>::Compute(OpKernelContext* context) const {
size_t x_size = x_shape.Size();
int64_t stride = x_dims.size() == 1 ? x_dims[0] : x_dims[1];
auto* ttp = context->GetOperatorThreadPool();
auto num_threads = std::min<int>(concurrency::ThreadPool::DegreeOfParallelism(ttp), static_cast<int>(x_size));
auto conditional_batch_call = [ttp, x_size](std::function<void(ptrdiff_t)> f) {
if (x_size < kParallelizationThreshold) { // TODO: tune this, arbitrary threshold
for (size_t i = 0; i < x_size; ++i) {
f(i);
}
} else {
concurrency::ThreadPool::TryBatchParallelFor(ttp, x_size, f, 0);
}
};

if (static_cast<int64_t>(offset_.size()) == stride &&
static_cast<int64_t>(scale_.size()) == stride) {
concurrency::ThreadPool::TrySimpleParallelFor(
ttp,
num_threads,
[this, num_threads, y_data, x_data, stride, x_size](ptrdiff_t batch_num) {
auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_threads, x_size);
for (auto i = work.start; i < work.end; ++i) {
y_data[i] = static_cast<float>((x_data[i] - offset_[i % stride]) * scale_[i % stride]);
}
});
auto fn = [this, y_data, x_data, stride](ptrdiff_t i) {
y_data[i] = static_cast<float>((x_data[i] - offset_[i % stride]) * scale_[i % stride]);
};
conditional_batch_call(fn);
} else if (offset_.size() == 1 && scale_.size() == 1) {
concurrency::ThreadPool::TrySimpleParallelFor(
ttp,
num_threads,
[this, num_threads, y_data, x_data, x_size](ptrdiff_t batch_num) {
auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_threads, x_size);
for (auto i = work.start; i < work.end; ++i) {
y_data[i] = static_cast<float>((x_data[i] - offset_[0]) * scale_[0]);
}
});
auto fn = [this, y_data, x_data](ptrdiff_t i) {
y_data[i] = static_cast<float>((x_data[i] - offset_[0]) * scale_[0]);
};
conditional_batch_call(fn);
} else {
std::ostringstream err_msg;
err_msg << "Either both scale and offset can be of feature size (" << stride << ") or 1";
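A design note on the Scaler change above: the previous code always went through TrySimpleParallelFor, with each batch carving out its slice via PartitionWork; the new code expresses the work per element, so the identical lambda serves both the serial branch and the TryBatchParallelFor branch. A standalone, compilable illustration (not the kernel itself) of the per-element formula that lambda computes, run through the serial path:

#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  // Per-feature scale/offset, as in the (offset_.size() == stride) branch of ScalerOp.
  std::vector<float> x{1.f, -2.f, 3.f, 4.f, 5.f, -6.f};
  std::vector<float> scale{3.f, -4.f, 3.f};
  std::vector<float> offset{4.8f, -0.5f, 77.f};
  const std::size_t stride = scale.size();
  std::vector<float> y(x.size());

  // The same functor would be handed to TryBatchParallelFor for inputs past the threshold.
  auto fn = [&](std::ptrdiff_t i) {
    y[i] = (x[i] - offset[i % stride]) * scale[i % stride];
  };
  for (std::size_t i = 0; i < x.size(); ++i) fn(static_cast<std::ptrdiff_t>(i));

  assert(y[0] == (1.f - 4.8f) * 3.f);
  return 0;
}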
89 changes: 58 additions & 31 deletions onnxruntime/core/providers/cpu/tensor/gather_elements.cc
@@ -25,6 +25,8 @@ ONNX_CPU_OPERATOR_KERNEL(
DataTypeImpl::GetTensorType<int64_t>()}),
GatherElements);

static constexpr int kParallelizationThreshold = 10 * 1000;

// Some helpers needed for GatherElements op -

// The following method computes the offset in the flattened array
@@ -87,7 +89,8 @@ static inline void increment_over_inner_dim(std::vector<int64_t>& current_dims,
}

template <typename Tin>
static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin index, int64_t axis, const TensorShape& input_shape) {
static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin index, int64_t axis,
const TensorShape& input_shape) {
int64_t retval = -1;
if (indices_data[index] < 0) {
retval = static_cast<int64_t>(indices_data[index] + input_shape[axis]);
@@ -105,7 +108,7 @@ static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin
#endif
template <bool is_string, typename T, typename Tin>
static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor,
Tensor* output_tensor, int64_t axis) {
Tensor* output_tensor, int64_t axis, concurrency::ThreadPool* ttp) {
// get pointer to input data
// optimizer will remove the redundant if/else block based on 'is_string' template parameter
const T* input_data = nullptr;
@@ -135,23 +138,37 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor,
auto num_elements = indices_tensor->Shape().Size();
int64_t lower_index_limit = -input_shape[axis];
int64_t upper_index_limit = input_shape[axis] - 1;
for (int64_t i = 0; i < num_elements; ++i) {

auto validation_fn = [indices_data, lower_index_limit, upper_index_limit](ptrdiff_t i) {
auto indices_val = indices_data[i];
if (indices_val < lower_index_limit || indices_val > upper_index_limit)
ORT_THROW("GatherElements op: Value in indices must be within bounds [",
lower_index_limit, " , ", upper_index_limit, "]. Actual value is ", indices_val);
};
for (int64_t i = 0; i < num_elements; ++i) { // TODO: parallelize this? didn't give any benefit in my tests
validation_fn(i);
}

int64_t num_inner_dim = calculate_num_inner_dim(indices_shape);
int64_t inner_dim_size = indices_shape[input_rank - 1];
bool processing_inner_dim = (axis == input_rank - 1) ? true : false;

int64_t base_offset = 0;
Tin indices_counter = -1;
int64_t output_counter = -1;
Tin indices_counter = 0;
size_t element_size = input_tensor->DataType()->Size();

std::vector<int64_t> process_dims(input_rank, 0);
int64_t output_counter = 0;

auto conditional_batch_call = [ttp, inner_dim_size](std::function<void(ptrdiff_t)> f) {
if (inner_dim_size < kParallelizationThreshold) { // TODO: tune this, arbitrary threshold
for (int64_t i = 0; i < inner_dim_size; ++i) {
f(i);
}
} else {
concurrency::ThreadPool::TryBatchParallelFor(ttp, inner_dim_size, f, 0);
}
};

if (!processing_inner_dim) {
while (num_inner_dim-- != 0) {
@@ -160,22 +177,27 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor,
// process 1 chunk of 'inner dimension' length
// optimizer will remove the redundant if/else block based on 'is_string' template parameter
if (is_string) {
for (int64_t i = 0; i < inner_dim_size; ++i) {
output_data[++output_counter] =
auto fn = [input_data, output_data, base_offset, input_shape_pitches,
indices_data, indices_counter, axis, input_shape, output_counter](ptrdiff_t i) {
output_data[i + output_counter] =
input_data[base_offset +
(GetNegativeIndexAdjustedValue<Tin>(indices_data, ++indices_counter, axis, input_shape) *
(GetNegativeIndexAdjustedValue<Tin>(indices_data, static_cast<Tin>(i) + indices_counter, axis, input_shape) *
input_shape_pitches[axis]) +
i];
}
};
conditional_batch_call(fn);
output_counter += inner_dim_size;
} else {
for (int64_t i = 0; i < inner_dim_size; ++i) {
// optimizer will remove the redundant if/else block based on 'is_string' template parameter
memcpy(output_data,
input_data + (base_offset + (GetNegativeIndexAdjustedValue<Tin>(indices_data, ++indices_counter, axis, input_shape) * input_shape_pitches[axis]) + i) * element_size,
auto fn = [input_data, output_data, base_offset, input_shape_pitches, element_size,
indices_data, indices_counter, axis, input_shape](ptrdiff_t i) {
memcpy(output_data + (i * element_size),
input_data + (base_offset + (GetNegativeIndexAdjustedValue<Tin>(indices_data, static_cast<Tin>(i) + indices_counter, axis, input_shape) * input_shape_pitches[axis]) + i) * element_size,
element_size);
output_data += element_size;
}
};
conditional_batch_call(fn);
output_data += inner_dim_size * element_size;
}
indices_counter += static_cast<Tin>(inner_dim_size);
increment_over_inner_dim(process_dims, indices_shape);
}
}
@@ -185,27 +207,31 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor,
base_offset = compute_base_offset(process_dims, input_shape_pitches, axis);

// process 1 chunk of 'inner dimension' length
// optimizer will remove the redundant if/else block based on 'is_string' template parameter
if (is_string) {
for (int64_t i = 0; i < inner_dim_size; ++i) {
auto fn = [input_data, output_data, base_offset,
indices_data, indices_counter, axis, input_shape, output_counter](ptrdiff_t i) {
// for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply)
output_data[++output_counter] =
output_data[i + output_counter] =
input_data[base_offset +
GetNegativeIndexAdjustedValue<Tin>(indices_data, ++indices_counter, axis, input_shape)];
}
GetNegativeIndexAdjustedValue<Tin>(indices_data, static_cast<Tin>(i) + indices_counter, axis, input_shape)];
};
conditional_batch_call(fn);
output_counter += inner_dim_size;
} else {
for (int64_t i = 0; i < inner_dim_size; ++i) {
// for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply)
// optimizer will remove the redundant if/else block based on 'is_string' template parameter
memcpy(output_data,
// for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply)
auto fn = [input_data, output_data, base_offset, element_size,
indices_data, indices_counter, axis, input_shape](ptrdiff_t i) {
memcpy(output_data + (i * element_size),
input_data + (base_offset +
GetNegativeIndexAdjustedValue<Tin>(indices_data, ++indices_counter, axis, input_shape)) *
GetNegativeIndexAdjustedValue<Tin>(indices_data, static_cast<Tin>(i) + indices_counter, axis, input_shape)) *
element_size,
element_size);
output_data += element_size;
}
};
conditional_batch_call(fn);
output_data += inner_dim_size * element_size;
}

indices_counter += static_cast<Tin>(inner_dim_size);
increment_over_inner_dim(process_dims, indices_shape);
}
}
@@ -272,16 +298,17 @@ Status GatherElements::Compute(OpKernelContext* context) const {
if (indices_shape.Size() == 0)
return Status::OK();

auto* ttp = context->GetOperatorThreadPool();
if (input_tensor->IsDataTypeString()) {
if (indices_tensor->IsDataType<int32_t>())
core_impl<true, std::string, int32_t>(input_tensor, indices_tensor, output_tensor, axis);
core_impl<true, std::string, int32_t>(input_tensor, indices_tensor, output_tensor, axis, ttp);
else
core_impl<true, std::string, int64_t>(input_tensor, indices_tensor, output_tensor, axis);
core_impl<true, std::string, int64_t>(input_tensor, indices_tensor, output_tensor, axis, ttp);
} else {
if (indices_tensor->IsDataType<int32_t>())
core_impl<false, int8_t, int32_t>(input_tensor, indices_tensor, output_tensor, axis);
core_impl<false, int8_t, int32_t>(input_tensor, indices_tensor, output_tensor, axis, ttp);
else
core_impl<false, int8_t, int64_t>(input_tensor, indices_tensor, output_tensor, axis);
core_impl<false, int8_t, int64_t>(input_tensor, indices_tensor, output_tensor, axis, ttp);
}

return Status::OK();
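A note on the GatherElements rewrite: once the inner-dimension loop becomes a functor that TryBatchParallelFor may run out of order, it can no longer pre-increment shared counters (++indices_counter, ++output_counter); every iteration has to derive its positions from its own index, and the counters advance by inner_dim_size only after the chunk completes. A hypothetical, self-contained sketch of that transformation (names are illustrative, not the kernel's):

#include <cstddef>
#include <cstdint>
#include <vector>

// Processes one inner-dimension chunk. The functor body is order-independent:
// it reads indices_counter/output_counter as fixed offsets instead of mutating them,
// so it is safe to schedule through a parallel-for as well as a plain loop.
void GatherOneChunk(const std::vector<float>& input, const std::vector<int64_t>& indices,
                    std::vector<float>& output,
                    int64_t base_offset, int64_t indices_counter, int64_t output_counter,
                    int64_t inner_dim_size) {
  auto fn = [&](std::ptrdiff_t i) {
    output[output_counter + i] = input[base_offset + indices[indices_counter + i]];
  };
  for (int64_t i = 0; i < inner_dim_size; ++i) fn(i);  // or the threshold-gated parallel call
  // The caller then advances indices_counter and output_counter by inner_dim_size,
  // exactly as core_impl now does after each chunk.
}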
39 changes: 36 additions & 3 deletions onnxruntime/test/providers/cpu/ml/scaler_test.cc
@@ -3,19 +3,29 @@

#include "gtest/gtest.h"
#include "test/providers/provider_test_utils.h"

using namespace std;
namespace onnxruntime {
namespace test {

template <typename T>
void TestScalar() {
void TestScalar(bool use_big_input = false) {
OpTester test("Scaler", 1, onnxruntime::kMLDomain);
vector<float> scale{3.f, -4.f, 3.0f};
vector<float> offset{4.8f, -0.5f, 77.0f};
test.AddAttribute("scale", scale);
test.AddAttribute("offset", offset);
vector<T> input{1, -2, 3, 4, 5, -6};
vector<int64_t> dims{2, 3};
vector<T> input;
vector<int64_t> dims;

if (!use_big_input) {
input = vector<T>{1, -2, 3, 4, 5, -6};
dims = {2, 3};
} else {
input.resize(15 * 1000); // must be >= kParallelizationThreshold in scaler.cc
std::iota(std::begin(input), std::end(input), static_cast<T>(1));
dims = {5000, 3};
}

// prepare expected output
vector<float> expected_output;
@@ -33,6 +43,7 @@ TEST(MLOpTest, ScalerOp) {
TestScalar<double>();
TestScalar<int64_t>();
TestScalar<int32_t>();
TestScalar<float>(true); // use big input
}

TEST(MLOpTest, ScalerOpScaleOffsetSize1) {
@@ -55,5 +66,27 @@ TEST(MLOpTest, ScalerOpScaleOffsetSize1) {
test.Run();
}

// tests invocation via TryBatchParallelFor for input of size 10K
TEST(MLOpTest, ScalerOpScaleOffsetSize1BigInput) {
OpTester test("Scaler", 1, onnxruntime::kMLDomain);
vector<float> scale{3.f};
vector<float> offset{4.8f};
test.AddAttribute("scale", scale);
test.AddAttribute("offset", offset);
vector<float> input(15 * 1000); // must be >= kParallelizationThreshold in scaler.cc
std::iota(std::begin(input), std::end(input), 1.0f);
vector<int64_t> dims{3, 5000};

// prepare expected output
vector<float> expected_output;
for (size_t i = 0; i < input.size(); ++i) {
expected_output.push_back((input[i] - offset[0]) * scale[0]);
}

test.AddInput<float>("X", dims, input);
test.AddOutput<float>("Y", dims, expected_output);
test.Run();
}

} // namespace test
} // namespace onnxruntime
18 changes: 18 additions & 0 deletions onnxruntime/test/providers/cpu/tensor/gather_elements_op_test.cc
@@ -315,5 +315,23 @@ TEST(GatherElementsOpTest, string) {
RunTypedTest<std::string>();
}

TEST(GatherElementsOpTest, BigIndices) {
// int32_t indices - axis 0
OpTester test1("GatherElements", 11);

test1.AddAttribute<int64_t>("axis", 0LL);
const int kNumIndices = 10 * 1000; // must be >= kParallelizationThreshold in gather_elements.cc
std::vector<float> input(2 * kNumIndices);
std::iota(std::begin(input), std::end(input), 0.f);
test1.AddInput<float>("data", {2, kNumIndices}, input);

std::vector<int32_t> indices(kNumIndices, 0);
std::vector<float> output(kNumIndices);
std::iota(std::begin(output), std::end(output), 0.f);
test1.AddInput<int32_t>("indices", {1, kNumIndices}, indices);
test1.AddOutput<float>("output", {1, kNumIndices}, output);
test1.Run();
}

} // namespace test
} // namespace onnxruntime