Skip to content

Commit 1b3a3f5

Browse files
authored
Merge 046c1f2 into f1268ad
2 parents f1268ad + 046c1f2 commit 1b3a3f5

31 files changed

+982
-97
lines changed

src/pva/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ INC += pv/pvaVersion.h
88
INC += pv/pvaVersionNum.h
99
INC += pv/clientFactory.h
1010

11+
pvAccess_SRCS += pvaConstants.cpp
1112
pvAccess_SRCS += pvaVersion.cpp
1213
pvAccess_SRCS += clientFactory.cpp

src/pva/pv/pvaConstants.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
#endif
2323
#include <shareLib.h>
2424

25-
namespace epics {
26-
namespace pvAccess {
25+
namespace epics { namespace pvAccess {
2726

2827
/** PVA protocol magic number */
2928
const epics::pvData::int8 PVA_MAGIC = static_cast<epics::pvData::int8>(0xCA);
@@ -43,6 +42,9 @@ const epics::pvData::int32 PVA_SERVER_PORT = 5075;
4342
/** Default PVA beacon port. */
4443
const epics::pvData::int32 PVA_BROADCAST_PORT = 5076;
4544

45+
/** Default UDP sender port. */
46+
const epics::pvData::int32 PVA_UDP_SENDER_PORT = 0;
47+
4648
/** PVA protocol message header size. */
4749
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE = 8;
4850

@@ -86,7 +88,7 @@ epicsShareExtern const std::string PVACCESS_ALL_PROVIDERS;
8688

8789
/** Name of the system env. variable to turn on debugging. */
8890
epicsShareExtern const std::string PVACCESS_DEBUG;
89-
}
90-
}
91+
92+
}}
9193

9294
#endif /* PVACONSTANTS_H_ */

src/pva/pvaConstants.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/**
2+
* Copyright - See the COPYRIGHT that is included with this distribution.
3+
* pvAccessCPP is distributed subject to a Software License Agreement found
4+
* in file LICENSE that is included with this distribution.
5+
*/
6+
7+
#define epicsExportSharedSymbols
8+
#include <pv/pvaConstants.h>
9+
10+
namespace epics { namespace pvAccess {
11+
12+
const std::string PVACCESS_DEFAULT_PROVIDER("local");
13+
const std::string PVACCESS_ALL_PROVIDERS("<all>");
14+
const std::string PVACCESS_DEBUG("EPICS_PVA_DEBUG");
15+
16+
}}

src/pva/pvaVersion.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ using std::string;
1616
namespace epics {
1717
namespace pvAccess {
1818

19-
const std::string PVACCESS_DEFAULT_PROVIDER("local");
20-
const std::string PVACCESS_ALL_PROVIDERS("<all>");
21-
const std::string PVACCESS_DEBUG("EPICS_PVA_DEBUG");
22-
2319
Version::Version(std::string const & productName,
2420
std::string const & implementationLangugage,
2521
int majorVersion, int minorVersion,

src/remote/abstractResponseHandler.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <pv/reftrack.h>
1111

1212
#define epicsExportSharedSymbols
13+
#include <pv/pvaConstants.h>
1314
#include <pv/remote.h>
1415
#include <pv/hexDump.h>
1516

src/remote/blockingUDPTransport.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
435435
{
436436
if (IS_LOGGABLE(logLevelDebug))
437437
{
438-
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
439-
length, _remoteName.c_str(), inetAddressToString(address).c_str());
438+
LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.",
439+
static_cast<unsigned long>(length), _remoteName.c_str(), inetAddressToString(address).c_str());
440440
}
441441

442442
int retval = sendto(_channel, buffer,
@@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)
460460

461461
if (IS_LOGGABLE(logLevelDebug))
462462
{
463-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
464-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
463+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
464+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str());
465465
}
466466

467467
int retval = sendto(_channel, buffer->getBuffer(),
@@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
498498

499499
if (IS_LOGGABLE(logLevelDebug))
500500
{
501-
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
502-
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
501+
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
502+
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
503503
}
504504

505505
int retval = sendto(_channel, buffer->getBuffer(),
@@ -580,6 +580,7 @@ void initializeUDPTransports(bool serverFlag,
580580
const ResponseHandler::shared_pointer& responseHandler,
581581
BlockingUDPTransport::shared_pointer& sendTransport,
582582
int32& listenPort,
583+
int32& senderPort,
583584
bool autoAddressList,
584585
const std::string& addressList,
585586
const std::string& ignoreAddressList)
@@ -595,7 +596,7 @@ void initializeUDPTransports(bool serverFlag,
595596
osiSockAddr anyAddress;
596597
memset(&anyAddress, 0, sizeof(anyAddress));
597598
anyAddress.ia.sin_family = AF_INET;
598-
anyAddress.ia.sin_port = htons(0);
599+
anyAddress.ia.sin_port = htons(senderPort);
599600
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
600601

601602
sendTransport = connector.connect(responseHandler, anyAddress, protoVer);
@@ -684,7 +685,7 @@ void initializeUDPTransports(bool serverFlag,
684685
}
685686
}
686687
LOG(logLevelDebug,
687-
"Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
688+
"Broadcast address #%lu: %s. (%sunicast)", static_cast<unsigned long>(i), inetAddressToString(list[i]).c_str(),
688689
isunicast[i]?"":"not ");
689690
}
690691

@@ -714,7 +715,7 @@ void initializeUDPTransports(bool serverFlag,
714715
getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);
715716

716717
//
717-
// Setup UDP trasport(s) (per interface)
718+
// Setup UDP transport(s) (per interface)
718719
//
719720

720721
InetAddrVector tappedNIF;

src/remote/channelSearchManager.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135135
if (m_canceled.get())
136136
return;
137137

138+
LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str());
138139
bool immediateTrigger;
139140
{
140141
Lock guard(m_channelMutex);
@@ -154,6 +155,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154155

155156
void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
156157
{
158+
LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str());
157159
Lock guard(m_channelMutex);
158160
pvAccessID id = channel->getSearchInstanceID();
159161
m_channels.erase(id);
@@ -180,6 +182,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180182
SearchInstance::shared_pointer si(channelsIter->second.lock());
181183

182184
// remove from search list
185+
LOG(logLevelDebug, "Removing cid %d from the channel map", cid);
183186
m_channels.erase(cid);
184187

185188
guard.unlock();
@@ -188,6 +191,15 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188191
if(si)
189192
si->searchResponse(guid, minorRevision, serverAddress);
190193
}
194+
releaseNameServerTransport();
195+
}
196+
197+
void ChannelSearchManager::releaseNameServerTransport()
198+
{
199+
if(m_channels.size() == 0)
200+
{
201+
m_context.lock()->releaseNameServerSearchTransport();
202+
}
191203
}
192204

193205
void ChannelSearchManager::newServerDetected()
@@ -196,6 +208,46 @@ void ChannelSearchManager::newServerDetected()
196208
callback();
197209
}
198210

211+
void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
212+
{
213+
control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2);
214+
buffer->putInt(m_sequenceNumber);
215+
buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION
216+
buffer->putByte((int8_t)0); // reserved
217+
buffer->putShort((int16_t)0); // reserved
218+
219+
osiSockAddr anyAddress;
220+
memset(&anyAddress, 0, sizeof(anyAddress));
221+
anyAddress.ia.sin_family = AF_INET;
222+
anyAddress.ia.sin_port = htons(0);
223+
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
224+
encodeAsIPv6Address(buffer, &anyAddress);
225+
buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port));
226+
buffer->putByte((int8_t)1);
227+
SerializeHelper::serializeString("tcp", buffer, control);
228+
buffer->putShort((int16_t)0); // initial channel count
229+
230+
vector<SearchInstance::shared_pointer> toSend;
231+
{
232+
Lock guard(m_channelMutex);
233+
toSend.reserve(m_channels.size());
234+
235+
for(m_channels_t::iterator channelsIter = m_channels.begin();
236+
channelsIter != m_channels.end(); channelsIter++)
237+
{
238+
SearchInstance::shared_pointer inst(channelsIter->second.lock());
239+
if(!inst) continue;
240+
toSend.push_back(inst);
241+
}
242+
}
243+
244+
vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
245+
for (; siter != toSend.end(); siter++)
246+
{
247+
generateSearchRequestMessage(*siter, buffer, control);
248+
}
249+
}
250+
199251
void ChannelSearchManager::initializeSendBuffer()
200252
{
201253
// for now OK, since it is only set here
@@ -245,6 +297,13 @@ void ChannelSearchManager::flushSendBuffer()
245297
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
246298
ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);
247299

300+
// Name server search
301+
Transport::shared_pointer nsTransport = m_context.lock()->getNameServerSearchTransport();
302+
if(nsTransport)
303+
{
304+
LOG(logLevelDebug, "Initiating name server search for %d channels", int(m_channels.size()));
305+
nsTransport->enqueueSendRequest(shared_from_this());
306+
}
248307
initializeSendBuffer();
249308
}
250309

@@ -253,7 +312,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253312
ByteBuffer* requestMessage, TransportSendControl* control)
254313
{
255314
epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
256-
257315
dataCount++;
258316

259317
/*
@@ -262,6 +320,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262320
*/
263321

264322
const std::string& name(channel->getSearchInstanceName());
323+
LOG(logLevelDebug, "Searching for channel: %s", name.c_str());
324+
265325
// not nice...
266326
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
267327
if(((int)requestMessage->getRemaining()) < addedPayloadSize)

src/remote/codec.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() {
16081608
{
16091609
LOG(
16101610
logLevelDebug,
1611-
"Transport to %s still has %zu channel(s) active and closing...",
1612-
_socketName.c_str(), _channels.size());
1611+
"Transport to %s still has %lu channel(s) active and closing...",
1612+
_socketName.c_str(), static_cast<unsigned long>(_channels.size()));
16131613
}
16141614

16151615
_channels_t temp;
@@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer
17611761

17621762
if (IS_LOGGABLE(logLevelDebug))
17631763
{
1764-
LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
1764+
LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID());
17651765
}
17661766

17671767
_owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
@@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() {
17891789
{
17901790
LOG(
17911791
logLevelDebug,
1792-
"Transport to %s still has %zu client(s) active and closing...",
1793-
_socketName.c_str(), refs);
1792+
"Transport to %s still has %lu client(s) active and closing...",
1793+
_socketName.c_str(), static_cast<unsigned long>(refs));
17941794
}
17951795

17961796
TransportClientMap_t::iterator it = _owners.begin();
@@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
18281828
}
18291829
}
18301830

1831+
bool BlockingClientTCPTransportCodec::isUsed()
1832+
{
1833+
return (_owners.size() > 0);
1834+
}
1835+
18311836
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
18321837
TransportSendControl* control)
18331838
{

src/remote/pv/blockingUDP.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class BlockingUDPTransport :
7676
}
7777

7878
virtual std::string getType() const OVERRIDE FINAL {
79-
return std::string("udp");
79+
return "udp";
8080
}
8181

8282
virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
@@ -401,6 +401,7 @@ void initializeUDPTransports(
401401
const ResponseHandler::shared_pointer& responseHandler,
402402
BlockingUDPTransport::shared_pointer& sendTransport,
403403
epics::pvData::int32& listenPort,
404+
epics::pvData::int32& senderPort,
404405
bool autoAddressList,
405406
const std::string& addressList,
406407
const std::string& ignoreAddressList);

src/remote/pv/channelSearchManager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class SearchInstance {
5353

5454
class ChannelSearchManager :
5555
public epics::pvData::TimerCallback,
56+
public TransportSender,
5657
public std::tr1::enable_shared_from_this<ChannelSearchManager>
5758
{
5859
public:
@@ -99,13 +100,19 @@ class ChannelSearchManager :
99100
/// Timer stooped callback.
100101
virtual void timerStopped() OVERRIDE FINAL;
101102

103+
// Transport sender interface.
104+
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;
105+
102106
/**
103107
* Private constructor.
104108
* @param context
105109
*/
106110
ChannelSearchManager(Context::shared_pointer const & context);
107111
void activate();
108112

113+
// Releases name server transport.
114+
void releaseNameServerTransport();
115+
109116
private:
110117

111118
bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush);

0 commit comments

Comments
 (0)