Skip to content

Commit 9d79304

Browse files
authored
Merge 5f001f5 into f1268ad
2 parents f1268ad + 5f001f5 commit 9d79304

25 files changed

+940
-83
lines changed

src/remote/blockingUDPTransport.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
435435
{
436436
if (IS_LOGGABLE(logLevelDebug))
437437
{
438-
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
439-
length, _remoteName.c_str(), inetAddressToString(address).c_str());
438+
LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.",
439+
static_cast<unsigned long>(length), _remoteName.c_str(), inetAddressToString(address).c_str());
440440
}
441441

442442
int retval = sendto(_channel, buffer,
@@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)
460460

461461
if (IS_LOGGABLE(logLevelDebug))
462462
{
463-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
464-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
463+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
464+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str());
465465
}
466466

467467
int retval = sendto(_channel, buffer->getBuffer(),
@@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
498498

499499
if (IS_LOGGABLE(logLevelDebug))
500500
{
501-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
502-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
501+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
502+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
503503
}
504504

505505
int retval = sendto(_channel, buffer->getBuffer(),
@@ -684,7 +684,7 @@ void initializeUDPTransports(bool serverFlag,
684684
}
685685
}
686686
LOG(logLevelDebug,
687-
"Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
687+
"Broadcast address #%lu: %s. (%sunicast)", static_cast<unsigned long>(i), inetAddressToString(list[i]).c_str(),
688688
isunicast[i]?"":"not ");
689689
}
690690

@@ -714,7 +714,7 @@ void initializeUDPTransports(bool serverFlag,
714714
getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);
715715

716716
//
717-
// Setup UDP trasport(s) (per interface)
717+
// Setup UDP transport(s) (per interface)
718718
//
719719

720720
InetAddrVector tappedNIF;

src/remote/channelSearchManager.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135135
if (m_canceled.get())
136136
return;
137137

138+
LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str());
138139
bool immediateTrigger;
139140
{
140141
Lock guard(m_channelMutex);
@@ -154,6 +155,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154155

155156
void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
156157
{
158+
LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str());
157159
Lock guard(m_channelMutex);
158160
pvAccessID id = channel->getSearchInstanceID();
159161
m_channels.erase(id);
@@ -180,6 +182,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180182
SearchInstance::shared_pointer si(channelsIter->second.lock());
181183

182184
// remove from search list
185+
LOG(logLevelDebug, "Removing cid %d from the channel map", cid);
183186
m_channels.erase(cid);
184187

185188
guard.unlock();
@@ -188,6 +191,15 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188191
if(si)
189192
si->searchResponse(guid, minorRevision, serverAddress);
190193
}
194+
releaseNameServerTransport();
195+
}
196+
197+
void ChannelSearchManager::releaseNameServerTransport()
198+
{
199+
if(m_channels.size() == 0)
200+
{
201+
m_context.lock()->releaseNameServerSearchTransport();
202+
}
191203
}
192204

193205
void ChannelSearchManager::newServerDetected()
@@ -196,6 +208,46 @@ void ChannelSearchManager::newServerDetected()
196208
callback();
197209
}
198210

211+
void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
212+
{
213+
control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2);
214+
buffer->putInt(m_sequenceNumber);
215+
buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION
216+
buffer->putByte((int8_t)0); // reserved
217+
buffer->putShort((int16_t)0); // reserved
218+
219+
osiSockAddr anyAddress;
220+
memset(&anyAddress, 0, sizeof(anyAddress));
221+
anyAddress.ia.sin_family = AF_INET;
222+
anyAddress.ia.sin_port = htons(0);
223+
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
224+
encodeAsIPv6Address(buffer, &anyAddress);
225+
buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port));
226+
buffer->putByte((int8_t)1);
227+
SerializeHelper::serializeString("tcp", buffer, control);
228+
buffer->putShort((int16_t)0); // initial channel count
229+
230+
vector<SearchInstance::shared_pointer> toSend;
231+
{
232+
Lock guard(m_channelMutex);
233+
toSend.reserve(m_channels.size());
234+
235+
for(m_channels_t::iterator channelsIter = m_channels.begin();
236+
channelsIter != m_channels.end(); channelsIter++)
237+
{
238+
SearchInstance::shared_pointer inst(channelsIter->second.lock());
239+
if(!inst) continue;
240+
toSend.push_back(inst);
241+
}
242+
}
243+
244+
vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
245+
for (; siter != toSend.end(); siter++)
246+
{
247+
generateSearchRequestMessage(*siter, buffer, control);
248+
}
249+
}
250+
199251
void ChannelSearchManager::initializeSendBuffer()
200252
{
201253
// for now OK, since it is only set here
@@ -245,6 +297,13 @@ void ChannelSearchManager::flushSendBuffer()
245297
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
246298
ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);
247299

300+
// Name server search
301+
Transport::shared_pointer nsTransport = m_context.lock()->getNameServerSearchTransport();
302+
if(nsTransport)
303+
{
304+
LOG(logLevelDebug, "Initiating name server search for %d channels", int(m_channels.size()));
305+
nsTransport->enqueueSendRequest(shared_from_this());
306+
}
248307
initializeSendBuffer();
249308
}
250309

@@ -253,7 +312,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253312
ByteBuffer* requestMessage, TransportSendControl* control)
254313
{
255314
epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
256-
257315
dataCount++;
258316

259317
/*
@@ -262,6 +320,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262320
*/
263321

264322
const std::string& name(channel->getSearchInstanceName());
323+
LOG(logLevelDebug, "Searching for channel: %s", name.c_str());
324+
265325
// not nice...
266326
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
267327
if(((int)requestMessage->getRemaining()) < addedPayloadSize)

src/remote/codec.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() {
16081608
{
16091609
LOG(
16101610
logLevelDebug,
1611-
"Transport to %s still has %zu channel(s) active and closing...",
1612-
_socketName.c_str(), _channels.size());
1611+
"Transport to %s still has %lu channel(s) active and closing...",
1612+
_socketName.c_str(), static_cast<unsigned long>(_channels.size()));
16131613
}
16141614

16151615
_channels_t temp;
@@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer
17611761

17621762
if (IS_LOGGABLE(logLevelDebug))
17631763
{
1764-
LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
1764+
LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID());
17651765
}
17661766

17671767
_owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
@@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() {
17891789
{
17901790
LOG(
17911791
logLevelDebug,
1792-
"Transport to %s still has %zu client(s) active and closing...",
1793-
_socketName.c_str(), refs);
1792+
"Transport to %s still has %lu client(s) active and closing...",
1793+
_socketName.c_str(), static_cast<unsigned long>(refs));
17941794
}
17951795

17961796
TransportClientMap_t::iterator it = _owners.begin();
@@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
18281828
}
18291829
}
18301830

1831+
bool BlockingClientTCPTransportCodec::isUsed()
1832+
{
1833+
return (_owners.size() > 0);
1834+
}
1835+
18311836
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
18321837
TransportSendControl* control)
18331838
{

src/remote/pv/channelSearchManager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class SearchInstance {
5353

5454
class ChannelSearchManager :
5555
public epics::pvData::TimerCallback,
56+
public TransportSender,
5657
public std::tr1::enable_shared_from_this<ChannelSearchManager>
5758
{
5859
public:
@@ -99,13 +100,19 @@ class ChannelSearchManager :
99100
/// Timer stooped callback.
100101
virtual void timerStopped() OVERRIDE FINAL;
101102

103+
// Transport sender interface.
104+
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;
105+
102106
/**
103107
* Private constructor.
104108
* @param context
105109
*/
106110
ChannelSearchManager(Context::shared_pointer const & context);
107111
void activate();
108112

113+
// Releases name server transport.
114+
void releaseNameServerTransport();
115+
109116
private:
110117

111118
bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush);

src/remote/pv/codec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ class BlockingClientTCPTransportCodec :
612612

613613
virtual void release(pvAccessID clientId) OVERRIDE FINAL;
614614

615+
virtual bool isUsed() OVERRIDE FINAL;
616+
615617
virtual void send(epics::pvData::ByteBuffer* buffer,
616618
TransportSendControl* control) OVERRIDE FINAL;
617619

src/remote/pv/remote.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
#include <osiSock.h>
1919

2020
#include <pv/serialize.h>
21-
#include <pv/pvType.h>
2221
#include <pv/byteBuffer.h>
2322
#include <pv/timer.h>
2423
#include <pv/pvData.h>
2524
#include <pv/sharedPtr.h>
25+
#include <pv/pvaDefs.h>
2626

2727
#ifdef remoteEpicsExportSharedSymbols
2828
# define epicsExportSharedSymbols
@@ -32,7 +32,6 @@
3232
#include <pv/pvaConstants.h>
3333
#include <pv/configuration.h>
3434
#include <pv/fairQueue.h>
35-
#include <pv/pvaDefs.h>
3635

3736
/// TODO only here because of the Lockable
3837
#include <pv/pvAccess.h>
@@ -185,6 +184,11 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl {
185184
*/
186185
virtual void release(pvAccessID clientId) = 0;
187186

187+
/**
188+
* Is transport used
189+
*/
190+
virtual bool isUsed() {return false;}
191+
188192
/**
189193
* Get protocol type (tcp, udp, ssl, etc.).
190194
* @return protocol type.
@@ -278,6 +282,7 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl {
278282
class Channel;
279283
class SecurityPlugin;
280284
class AuthenticationRegistry;
285+
class Configuration;
281286

282287
/**
283288
* Not public IF, used by Transports, etc.
@@ -305,6 +310,8 @@ class Context {
305310

306311
virtual std::tr1::shared_ptr<Channel> getChannel(pvAccessID id) = 0;
307312
virtual Transport::shared_pointer getSearchTransport() = 0;
313+
virtual Transport::shared_pointer getNameServerSearchTransport() {return Transport::shared_pointer();}
314+
virtual void releaseNameServerSearchTransport() {}
308315
};
309316

310317
/**

src/remote/transportRegistry.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void TransportRegistry::clear()
127127
if(temp.empty())
128128
return;
129129

130-
LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
130+
LOG(logLevelDebug, "Context still has %lu transport(s) active and closing...", static_cast<unsigned long>(temp.size()));
131131

132132
for(transports_t::iterator it(temp.begin()), end(temp.end());
133133
it != end; ++it)

0 commit comments

Comments
 (0)