Skip to content

Commit 8fc112b

Browse files
authored
Merge 7a64947 into f1268ad
2 parents f1268ad + 7a64947 commit 8fc112b

19 files changed

+890
-71
lines changed

src/remote/blockingUDPTransport.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
435435
{
436436
if (IS_LOGGABLE(logLevelDebug))
437437
{
438-
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
439-
length, _remoteName.c_str(), inetAddressToString(address).c_str());
438+
LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.",
439+
static_cast<unsigned long>(length), _remoteName.c_str(), inetAddressToString(address).c_str());
440440
}
441441

442442
int retval = sendto(_channel, buffer,
@@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)
460460

461461
if (IS_LOGGABLE(logLevelDebug))
462462
{
463-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
464-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
463+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
464+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str());
465465
}
466466

467467
int retval = sendto(_channel, buffer->getBuffer(),
@@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
498498

499499
if (IS_LOGGABLE(logLevelDebug))
500500
{
501-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
502-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
501+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
502+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
503503
}
504504

505505
int retval = sendto(_channel, buffer->getBuffer(),
@@ -684,7 +684,7 @@ void initializeUDPTransports(bool serverFlag,
684684
}
685685
}
686686
LOG(logLevelDebug,
687-
"Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
687+
"Broadcast address #%lu: %s. (%sunicast)", static_cast<unsigned long>(i), inetAddressToString(list[i]).c_str(),
688688
isunicast[i]?"":"not ");
689689
}
690690

@@ -714,7 +714,7 @@ void initializeUDPTransports(bool serverFlag,
714714
getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);
715715

716716
//
717-
// Setup UDP trasport(s) (per interface)
717+
// Setup UDP transport(s) (per interface)
718718
//
719719

720720
InetAddrVector tappedNIF;

src/remote/channelSearchManager.cpp

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
7171
static const int MAX_FRAMES_AT_ONCE = 10;
7272
static const int DELAY_BETWEEN_FRAMES_MS = 50;
7373

74+
static const int MAX_NAME_SERVER_SEARCH_COUNT = 3;
7475

7576
ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) :
7677
m_context(context),
7778
m_responseAddress(), // initialized in activate()
7879
m_canceled(),
7980
m_sequenceNumber(0),
81+
m_nsSearchCounter(0),
8082
m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND),
8183
m_channels(),
8284
m_lastTimeSent(),
@@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135137
if (m_canceled.get())
136138
return;
137139

140+
LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str());
138141
bool immediateTrigger;
139142
{
140143
Lock guard(m_channelMutex);
@@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154157

155158
void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
156159
{
160+
LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str());
157161
Lock guard(m_channelMutex);
158162
pvAccessID id = channel->getSearchInstanceID();
159163
m_channels.erase(id);
@@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180184
SearchInstance::shared_pointer si(channelsIter->second.lock());
181185

182186
// remove from search list
187+
LOG(logLevelDebug, "Removing cid %d from the channel map", cid);
183188
m_channels.erase(cid);
184189

185190
guard.unlock();
@@ -188,6 +193,17 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188193
if(si)
189194
si->searchResponse(guid, minorRevision, serverAddress);
190195
}
196+
releaseNameServerTransport();
197+
}
198+
199+
void ChannelSearchManager::releaseNameServerTransport(bool forceRelease)
200+
{
201+
if(m_channels.size() == 0 || forceRelease)
202+
{
203+
m_nsSearchCounter = 0;
204+
m_context.lock()->releaseNameServerSearchTransport(m_nsTransport);
205+
m_nsTransport.reset();
206+
}
191207
}
192208

193209
void ChannelSearchManager::newServerDetected()
@@ -196,6 +212,46 @@ void ChannelSearchManager::newServerDetected()
196212
callback();
197213
}
198214

215+
void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
216+
{
217+
control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2);
218+
buffer->putInt(m_sequenceNumber);
219+
buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION
220+
buffer->putByte((int8_t)0); // reserved
221+
buffer->putShort((int16_t)0); // reserved
222+
223+
osiSockAddr anyAddress;
224+
memset(&anyAddress, 0, sizeof(anyAddress));
225+
anyAddress.ia.sin_family = AF_INET;
226+
anyAddress.ia.sin_port = htons(0);
227+
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
228+
encodeAsIPv6Address(buffer, &anyAddress);
229+
buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port));
230+
buffer->putByte((int8_t)1);
231+
SerializeHelper::serializeString("tcp", buffer, control);
232+
buffer->putShort((int16_t)0); // initial channel count
233+
234+
vector<SearchInstance::shared_pointer> toSend;
235+
{
236+
Lock guard(m_channelMutex);
237+
toSend.reserve(m_channels.size());
238+
239+
for(m_channels_t::iterator channelsIter = m_channels.begin();
240+
channelsIter != m_channels.end(); channelsIter++)
241+
{
242+
SearchInstance::shared_pointer inst(channelsIter->second.lock());
243+
if(!inst) continue;
244+
toSend.push_back(inst);
245+
}
246+
}
247+
248+
vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
249+
for (; siter != toSend.end(); siter++)
250+
{
251+
generateSearchRequestMessage(*siter, buffer, control);
252+
}
253+
}
254+
199255
void ChannelSearchManager::initializeSendBuffer()
200256
{
201257
// for now OK, since it is only set here
@@ -236,15 +292,40 @@ void ChannelSearchManager::flushSendBuffer()
236292
{
237293
Lock guard(m_mutex);
238294

295+
// UDP transport
239296
Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
240297
BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
241298

299+
// UDP search
242300
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x80); // unicast, no reply required
243301
ut->send(&m_sendBuffer, inetAddressType_unicast);
244302

245303
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
246304
ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);
247305

306+
// Name server transport
307+
if(m_nsTransport)
308+
{
309+
// Reset transport after max. number of attempts is reached.
310+
if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT)
311+
{
312+
LOG(logLevelDebug, "Resetting name server transport after %d search attempts", m_nsSearchCounter);
313+
releaseNameServerTransport(true);
314+
}
315+
}
316+
317+
if(!m_nsTransport)
318+
{
319+
m_nsTransport = m_context.lock()->getNameServerSearchTransport();
320+
}
321+
322+
// Name server search
323+
if(m_nsTransport)
324+
{
325+
m_nsSearchCounter++;
326+
LOG(logLevelDebug, "Initiating name server search for %d channels, search attempt %d", int(m_channels.size()), m_nsSearchCounter);
327+
m_nsTransport->enqueueSendRequest(shared_from_this());
328+
}
248329
initializeSendBuffer();
249330
}
250331

@@ -253,7 +334,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253334
ByteBuffer* requestMessage, TransportSendControl* control)
254335
{
255336
epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
256-
257337
dataCount++;
258338

259339
/*
@@ -262,6 +342,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262342
*/
263343

264344
const std::string& name(channel->getSearchInstanceName());
345+
LOG(logLevelDebug, "Searching for channel: %s", name.c_str());
346+
265347
// not nice...
266348
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
267349
if(((int)requestMessage->getRemaining()) < addedPayloadSize)

src/remote/codec.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() {
16081608
{
16091609
LOG(
16101610
logLevelDebug,
1611-
"Transport to %s still has %zu channel(s) active and closing...",
1612-
_socketName.c_str(), _channels.size());
1611+
"Transport to %s still has %lu channel(s) active and closing...",
1612+
_socketName.c_str(), static_cast<unsigned long>(_channels.size()));
16131613
}
16141614

16151615
_channels_t temp;
@@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer
17611761

17621762
if (IS_LOGGABLE(logLevelDebug))
17631763
{
1764-
LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
1764+
LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID());
17651765
}
17661766

17671767
_owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
@@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() {
17891789
{
17901790
LOG(
17911791
logLevelDebug,
1792-
"Transport to %s still has %zu client(s) active and closing...",
1793-
_socketName.c_str(), refs);
1792+
"Transport to %s still has %lu client(s) active and closing...",
1793+
_socketName.c_str(), static_cast<unsigned long>(refs));
17941794
}
17951795

17961796
TransportClientMap_t::iterator it = _owners.begin();
@@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
18281828
}
18291829
}
18301830

1831+
bool BlockingClientTCPTransportCodec::isUsed()
1832+
{
1833+
return (_owners.size() > 0);
1834+
}
1835+
18311836
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
18321837
TransportSendControl* control)
18331838
{

src/remote/pv/channelSearchManager.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class SearchInstance {
5353

5454
class ChannelSearchManager :
5555
public epics::pvData::TimerCallback,
56+
public TransportSender,
5657
public std::tr1::enable_shared_from_this<ChannelSearchManager>
5758
{
5859
public:
@@ -99,13 +100,19 @@ class ChannelSearchManager :
99100
/// Timer stooped callback.
100101
virtual void timerStopped() OVERRIDE FINAL;
101102

103+
// Transport sender interface.
104+
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;
105+
102106
/**
103107
* Private constructor.
104108
* @param context
105109
*/
106110
ChannelSearchManager(Context::shared_pointer const & context);
107111
void activate();
108112

113+
// Releases name server transport.
114+
void releaseNameServerTransport(bool forceRelease=false);
115+
109116
private:
110117

111118
bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush);
@@ -140,6 +147,16 @@ class ChannelSearchManager :
140147
*/
141148
int32_t m_sequenceNumber;
142149

150+
/**
151+
* Name server search attempt counter.
152+
*/
153+
int m_nsSearchCounter;
154+
155+
/**
156+
* Name server transport
157+
*/
158+
Transport::shared_pointer m_nsTransport;
159+
143160
/**
144161
* Send byte buffer (frame)
145162
*/

src/remote/pv/codec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ class BlockingClientTCPTransportCodec :
612612

613613
virtual void release(pvAccessID clientId) OVERRIDE FINAL;
614614

615+
virtual bool isUsed() OVERRIDE FINAL;
616+
615617
virtual void send(epics::pvData::ByteBuffer* buffer,
616618
TransportSendControl* control) OVERRIDE FINAL;
617619

src/remote/pv/remote.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include <osiSock.h>
1919

2020
#include <pv/serialize.h>
21-
#include <pv/pvType.h>
2221
#include <pv/byteBuffer.h>
2322
#include <pv/timer.h>
2423
#include <pv/pvData.h>
@@ -185,6 +184,11 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl {
185184
*/
186185
virtual void release(pvAccessID clientId) = 0;
187186

187+
/**
188+
* Is transport used
189+
*/
190+
virtual bool isUsed() {return false;}
191+
188192
/**
189193
* Get protocol type (tcp, udp, ssl, etc.).
190194
* @return protocol type.
@@ -305,6 +309,8 @@ class Context {
305309

306310
virtual std::tr1::shared_ptr<Channel> getChannel(pvAccessID id) = 0;
307311
virtual Transport::shared_pointer getSearchTransport() = 0;
312+
virtual Transport::shared_pointer getNameServerSearchTransport() {return Transport::shared_pointer();}
313+
virtual void releaseNameServerSearchTransport(const Transport::shared_pointer& nsTransport = Transport::shared_pointer()) {}
308314
};
309315

310316
/**

src/remote/transportRegistry.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void TransportRegistry::clear()
127127
if(temp.empty())
128128
return;
129129

130-
LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
130+
LOG(logLevelDebug, "Context still has %lu transport(s) active and closing...", static_cast<unsigned long>(temp.size()));
131131

132132
for(transports_t::iterator it(temp.begin()), end(temp.end());
133133
it != end; ++it)

0 commit comments

Comments
 (0)