Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit number of sessions that are created by same client ip and same user for each graphd #3729

Merged
merged 27 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b85ff2c
add session_tool command to manage and analysis sessions
carrey-feng Jan 11, 2022
ac1a2a1
add session_tool command to manage and analysis sessions
carrey-feng Jan 11, 2022
011870d
Merge branch 'vesoft-inc:master' into master
carrey-feng Jan 14, 2022
7fd5318
add max_sessions_per_ip_per_user
carrey-feng Jan 14, 2022
5ef3245
odify judgment conditions
carrey-feng Jan 14, 2022
371196a
remove other pr content
carrey-feng Jan 14, 2022
fafed58
modify code
carrey-feng Mar 8, 2022
ee7f618
Merge branch 'master' into max_sessions_limit
Aiee Mar 8, 2022
d49d55c
fix the format reported
carrey-feng Mar 8, 2022
3293f20
fix the format reported
carrey-feng Mar 8, 2022
836cd8a
fix the format reported
carrey-feng Mar 9, 2022
8f19b8b
Merge branch 'master' into max_sessions_limit
Aiee Mar 10, 2022
875832d
fix the format for all files
carrey-feng Mar 10, 2022
1fd9348
Merge branch 'master' into max_sessions_limit
Aiee Mar 10, 2022
74575ad
change python test case
carrey-feng Mar 10, 2022
08646b5
Merge branch 'max_sessions_limit' of https://github.com/a516072575/ne…
carrey-feng Mar 10, 2022
cc82833
change python test case
carrey-feng Mar 10, 2022
3d32c94
Merge branch 'master' into max_sessions_limit
Aiee Mar 10, 2022
58fa208
change python test case
carrey-feng Mar 10, 2022
c6fc11e
Merge branch 'max_sessions_limit' of https://github.com/a516072575/ne…
carrey-feng Mar 10, 2022
8e0251f
delete spaces at last row of file
carrey-feng Mar 10, 2022
ea05902
Merge branch 'master' into max_sessions_limit
Aiee Mar 11, 2022
0822952
Merge branch 'master' into max_sessions_limit
CPWstatic Mar 14, 2022
fce2bcc
Merge branch 'master' into max_sessions_limit
Sophie-Xie Mar 14, 2022
616c06c
Fix Python Test Case session limit problem
carrey-feng Mar 15, 2022
fa40529
Merge branch 'master' into max_sessions_limit
Sophie-Xie Mar 15, 2022
e1015de
Merge branch 'master' into max_sessions_limit
Aiee Mar 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"session_idle_timeout_secs",
"session_reclaim_interval_secs",
"max_allowed_connections",
"disable_octal_escape_char"
"disable_octal_escape_char",
"max_sessions_per_ip_per_user"
],
"NESTED": [
"rocksdb_db_options",
Expand Down
20 changes: 20 additions & 0 deletions src/common/session/SessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@

namespace nebula {

class SessionCount {
private:
std::atomic<int32_t> count_ = 1;

public:
int fetch_add(int step) {
count_.fetch_add(step, std::memory_order_release);
return count_;
}
int fetch_sub(int step) {
count_.fetch_sub(step, std::memory_order_release);
return count_;
}
int get() {
return count_;
}
};

template <class SessionType>
class SessionManager {
public:
Expand Down Expand Up @@ -56,7 +74,9 @@ class SessionManager {

protected:
using SessionPtr = std::shared_ptr<SessionType>;
using SessionCountPtr = std::shared_ptr<SessionCount>;
folly::ConcurrentHashMap<SessionID, SessionPtr> activeSessions_;
folly::ConcurrentHashMap<std::string, SessionCountPtr> userIpSessionCount_;
std::unique_ptr<thread::GenericWorker> scavenger_;
meta::MetaClient* metaClient_{nullptr};
HostAddr myAddr_;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ DEFINE_bool(disable_octal_escape_char,
DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");

DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory");
DEFINE_int32(max_sessions_per_ip_per_user,
300,
"Maximum number of sessions that can be created per IP and per user");

// Sanity-checking Flag Values
static bool ValidateSessIdleTimeout(const char* flagname, int32_t value) {
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ DECLARE_bool(enable_authorize);
DECLARE_string(auth_type);
DECLARE_string(cloud_http_url);
DECLARE_uint32(max_allowed_statements);
DECLARE_int32(max_sessions_per_ip_per_user);

// Failed login attempt
// value of failed_login_attempts is in the range from 0 to 32767.
Expand Down
69 changes: 67 additions & 2 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::fin
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
std::string key = session.get_user_name() + session.get_client_ip();
bool addResp = addSessionCount(key);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}

// update the space info to sessionPtr
if (!spaceName.empty()) {
Expand All @@ -108,8 +113,22 @@ std::vector<meta::cpp2::Session> GraphSessionManager::getSessionFromLocalCache()

folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::createSession(
const std::string userName, const std::string clientIp, folly::Executor* runner) {
auto createCB = [this,
userName = userName](auto&& resp) -> StatusOr<std::shared_ptr<ClientSession>> {
// check the number of sessions per user per ip
std::string key = userName + clientIp;
auto maxSessions = FLAGS_max_sessions_per_ip_per_user;
auto uiscFindPtr = userIpSessionCount_.find(key);
if (uiscFindPtr != userIpSessionCount_.end() && maxSessions > 0 &&
uiscFindPtr->second.get()->get() > maxSessions - 1) {
return Status::Error(
"Create Session failed: Too many sessions created from %s by user %s. "
"the threshold is %d. You can change it by modifying '%s' in nebula-graphd.conf",
clientIp.c_str(),
userName.c_str(),
maxSessions,
"max_sessions_per_ip_per_user");
}
auto createCB = [this, userName = userName, clientIp = clientIp](
auto&& resp) -> StatusOr<std::shared_ptr<ClientSession>> {
if (!resp.ok()) {
LOG(ERROR) << "Create session failed:" << resp.status();
return Status::Error("Create session failed: %s", resp.status().toString().c_str());
Expand All @@ -127,6 +146,11 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::cre
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
std::string sessionKey = userName + clientIp;
bool addResp = addSessionCount(sessionKey);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}
updateSessionInfo(sessionPtr.get());
return sessionPtr;
}
Expand All @@ -153,7 +177,11 @@ void GraphSessionManager::removeSession(SessionID id) {
LOG(ERROR) << "Remove session `" << id << "' failed: " << resp.status();
return;
}
auto sessionCopy = iter->second->getSession();
std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip();
activeSessions_.erase(iter);
// delete session count from cache
subSessionCount(key);
}

void GraphSessionManager::threadFunc() {
Expand Down Expand Up @@ -195,10 +223,14 @@ void GraphSessionManager::reclaimExpiredSessions() {
// TODO: Handle cases where the delete client failed
LOG(ERROR) << "Remove session `" << iter->first << "' failed: " << resp.status();
}
auto sessionCopy = iter->second->getSession();
std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip();
iter = activeSessions_.erase(iter);
stats::StatsManager::decValue(kNumActiveSessions);
stats::StatsManager::addValue(kNumReclaimedExpiredSessions);
// TODO: Disconnect the connection of the session
// delete session count from cache
subSessionCount(key);
}
}

Expand Down Expand Up @@ -265,6 +297,7 @@ Status GraphSessionManager::init() {
if (!listSessionsRet.ok()) {
return Status::Error("Load sessions from meta failed.");
}
int64_t loadSessionCount = 0;
auto& sessions = *listSessionsRet.value().sessions_ref();
for (auto& session : sessions) {
if (session.get_graph_addr() != myAddr_) {
Expand All @@ -281,14 +314,46 @@ Status GraphSessionManager::init() {
continue;
}
session.queries_ref()->clear();
std::string key = session.get_user_name() + session.get_client_ip();
auto sessionPtr = ClientSession::create(std::move(session), metaClient_);
auto ret = activeSessions_.emplace(sessionId, sessionPtr);
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
bool addResp = addSessionCount(key);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}
updateSessionInfo(sessionPtr.get());
loadSessionCount++;
}
LOG(INFO) << "Total of " << loadSessionCount << " sessions are loaded";
return Status::OK();
}

bool GraphSessionManager::addSessionCount(std::string& key) {
auto countFindPtr = userIpSessionCount_.find(key);
if (countFindPtr != userIpSessionCount_.end()) {
countFindPtr->second.get()->fetch_add(1);
} else {
auto ret1 = userIpSessionCount_.emplace(key, std::make_shared<SessionCount>());
if (!ret1.second) {
return false;
}
}
return true;
}

bool GraphSessionManager::subSessionCount(std::string& key) {
auto countFindPtr = userIpSessionCount_.find(key);
if (countFindPtr == userIpSessionCount_.end()) {
return false;
}
auto count = countFindPtr->second.get()->fetch_sub(1);
if (count <= 0) {
userIpSessionCount_.erase(countFindPtr);
}
return true;
}
} // namespace graph
} // namespace nebula
6 changes: 6 additions & 0 deletions src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ class GraphSessionManager final : public SessionManager<ClientSession> {
// Updates session info locally.
// session: ClientSession which will be updated.
void updateSessionInfo(ClientSession* session);

// add sessionCount
bool addSessionCount(std::string& key);

// sub sessionCount
bool subSessionCount(std::string& key);
};

} // namespace graph
Expand Down
1 change: 1 addition & 0 deletions tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def test_configs(self):
['GRAPH', 'session_reclaim_interval_secs', 'int', 'MUTABLE', 2],
['GRAPH', 'max_allowed_connections', 'int', 'MUTABLE', 9223372036854775807],
['GRAPH', 'disable_octal_escape_char', 'bool', 'MUTABLE', False],
['GRAPH', 'max_sessions_per_ip_per_user', 'int', 'MUTABLE', 300],
]
self.check_out_of_order_result(resp, expected_result)

Expand Down
46 changes: 45 additions & 1 deletion tests/job/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ def prepare(self):
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('CREATE USER IF NOT EXISTS session_user1 WITH PASSWORD "123456"')
self.check_resp_succeeded(resp)

query = 'GRANT ROLE ADMIN ON nba TO session_user1'
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('CREATE USER IF NOT EXISTS session_user2 WITH PASSWORD "123456"')
self.check_resp_succeeded(resp)

query = 'GRANT ROLE ADMIN ON nba TO session_user2'
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('SHOW HOSTS GRAPH')
self.check_resp_succeeded(resp)
assert not resp.is_empty()
Expand All @@ -53,7 +67,10 @@ def cleanup(self):
session = self.client_pool.get_session('root', 'nebula')
resp = session.execute('DROP USER session_user')
self.check_resp_succeeded(resp)

resp = session.execute('DROP USER session_user1')
self.check_resp_succeeded(resp)
resp = session.execute('DROP USER session_user2')
self.check_resp_succeeded(resp)
def test_sessions(self):
# 1: test add session with right username
try:
Expand Down Expand Up @@ -225,3 +242,30 @@ def test_signout_and_execute(self):
resp = conn.execute(session_id, 'SHOW HOSTS')
assert resp.error_code == ttypes.ErrorCode.E_SESSION_INVALID, resp.error_msg
assert resp.error_msg.find(b'Session not existed!') > 0

def test_out_of_max_sessions_per_ip_per_user(self):
resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = 1')
self.check_resp_succeeded(resp)
time.sleep(3)

flag = True
# get new session failed for user session_user1
try:
self.client_pool.get_session("session_user1", "123456")
self.check_resp_succeeded(resp)
self.client_pool.get_session("session_user1", "123456")
flag = False
except Exception as e:
flag = True

# get new session success from session_user2
try:
self.client_pool.get_session("session_user2", "123456")
self.check_resp_succeeded(resp)
assert True and flag
except Exception as e:
assert False, e.message

resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = 300')
self.check_resp_succeeded(resp)
time.sleep(3)