Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions doc/admin-guide/files/records.yaml.en.rst
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,25 @@ Network
between `proxy.config.net.max_connections_in` and `proxy.config.net.max_requests_in`
is the amount of maximum idle (keepalive) connections |TS| will maintain.

.. ts:cv:: CONFIG proxy.config.net.per_client.max_connections_in INT 0
:reloadable:

The total number of concurrent client connections that |TS| will accept from
a given client IP address. Any received connections from a client beyond this
limit will be immediately closed. Once the number of concurrent client
connections drops below this configured value, |TS| will begin accepting new
connections from that IP while the number of concurrent connections remains
below this limit. A value of 0 disables the per client concurrent connection
limit.

.. ts:cv:: CONFIG proxy.config.http.per_client.connection.alert_delay INT 60
:reloadable:
:units: seconds

Throttle alerts per client IP address to be no more often than this many
seconds. Summary data is provided per alert to allow log scrubbing to
generate accurate data.

.. ts:cv:: CONFIG proxy.config.net.max_requests_in INT 0

The total number of concurrent requests or active client connections
Expand Down
150 changes: 70 additions & 80 deletions include/iocore/net/ConnectionTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
#include <string_view>
#include <chrono>
#include <atomic>
#include <memory>
#include <mutex>
#include <sstream>
#include <tuple>
#include <mutex>
#include "records/RecCore.h"
#include "tscore/ink_platform.h"
#include "tscore/ink_config.h"
Expand All @@ -44,10 +45,12 @@
#include "iocore/net/SessionSharingAPIEnums.h"

/**
* Singleton class to keep track of the number of outbound connections.
* Singleton class to keep track of the number of inbound and outbound connections.
*
* Outbound connections are divided in to equivalence classes (called "groups" here) based on the
* session matching setting. Tracking data is stored for each group.
* Outbound connections are divided into equivalence classes called "groups"
* here. For outbound connections, groups will vary based on the session
* matching configuration. For inbound connections, a group is always based on
* the remote IP address. Tracking data is stored for each group.
*/
class ConnectionTracker
{
Expand Down Expand Up @@ -78,12 +81,14 @@ class ConnectionTracker

/** Static configuration values. */
struct GlobalConfig {
std::chrono::seconds client_alert_delay{60}; ///< Alert delay in seconds.
std::chrono::seconds server_alert_delay{60}; ///< Alert delay in seconds.
};

// The names of the configuration values.
// Unfortunately these are not used in RecordsConfig.cc so that must be made consistent by hand.
// Note: These need to be @c constexpr or there are static initialization ordering risks.
static constexpr std::string_view CONFIG_CLIENT_VAR_ALERT_DELAY{"proxy.config.http.per_client.connection.alert_delay"};
static constexpr std::string_view CONFIG_SERVER_VAR_MAX{"proxy.config.http.per_server.connection.max"};
static constexpr std::string_view CONFIG_SERVER_VAR_MIN{"proxy.config.http.per_server.connection.min"};
static constexpr std::string_view CONFIG_SERVER_VAR_MATCH{"proxy.config.http.per_server.connection.match"};
Expand All @@ -108,49 +113,54 @@ class ConnectionTracker
MatchType const &_match_type; ///< Type of matching.
};

IpEndpoint _addr; ///< Remote IP address.
CryptoHash _hash; ///< Hash of the FQDN.
MatchType _match_type; ///< Type of matching.
std::string _fqdn; ///< Expanded FQDN, set if matching on FQDN.
int _min_keep_alive_conns; /// < Min keep alive conns on this server group
Key _key; ///< Pre-assembled key which references the following members.
enum class DirectionType { INBOUND, OUTBOUND };

DirectionType _direction; ///< Whether the group is for inbound or outbound connections.
IpEndpoint _addr; ///< Remote IP address.
CryptoHash _hash; ///< Hash of the FQDN.
MatchType _match_type{MATCH_IP}; ///< Type of matching.
std::string _fqdn; ///< Expanded FQDN, set if matching on FQDN.
int _min_keep_alive_conns{0}; /// < Min keep alive conns on this server group
Key _key; ///< Pre-assembled key which references the following members.
std::chrono::seconds const &_alert_delay; ///< A reference to client or server alert_delay depending upon connection direction.

// Counting data.
std::atomic<int> _count{0}; ///< Number of outbound connections.
std::atomic<int> _count{0}; ///< Number of inbound or outbound connections.
std::atomic<int> _count_max{0}; ///< largest observed @a count value.
std::atomic<int> _blocked{0}; ///< Number of connections blocked since last alert.
std::atomic<int> _in_queue{0}; ///< # of connections queued, waiting for a connection.
std::atomic<Ticker> _last_alert{0}; ///< Absolute time of the last alert.

// Links for intrusive container.
Group *_next{nullptr};
Group *_prev{nullptr};

/** Constructor.
* Construct from @c Key because the use cases do a table lookup first so the @c Key is already constructed.
* @param key A populated @c Key structure - values are copied to the @c Group.
* @param fqdn The full FQDN.
* @param min_keep_alive The minimum number of origin keep alive connections to maintain.
*/
Group(Key const &key, std::string_view fqdn, int min_keep_alive);
Group(DirectionType direction, Key const &key, std::string_view fqdn, int min_keep_alive);
~Group();
/// Key equality checker.
static bool equal(Key const &lhs, Key const &rhs);
/// Hashing function.
static uint64_t hash(Key const &);
static size_t hash(Key const &);
/// Check and clear alert enable.
/// This is a modifying call - internal state will be updated to prevent too frequent alerts.
/// @param lat The last alert time, in epoch seconds, if the method returns @c true.
/// @return @c true if an alert should be generated, @c false otherwise.
bool should_alert(std::time_t *lat = nullptr);
/// Time of the last alert in epoch seconds.
std::time_t get_last_alert_epoch_time() const;

/// Release the reference count to this group and remove it from the
/// group table if it is no longer referenced.
void release();
};

/// Container for per transaction state and operations.
struct TxnState {
Group *_g{nullptr}; ///< Active group for this transaction.
bool _reserved_p{false}; ///< Set if a connection slot has been reserved.
bool _queued_p{false}; ///< Set if the connection is delayed / queued.
std::shared_ptr<Group> _g; ///< Active group for this transaction.
bool _reserved_p{false}; ///< Set if a connection slot has been reserved.
bool _queued_p{false}; ///< Set if the connection is delayed / queued.

/// Check if tracking is active.
bool is_active();
Expand All @@ -167,9 +177,9 @@ class ConnectionTracker
void blocked();
/// Clear all reservations.
void clear();
/// Drop the reservation - assume it will be cleaned up elsewhere.
/// Transfer ownership of the group outside of this state.
/// @return The group for this reservation.
Group *drop();
std::shared_ptr<Group> drop();
/// Update the maximum observed count if needed against @a count.
void update_max_count(int count);

Expand All @@ -192,6 +202,12 @@ class ConnectionTracker
void Warn_Blocked(int max_connections, int64_t id, int count, const sockaddr *addr, const char *debug_tag = nullptr);
};

/** Get or create the @c Group for the specified inbound session properties.
* @param addr The IP address of the client.
* @return A @c Group for the arguments, existing if possible and created if not.
*/
static TxnState obtain_inbound(IpEndpoint const &addr);

/** Get or create the @c Group for the specified outbound session properties.
* @param txn_cnf The transaction local configuration.
* @param fqdn The fully qualified domain name of the upstream.
Expand All @@ -200,17 +216,17 @@ class ConnectionTracker
*/
static TxnState obtain_outbound(TxnConfig const &txn_cnf, std::string_view fqdn, IpEndpoint const &addr);

/** Get the currently existing groups.
/** Get the currently existing outbound groups.
* @param [out] groups parameter - pointers to the groups are pushed in to this container.
*
* The groups are loaded in to @a groups, which is cleared before loading. Note the groups returned will remain valid
* although data inside the groups is volatile.
*/
static void get(std::vector<Group const *> &groups);
/** Write the connection tracking data to JSON.
static void get_outbound_groups(std::vector<std::shared_ptr<Group const>> &groups);
/** Write the outbound connection tracking data to JSON.
* @return string containing a JSON encoding of the table.
*/
static std::string to_json_string();
static std::string outbound_to_json_string();
/** Write the groups to @a f.
* @param f Output file.
*/
Expand Down Expand Up @@ -253,55 +269,47 @@ class ConnectionTracker
protected:
static GlobalConfig *_global_config; ///< Global configuration data.

/// Types and methods for the hash table.
struct Linkage {
/// Provide std::unordered_map compatible hash and equality functions for @c Group.
struct GroupMapHelper {
using key_type = Group::Key const &;
using value_type = Group;

static value_type *&next_ptr(value_type *value);
static value_type *&prev_ptr(value_type *value);

static uint64_t hash_of(key_type key);

static key_type key_of(value_type *v);
/// Return the hash of @a key.
size_t operator()(key_type &key) const;

static bool equal(key_type lhs, key_type rhs);
/// Compare @a lhs and @a rhs for equality.
bool operator()(key_type &lhs, key_type &rhs) const;
};

/// Internal implementation class instance.
struct Imp {
swoc::IntrusiveHashMap<Linkage> _table; ///< Hash table of upstream groups.
std::mutex _mutex; ///< Lock for insert & find.
struct TableSingleton {
friend ConnectionTracker::Group;
std::unordered_map<Group::Key, std::shared_ptr<Group>, GroupMapHelper, GroupMapHelper>
_table; ///< Hash table of connection groups.
std::mutex _mutex; ///< Lock for insert, delete, and find.
};
static Imp _imp;
static TableSingleton _inbound_table;
static TableSingleton _outbound_table;

/// Get the implementation instance.
/// @note This is done purely to allow subclasses to reuse methods in this class.
Imp &instance();
TableSingleton &inbound_instance();
TableSingleton &outbound_instance();
};

inline ConnectionTracker::Imp &
ConnectionTracker::instance()
inline ConnectionTracker::TableSingleton &
ConnectionTracker::inbound_instance()
{
return _imp;
return _inbound_table;
}

inline ConnectionTracker::Group::Group(Key const &key, std::string_view fqdn, int min_keep_alive)
: _hash(key._hash), _match_type(key._match_type), _min_keep_alive_conns(min_keep_alive), _key{_addr, _hash, _match_type}
inline ConnectionTracker::TableSingleton &
ConnectionTracker::outbound_instance()
{
// store the host name if relevant.
if (MATCH_HOST == _match_type || MATCH_BOTH == _match_type) {
_fqdn.assign(fqdn);
}
// store the IP address if relevant.
if (MATCH_HOST == _match_type) {
_addr.setToAnyAddr(AF_INET);
} else {
ats_ip_copy(_addr, key._addr);
}
return _outbound_table;
}

inline uint64_t
inline size_t
ConnectionTracker::Group::hash(const Key &key)
{
switch (key._match_type) {
Expand Down Expand Up @@ -340,11 +348,11 @@ ConnectionTracker::TxnState::release()
}
}

inline ConnectionTracker::Group *
inline std::shared_ptr<ConnectionTracker::Group>
ConnectionTracker::TxnState::drop()
{
_reserved_p = false;
return _g;
return std::move(_g);
}

inline int
Expand Down Expand Up @@ -388,33 +396,15 @@ ConnectionTracker::TxnState::blocked()
++_g->_blocked;
}

/* === Linkage === */
inline auto
ConnectionTracker::Linkage::next_ptr(value_type *value) -> value_type *&
{
return value->_next;
}

inline auto
ConnectionTracker::Linkage::prev_ptr(value_type *value) -> value_type *&
{
return value->_prev;
}

inline uint64_t
ConnectionTracker::Linkage::hash_of(key_type key)
/* === GroupMapHelper === */
inline size_t
ConnectionTracker::GroupMapHelper::operator()(key_type &key) const
{
return Group::hash(key);
}

inline auto
ConnectionTracker::Linkage::key_of(value_type *value) -> key_type
{
return value->_key;
}

inline bool
ConnectionTracker::Linkage::equal(key_type lhs, key_type rhs)
ConnectionTracker::GroupMapHelper::operator()(key_type &lhs, key_type &rhs) const
{
return Group::equal(lhs, rhs);
}
Expand Down
2 changes: 2 additions & 0 deletions include/iocore/net/NetHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler
bool add_to_active_queue(NetEvent *ne);
void remove_from_active_queue(NetEvent *ne);
static int get_additional_accepts();
static int get_per_client_max_connections_in();

/// Per process initialization logic.
static void init_for_process();
Expand Down Expand Up @@ -236,6 +237,7 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler
// accept threads are not always on a standard NET thread with a NetHandler
// that has TS_EVENT_MGMT_UPDATE handling logic.
static std::atomic<uint32_t> additional_accepts;
static std::atomic<uint32_t> per_client_max_connections_in;

void _close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count);

Expand Down
23 changes: 10 additions & 13 deletions include/proxy/PoolableSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#pragma once

#include <memory>

#include "iocore/net/ConnectionTracker.h"
#include "proxy/ProxySession.h"

Expand Down Expand Up @@ -75,7 +77,7 @@ class PoolableSession : public ProxySession
TSServerSessionSharingMatchMask sharing_match = TS_SERVER_SESSION_SHARING_MATCH_MASK_NONE;
TSServerSessionSharingPoolType sharing_pool = TS_SERVER_SESSION_SHARING_POOL_GLOBAL;

void enable_outbound_connection_tracking(ConnectionTracker::Group *group);
void enable_outbound_connection_tracking(std::shared_ptr<ConnectionTracker::Group> group);
void release_outbound_connection_tracking();

void attach_hostname(const char *hostname);
Expand All @@ -90,7 +92,7 @@ class PoolableSession : public ProxySession

// Keep track of connection limiting and a pointer to the
// singleton that keeps track of the connection counts.
ConnectionTracker::Group *conn_track_group = nullptr;
std::shared_ptr<ConnectionTracker::Group> conn_track_group;

virtual IOBufferReader *get_remote_reader() = 0;

Expand Down Expand Up @@ -210,33 +212,28 @@ PoolableSession::FQDNLinkage::equal(CryptoHash const &lhs, CryptoHash const &rhs
}

inline void
PoolableSession::enable_outbound_connection_tracking(ConnectionTracker::Group *group)
PoolableSession::enable_outbound_connection_tracking(std::shared_ptr<ConnectionTracker::Group> group)
{
ink_assert(nullptr == conn_track_group);
conn_track_group = group;
conn_track_group = std::move(group);
}

inline void
PoolableSession::release_outbound_connection_tracking()
{
// Update upstream connection tracking data if present.
if (conn_track_group) {
if (conn_track_group->_count >= 0) {
(conn_track_group->_count)--;
conn_track_group = nullptr;
} else {
// A bit dubious, as there's no guarantee it's still negative, but even that would be interesting to know.
Error("[http_ss] [%" PRId64 "] number of connections should be greater than or equal to zero: %u", con_id,
conn_track_group->_count.load());
}
conn_track_group->release();
conn_track_group.reset();
}
}

inline void
PoolableSession::attach_hostname(const char *hostname)
{
if (hostname_hash.is_zero()) {
CryptoContext().hash_immediate(hostname_hash, (unsigned char *)hostname, strlen(hostname));
CryptoContext().hash_immediate(hostname_hash, static_cast<const unsigned char *>(static_cast<const void *>(hostname)),
strlen(hostname));
}
}

Expand Down
Loading