Skip to content

Commit a6f58f8

Browse files
authored
Merge 0a1f8f6 into f1268ad
2 parents f1268ad + 0a1f8f6 commit a6f58f8

19 files changed

+913
-71
lines changed

src/remote/blockingUDPTransport.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
435435
{
436436
if (IS_LOGGABLE(logLevelDebug))
437437
{
438-
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
439-
length, _remoteName.c_str(), inetAddressToString(address).c_str());
438+
LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.",
439+
static_cast<unsigned long>(length), _remoteName.c_str(), inetAddressToString(address).c_str());
440440
}
441441

442442
int retval = sendto(_channel, buffer,
@@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)
460460

461461
if (IS_LOGGABLE(logLevelDebug))
462462
{
463-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
464-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
463+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
464+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str());
465465
}
466466

467467
int retval = sendto(_channel, buffer->getBuffer(),
@@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
498498

499499
if (IS_LOGGABLE(logLevelDebug))
500500
{
501-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
502-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
501+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
502+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
503503
}
504504

505505
int retval = sendto(_channel, buffer->getBuffer(),
@@ -684,7 +684,7 @@ void initializeUDPTransports(bool serverFlag,
684684
}
685685
}
686686
LOG(logLevelDebug,
687-
"Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
687+
"Broadcast address #%lu: %s. (%sunicast)", static_cast<unsigned long>(i), inetAddressToString(list[i]).c_str(),
688688
isunicast[i]?"":"not ");
689689
}
690690

@@ -714,7 +714,7 @@ void initializeUDPTransports(bool serverFlag,
714714
getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);
715715

716716
//
717-
// Setup UDP trasport(s) (per interface)
717+
// Setup UDP transport(s) (per interface)
718718
//
719719

720720
InetAddrVector tappedNIF;

src/remote/channelSearchManager.cpp

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
7171
static const int MAX_FRAMES_AT_ONCE = 10;
7272
static const int DELAY_BETWEEN_FRAMES_MS = 50;
7373

74+
static const int MAX_NAME_SERVER_SEARCH_COUNT = 3;
7475

7576
ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) :
7677
m_context(context),
7778
m_responseAddress(), // initialized in activate()
7879
m_canceled(),
7980
m_sequenceNumber(0),
81+
m_nsSearchCounter(0),
8082
m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND),
8183
m_channels(),
8284
m_lastTimeSent(),
@@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135137
if (m_canceled.get())
136138
return;
137139

140+
LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str());
138141
bool immediateTrigger;
139142
{
140143
Lock guard(m_channelMutex);
@@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154157

155158
void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
156159
{
160+
LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str());
157161
Lock guard(m_channelMutex);
158162
pvAccessID id = channel->getSearchInstanceID();
159163
m_channels.erase(id);
@@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180184
SearchInstance::shared_pointer si(channelsIter->second.lock());
181185

182186
// remove from search list
187+
LOG(logLevelDebug, "Removing cid %d from the channel map", cid);
183188
m_channels.erase(cid);
184189

185190
guard.unlock();
@@ -188,6 +193,30 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188193
if(si)
189194
si->searchResponse(guid, minorRevision, serverAddress);
190195
}
196+
// Release all NS connections if there are
197+
// no more channels to search for
198+
releaseNameServerTransport();
199+
}
200+
201+
void ChannelSearchManager::releaseNameServerTransport(bool forceRelease)
202+
{
203+
bool releaseAllConnections = true;
204+
if(m_channels.size() == 0)
205+
{
206+
// No more channels to search for, release all connections
207+
m_nsSearchCounter = 0;
208+
m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections);
209+
m_nsTransport.reset();
210+
}
211+
else if(forceRelease)
212+
{
213+
// There are channels to search for, release only connection
214+
// that is currently used
215+
releaseAllConnections = false;
216+
m_nsSearchCounter = 0;
217+
m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections);
218+
m_nsTransport.reset();
219+
}
191220
}
192221

193222
void ChannelSearchManager::newServerDetected()
@@ -196,6 +225,46 @@ void ChannelSearchManager::newServerDetected()
196225
callback();
197226
}
198227

228+
void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
229+
{
230+
control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2);
231+
buffer->putInt(m_sequenceNumber);
232+
buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION
233+
buffer->putByte((int8_t)0); // reserved
234+
buffer->putShort((int16_t)0); // reserved
235+
236+
osiSockAddr anyAddress;
237+
memset(&anyAddress, 0, sizeof(anyAddress));
238+
anyAddress.ia.sin_family = AF_INET;
239+
anyAddress.ia.sin_port = htons(0);
240+
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
241+
encodeAsIPv6Address(buffer, &anyAddress);
242+
buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port));
243+
buffer->putByte((int8_t)1);
244+
SerializeHelper::serializeString("tcp", buffer, control);
245+
buffer->putShort((int16_t)0); // initial channel count
246+
247+
vector<SearchInstance::shared_pointer> toSend;
248+
{
249+
Lock guard(m_channelMutex);
250+
toSend.reserve(m_channels.size());
251+
252+
for(m_channels_t::iterator channelsIter = m_channels.begin();
253+
channelsIter != m_channels.end(); channelsIter++)
254+
{
255+
SearchInstance::shared_pointer inst(channelsIter->second.lock());
256+
if(!inst) continue;
257+
toSend.push_back(inst);
258+
}
259+
}
260+
261+
vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
262+
for (; siter != toSend.end(); siter++)
263+
{
264+
generateSearchRequestMessage(*siter, buffer, control);
265+
}
266+
}
267+
199268
void ChannelSearchManager::initializeSendBuffer()
200269
{
201270
// for now OK, since it is only set here
@@ -236,15 +305,41 @@ void ChannelSearchManager::flushSendBuffer()
236305
{
237306
Lock guard(m_mutex);
238307

308+
// UDP transport
239309
Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
240310
BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
241311

312+
// UDP search
242313
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x80); // unicast, no reply required
243314
ut->send(&m_sendBuffer, inetAddressType_unicast);
244315

245316
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
246317
ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);
247318

319+
// Name server transport
320+
if(m_nsTransport)
321+
{
322+
// Reset transport (current connection only)
323+
// after max. number of attempts is reached.
324+
if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT)
325+
{
326+
LOG(logLevelDebug, "Resetting name server transport after %d search attempts", m_nsSearchCounter);
327+
releaseNameServerTransport(true);
328+
}
329+
}
330+
331+
if(!m_nsTransport)
332+
{
333+
m_nsTransport = m_context.lock()->getNameServerSearchTransport();
334+
}
335+
336+
// Name server search
337+
if(m_nsTransport)
338+
{
339+
m_nsSearchCounter++;
340+
LOG(logLevelDebug, "Initiating name server search for %d channels, search attempt %d", int(m_channels.size()), m_nsSearchCounter);
341+
m_nsTransport->enqueueSendRequest(shared_from_this());
342+
}
248343
initializeSendBuffer();
249344
}
250345

@@ -253,7 +348,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253348
ByteBuffer* requestMessage, TransportSendControl* control)
254349
{
255350
epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
256-
257351
dataCount++;
258352

259353
/*
@@ -262,6 +356,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262356
*/
263357

264358
const std::string& name(channel->getSearchInstanceName());
359+
LOG(logLevelDebug, "Searching for channel: %s", name.c_str());
360+
265361
// not nice...
266362
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
267363
if(((int)requestMessage->getRemaining()) < addedPayloadSize)

src/remote/codec.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() {
16081608
{
16091609
LOG(
16101610
logLevelDebug,
1611-
"Transport to %s still has %zu channel(s) active and closing...",
1612-
_socketName.c_str(), _channels.size());
1611+
"Transport to %s still has %lu channel(s) active and closing...",
1612+
_socketName.c_str(), static_cast<unsigned long>(_channels.size()));
16131613
}
16141614

16151615
_channels_t temp;
@@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer
17611761

17621762
if (IS_LOGGABLE(logLevelDebug))
17631763
{
1764-
LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
1764+
LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID());
17651765
}
17661766

17671767
_owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
@@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() {
17891789
{
17901790
LOG(
17911791
logLevelDebug,
1792-
"Transport to %s still has %zu client(s) active and closing...",
1793-
_socketName.c_str(), refs);
1792+
"Transport to %s still has %lu client(s) active and closing...",
1793+
_socketName.c_str(), static_cast<unsigned long>(refs));
17941794
}
17951795

17961796
TransportClientMap_t::iterator it = _owners.begin();
@@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
18281828
}
18291829
}
18301830

1831+
bool BlockingClientTCPTransportCodec::isUsed()
1832+
{
1833+
return (_owners.size() > 0);
1834+
}
1835+
18311836
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
18321837
TransportSendControl* control)
18331838
{

src/remote/pv/channelSearchManager.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class SearchInstance {
5353

5454
class ChannelSearchManager :
5555
public epics::pvData::TimerCallback,
56+
public TransportSender,
5657
public std::tr1::enable_shared_from_this<ChannelSearchManager>
5758
{
5859
public:
@@ -99,13 +100,19 @@ class ChannelSearchManager :
99100
/// Timer stooped callback.
100101
virtual void timerStopped() OVERRIDE FINAL;
101102

103+
// Transport sender interface.
104+
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;
105+
102106
/**
103107
* Private constructor.
104108
* @param context
105109
*/
106110
ChannelSearchManager(Context::shared_pointer const & context);
107111
void activate();
108112

113+
// Releases name server transport.
114+
void releaseNameServerTransport(bool forceRelease=false);
115+
109116
private:
110117

111118
bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush);
@@ -140,6 +147,16 @@ class ChannelSearchManager :
140147
*/
141148
int32_t m_sequenceNumber;
142149

150+
/**
151+
* Name server search attempt counter.
152+
*/
153+
int m_nsSearchCounter;
154+
155+
/**
156+
* Name server transport
157+
*/
158+
Transport::shared_pointer m_nsTransport;
159+
143160
/**
144161
* Send byte buffer (frame)
145162
*/

src/remote/pv/codec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ class BlockingClientTCPTransportCodec :
612612

613613
virtual void release(pvAccessID clientId) OVERRIDE FINAL;
614614

615+
virtual bool isUsed() OVERRIDE FINAL;
616+
615617
virtual void send(epics::pvData::ByteBuffer* buffer,
616618
TransportSendControl* control) OVERRIDE FINAL;
617619

src/remote/pv/remote.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include <osiSock.h>
1919

2020
#include <pv/serialize.h>
21-
#include <pv/pvType.h>
2221
#include <pv/byteBuffer.h>
2322
#include <pv/timer.h>
2423
#include <pv/pvData.h>
@@ -185,6 +184,11 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl {
185184
*/
186185
virtual void release(pvAccessID clientId) = 0;
187186

187+
/**
188+
* Is transport used
189+
*/
190+
virtual bool isUsed() {return false;}
191+
188192
/**
189193
* Get protocol type (tcp, udp, ssl, etc.).
190194
* @return protocol type.
@@ -305,6 +309,8 @@ class Context {
305309

306310
virtual std::tr1::shared_ptr<Channel> getChannel(pvAccessID id) = 0;
307311
virtual Transport::shared_pointer getSearchTransport() = 0;
312+
virtual Transport::shared_pointer getNameServerSearchTransport() {return Transport::shared_pointer();}
313+
virtual void releaseNameServerSearchTransport(const Transport::shared_pointer& nsTransport=Transport::shared_pointer(), bool releaseAllConnections=true) {}
308314
};
309315

310316
/**

src/remote/transportRegistry.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void TransportRegistry::clear()
127127
if(temp.empty())
128128
return;
129129

130-
LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
130+
LOG(logLevelDebug, "Context still has %lu transport(s) active and closing...", static_cast<unsigned long>(temp.size()));
131131

132132
for(transports_t::iterator it(temp.begin()), end(temp.end());
133133
it != end; ++it)

0 commit comments

Comments
 (0)