Commit cd053b9e authored by Patrick Reh

client: add an option to limit concurrent connection attempts

This was added specifically to counter problems with RDMA over Omni-Path. It turned out that
rdma_resolve_route() sometimes fails when a massive number of connections is opened
at the same time (in our case the problem starts when opening several hundred connections at
once, for example when using a benchmarking tool). The failure is nondeterministic, so it doesn't
happen every time.

The introduced option limits the number of parallel connection attempts so that this no longer
happens. There is no recommended value, since it's hard to determine where it stops working.
A value of 10 to 100 seems to work well, though; the connection process even seems to be faster.

(cherry picked from commit 7ab3302c4447d1110ab30dc1dbf178e5855212f4)
parent 2e32a79f
......@@ -32,6 +32,7 @@ connCommRetrySecs = 600
connFallbackExpirationSecs = 900
connInterfacesFile =
connMaxInternodeNum = 12
connMaxConcurrentAttempts = 0
connNetFilterFile =
connUseRDMA = true
......@@ -166,6 +167,16 @@ sysACLsEnabled = false
# The maximum number of simultaneous connections to the same node.
# Default: 12
# [connMaxConcurrentAttempts]
# The maximum number of simultaneous connection attempts. Limiting this may
# help if establishing new connections keeps failing and results in fallbacks.
# This can happen particularly when using RDMA in an Omni-Path setup. Even if
# you don't have failing connection attempts, tuning this option might still
# lead to a faster connection process. This option is experimental, so there
# are no recommended values yet. Setting it to 0 disables the limit, which
# means concurrent connection attempts are not limited.
# Default: 0
# [connNetFilterFile]
# The path to a text file that specifies allowed IP subnets, which may be used
# for outgoing communication. One subnet per line in classless notation (IP
......
......@@ -204,6 +204,7 @@ void _Config_loadDefaults(Config* this)
_Config_configMapRedefine(this, "connRDMABufNum", "70");
_Config_configMapRedefine(this, "connRDMATypeOfService", "0");
_Config_configMapRedefine(this, "connNetFilterFile", "");
_Config_configMapRedefine(this, "connMaxConcurrentAttempts", "0");
_Config_configMapRedefine(this, "connAuthFile", "");
_Config_configMapRedefine(this, "connRecvNonIntrTimeoutMS", "5000");
_Config_configMapRedefine(this, "connTcpOnlyFilterFile", "");
......@@ -366,6 +367,11 @@ bool _Config_applyConfigMap(Config* this, bool enableException)
this->connNetFilterFile = StringTk_strDup(valueStr);
}
else
if (!strcmp(keyStr, "connMaxConcurrentAttempts"))
{
this->connMaxConcurrentAttempts = StringTk_strToUInt(valueStr);
}
else
if(!strcmp(keyStr, "connAuthFile") )
{
SAFE_KFREE(this->connAuthFile);
......
......@@ -84,6 +84,7 @@ static inline unsigned Config_getConnRDMABufNum(Config* this);
static inline int Config_getConnRDMATypeOfService(Config* this);
static inline char* Config_getConnNetFilterFile(Config* this);
static inline char* Config_getConnAuthFile(Config* this);
static inline unsigned Config_getConnMaxConcurrentAttempts(Config* this);
static inline uint64_t Config_getConnAuthHash(Config* this);
static inline unsigned Config_getConnRecvNonIntrTimeoutMS(Config* this);
static inline char* Config_getConnTcpOnlyFilterFile(Config* this);
......@@ -185,6 +186,7 @@ struct Config
unsigned connRDMABufNum;
int connRDMATypeOfService;
char* connNetFilterFile; // allowed IP addresses (all IPs allowed, if empty)
unsigned connMaxConcurrentAttempts;
char* connAuthFile;
uint64_t connAuthHash; // implicitly set based on hash of connAuthFile contents
unsigned connRecvNonIntrTimeoutMS; // timeout before allowing interuptions, e.g. SIGINT
......@@ -378,6 +380,11 @@ char* Config_getConnAuthFile(Config* this)
return this->connAuthFile;
}
/**
 * Getter for the connMaxConcurrentAttempts config value.
 *
 * @return the stored limit for simultaneous connection attempts; per the option's
 * description in the config file, 0 means that attempts are not limited.
 */
unsigned Config_getConnMaxConcurrentAttempts(Config* this)
{
const unsigned maxAttempts = this->connMaxConcurrentAttempts;

return maxAttempts;
}
uint64_t Config_getConnAuthHash(Config* this)
{
return this->connAuthHash;
......
......@@ -39,6 +39,9 @@ void NodeConnPool_init(NodeConnPool* this, struct App* app, struct Node* parentN
this->maxConns = Config_getConnMaxInternodeNum(cfg);
this->fallbackExpirationSecs = Config_getConnFallbackExpirationSecs(cfg);
this->maxConcurrentAttempts = Config_getConnMaxConcurrentAttempts(cfg);
sema_init(&this->connSemaphore, this->maxConcurrentAttempts);
this->parentNode = parentNode;
this->streamPort = streamPort;
......@@ -191,6 +194,12 @@ Socket* NodeConnPool_acquireStreamSocketEx(NodeConnPool* this, bool allowWaiting
Mutex_unlock(&this->mutex); // U N L O C K
if (this->maxConcurrentAttempts > 0)
{
if (down_interruptible(&this->connSemaphore))
return NULL;
}
// walk over all available NICs, create the corresponding socket and try to connect
NicAddressListIter_init(&nicIter, &nicListCopy);
......@@ -314,6 +323,12 @@ Socket* NodeConnPool_acquireStreamSocketEx(NodeConnPool* this, bool allowWaiting
kfree(endpointStr);
}
if (this->maxConcurrentAttempts > 0)
{
up(&this->connSemaphore);
}
Mutex_lock(&this->mutex); // L O C K
if(!NicAddressListIter_end(&nicIter) )
......
......@@ -117,7 +117,8 @@ struct NodeConnPool
unsigned establishedConns; // not equal to connList.size!!
unsigned maxConns;
unsigned fallbackExpirationSecs; // expiration time for conns to fallback interfaces
unsigned maxConcurrentAttempts;
NodeConnPoolStats stats;
NodeConnPoolErrorState errState;
......@@ -125,6 +126,7 @@ struct NodeConnPool
Mutex mutex;
Condition changeCond;
struct semaphore connSemaphore;
};
......
......@@ -166,6 +166,7 @@ void Config::applyConfigMap(bool enableException, bool addDashes) throw (Invalid
if(testConfigMapKeyMatch(iter, "tuneDentryCacheSize", addDashes) )
tuneDentryCacheSize = StringTk::strToUInt64(iter->second.c_str() );
else
IGNORE_CONFIG_CLIENT_VALUE("connMaxConcurrentAttempts")
IGNORE_CONFIG_CLIENT_VALUE("tuneFileCacheType")
IGNORE_CONFIG_CLIENT_VALUE("tunePagedIOBufSize")
IGNORE_CONFIG_CLIENT_VALUE("tunePagedIOBufNum")
......
......@@ -226,6 +226,7 @@ void Config::applyConfigMap(bool enableException, bool addDashes) throw(InvalidC
if(testConfigMapKeyMatch(iter, "tuneNumWorkers", addDashes) )
tuneNumWorkers = StringTk::strToUInt(iter->second);
else
IGNORE_CONFIG_CLIENT_VALUE("connMaxConcurrentAttempts")
IGNORE_CONFIG_CLIENT_VALUE("tuneNumRetryWorkers")
if(testConfigMapKeyMatch(iter, "tunePreferredMetaFile", addDashes) )
tunePreferredMetaFile = iter->second;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment