Skip to content

Commit

Permalink
Merge branch 'master' into fix-gc-parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Mar 14, 2022
2 parents ad440a1 + 2a94905 commit ec1b11f
Show file tree
Hide file tree
Showing 98 changed files with 2,738 additions and 780 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Columns/ColumnDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecima
const T & getElement(size_t n) const { return data[n]; }
T & getElement(size_t n) { return data[n]; }

UInt32 getScale() const { return scale; }

protected:
Container data;
UInt32 scale;
Expand Down
36 changes: 31 additions & 5 deletions dbms/src/Common/Decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ using ScaleType = UInt32;
constexpr PrecType decimal_max_prec = 65;
constexpr ScaleType decimal_max_scale = 30;

// IntPrec indicates the max precision of different integer types.
// For now, the binary arithmetic functions use it to calculate result precision.
// And cast function use it to do some optimizations, such as skipping overflow check.
// But in TiDB the signed types will plus 1, for example IntPrec<int8_t>::prec is 4.
// This is a little confusing because we will add 1 when return result to client.
// Here we make sure TiFlash code is clean and will fix TiDB later.
template <typename T>
struct IntPrec
{
};
template <>
struct IntPrec<int8_t>
{
static constexpr PrecType prec = 4;
static constexpr PrecType prec = 3;
};
template <>
struct IntPrec<uint8_t>
Expand All @@ -37,7 +43,7 @@ struct IntPrec<uint8_t>
template <>
struct IntPrec<int16_t>
{
static constexpr PrecType prec = 6;
static constexpr PrecType prec = 5;
};
template <>
struct IntPrec<uint16_t>
Expand All @@ -47,7 +53,7 @@ struct IntPrec<uint16_t>
template <>
struct IntPrec<int32_t>
{
static constexpr PrecType prec = 11;
static constexpr PrecType prec = 10;
};
template <>
struct IntPrec<uint32_t>
Expand All @@ -57,14 +63,26 @@ struct IntPrec<uint32_t>
template <>
struct IntPrec<int64_t>
{
static constexpr PrecType prec = 20;
static constexpr PrecType prec = 19;
};
template <>
struct IntPrec<uint64_t>
{
static constexpr PrecType prec = 20;
};

template <>
struct IntPrec<Int128>
{
static constexpr PrecType prec = 39;
};

template <>
struct IntPrec<Int256>
{
static constexpr PrecType prec = 78;
};

// 1) If the declared type of both operands of a dyadic arithmetic operator is exact numeric, then the declared
// type of the result is an implementation-defined exact numeric type, with precision and scale determined as
// follows:
Expand Down Expand Up @@ -359,6 +377,8 @@ class DecimalMaxValue final : public ext::Singleton<DecimalMaxValue>
public:
static Int256 get(PrecType idx)
{
// In case DecimalMaxValue::get(IntPrec<Int256>::prec), where IntPrec<Int256>::prec > 65.
assert(idx <= decimal_max_prec);
return instance().getInternal(idx);
}

Expand All @@ -384,12 +404,18 @@ class DecimalMaxValue final : public ext::Singleton<DecimalMaxValue>

// In some case, getScaleMultiplier and its callee may not be auto inline by the compiler.
// This may hurt performance. __attribute__((flatten)) tells compliler to inline the callee of this function.
template <typename T>
template <typename T, std::enable_if_t<IsDecimal<T>> * = nullptr>
__attribute__((flatten)) inline typename T::NativeType getScaleMultiplier(ScaleType scale)
{
return static_cast<typename T::NativeType>(DecimalMaxValue::get(scale) + 1);
}

template <typename T, std::enable_if_t<is_integer_v<T>> * = nullptr>
__attribute__((flatten)) inline T getScaleMultiplier(ScaleType scale)
{
return static_cast<T>(DecimalMaxValue::get(scale) + 1);
}

template <typename T>
inline void checkDecimalOverflow(Decimal<T> v, PrecType prec)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/DynamicThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task)
t.detach();
}

void executeTask(const std::unique_ptr<IExecutableTask> & task)
void DynamicThreadPool::executeTask(TaskPtr & task)
{
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool);
task->execute();
task.reset();
}

void DynamicThreadPool::fixedWork(size_t index)
Expand Down Expand Up @@ -124,7 +125,6 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task)
if (!node.task) // may be timeout or cancelled
break;
executeTask(node.task);
node.task.reset();
}
alive_dynamic_threads.fetch_sub(1);
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/DynamicThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class DynamicThreadPool
void fixedWork(size_t index);
void dynamicWork(TaskPtr initial_task);

static void executeTask(TaskPtr & task);

const std::chrono::nanoseconds dynamic_auto_shrink_cooldown;

std::vector<std::thread> fixed_threads;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ class MPMCQueue
{
}

~MPMCQueue()
{
std::unique_lock lock(mu);
for (; read_pos < write_pos; ++read_pos)
destruct(getObj(read_pos));
}

/// Block util:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ namespace DB
F(type_max_threads_of_establish_mpp, {"type", "rpc_establish_mpp_max"}), \
F(type_active_threads_of_establish_mpp, {"type", "rpc_establish_mpp"}), \
F(type_max_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp_max"}), \
F(type_active_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp"}))
F(type_active_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp"}), \
F(type_active_rpc_async_worker, {"type", "active_rpc_async_worker"}), \
F(type_total_rpc_async_worker, {"type", "total_rpc_async_worker"}))
// clang-format on

struct ExpBuckets
Expand Down
43 changes: 43 additions & 0 deletions dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace DB::tests
{
namespace
{
class DynamicThreadPoolTest : public ::testing::Test
{
};
Expand Down Expand Up @@ -148,4 +150,45 @@ try
}
CATCH

struct X
{
std::mutex * mu;
std::condition_variable * cv;
bool * destructed;

X(std::mutex * mu_, std::condition_variable * cv_, bool * destructed_)
: mu(mu_)
, cv(cv_)
, destructed(destructed_)
{}

~X()
{
std::unique_lock lock(*mu);
*destructed = true;
cv->notify_all();
}
};

TEST_F(DynamicThreadPoolTest, testTaskDestruct)
try
{
std::mutex mu;
std::condition_variable cv;
bool destructed = false;

DynamicThreadPool pool(0, std::chrono::minutes(1));
auto tmp = std::make_shared<X>(&mu, &cv, &destructed);
pool.schedule(true, [x = tmp] {});
tmp.reset();

{
std::unique_lock lock(mu);
auto ret = cv.wait_for(lock, std::chrono::seconds(1), [&] { return destructed; });
ASSERT_TRUE(ret);
}
}
CATCH

} // namespace
} // namespace DB::tests
61 changes: 49 additions & 12 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include <thread>
#include <vector>

namespace DB
namespace DB::tests
{
namespace tests
namespace
{
class TestMPMCQueue : public ::testing::Test
class MPMCQueueTest : public ::testing::Test
{
protected:
std::random_device rd;
Expand Down Expand Up @@ -489,28 +489,28 @@ class TestMPMCQueue : public ::testing::Test
};

template <>
struct TestMPMCQueue::ValueHelper<int>
struct MPMCQueueTest::ValueHelper<int>
{
static int make(int v) { return v; }
static int extract(int v) { return v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::unique_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::unique_ptr<int>>
{
static std::unique_ptr<int> make(int v) { return std::make_unique<int>(v); }
static int extract(std::unique_ptr<int> & v) { return *v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::shared_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::shared_ptr<int>>
{
static std::shared_ptr<int> make(int v) { return std::make_shared<int>(v); }
static int extract(std::shared_ptr<int> & v) { return *v; }
};

#define ADD_TEST_FOR(type_name, type, test_name, ...) \
TEST_F(TestMPMCQueue, type_name##_##test_name) \
TEST_F(MPMCQueueTest, type_name##_##test_name) \
try \
{ \
test##test_name<type>(__VA_ARGS__); \
Expand All @@ -533,7 +533,7 @@ ADD_TEST(CancelEmpty, 4, 4);
ADD_TEST(CancelConcurrentPop, 4);
ADD_TEST(CancelConcurrentPush, 4);

TEST_F(TestMPMCQueue, ExceptionSafe)
TEST_F(MPMCQueueTest, ExceptionSafe)
try
{
MPMCQueue<ThrowInjectable> queue(10);
Expand Down Expand Up @@ -590,8 +590,8 @@ try
}
CATCH


TEST_F(TestMPMCQueue, isNextOpNonBlocking)
TEST_F(MPMCQueueTest, isNextOpNonBlocking)
try
{
MPMCQueue<int> q(2);
ASSERT_TRUE(q.isNextPushNonBlocking());
Expand Down Expand Up @@ -621,6 +621,43 @@ TEST_F(TestMPMCQueue, isNextOpNonBlocking)
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
}
CATCH

struct Counter
{
static int count;
Counter()
{
++count;
}

~Counter()
{
--count;
}
};
int Counter::count = 0;

TEST_F(MPMCQueueTest, objectsDestructed)
try
{
{
MPMCQueue<Counter> queue(100);
queue.emplace();
ASSERT_EQ(Counter::count, 1);

{
Counter cnt;
queue.pop(cnt);
}
ASSERT_EQ(Counter::count, 0);

queue.emplace();
ASSERT_EQ(Counter::count, 1);
}
ASSERT_EQ(Counter::count, 0);
}
CATCH

} // namespace tests
} // namespace DB
} // namespace
} // namespace DB::tests
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);
regSchemalessFunc("create_tidb_tables", MockTiDBTable::dbgFuncCreateTiDBTables);

regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,19 @@ TableID MockTiDB::newTable(
return addTable(database_name, std::move(*table_info));
}

int MockTiDB::newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type)
{
for (const auto & [table_name, columns, handle_pk_name] : tables)
{
newTable(database_name, table_name, columns, tso, handle_pk_name, engine_type);
}
return 0;
}

TableID MockTiDB::addTable(const String & database_name, TiDB::TableInfo && table_info)
{
auto table = std::make_shared<Table>(database_name, databases[database_name], table_info.name, std::move(table_info));
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & handle_pk_name,
const String & engine_type);

int newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type);

TableID addTable(const String & database_name, TiDB::TableInfo && table_info);

static TiDB::TableInfoPtr parseColumns(
Expand Down
Loading

0 comments on commit ec1b11f

Please sign in to comment.