@@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
7171static const int MAX_FRAMES_AT_ONCE = 10 ;
7272static const int DELAY_BETWEEN_FRAMES_MS = 50 ;
7373
74+ static const int MAX_NAME_SERVER_SEARCH_COUNT = 3 ;
7475
7576ChannelSearchManager::ChannelSearchManager (Context::shared_pointer const & context) :
7677 m_context (context),
7778 m_responseAddress (), // initialized in activate()
7879 m_canceled (),
7980 m_sequenceNumber (0 ),
81+ m_nsSearchCounter (0 ),
8082 m_sendBuffer (MAX_UDP_UNFRAGMENTED_SEND),
8183 m_channels (),
8284 m_lastTimeSent (),
@@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135137 if (m_canceled.get ())
136138 return ;
137139
140+ LOG (logLevelDebug, " Registering search instance: %s" , channel->getSearchInstanceName ().c_str ());
138141 bool immediateTrigger;
139142 {
140143 Lock guard (m_channelMutex);
@@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154157
155158void ChannelSearchManager::unregisterSearchInstance (SearchInstance::shared_pointer const & channel)
156159{
160+ LOG (logLevelDebug, " Unregistering search instance: %s" , channel->getSearchInstanceName ().c_str ());
157161 Lock guard (m_channelMutex);
158162 pvAccessID id = channel->getSearchInstanceID ();
159163 m_channels.erase (id);
@@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180184 SearchInstance::shared_pointer si (channelsIter->second .lock ());
181185
182186 // remove from search list
187+ LOG (logLevelDebug, " Removing cid %d from the channel map" , cid);
183188 m_channels.erase (cid);
184189
185190 guard.unlock ();
@@ -188,6 +193,30 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188193 if (si)
189194 si->searchResponse (guid, minorRevision, serverAddress);
190195 }
196+ // Release all NS connections if there are
197+ // no more channels to search for
198+ releaseNameServerTransport ();
199+ }
200+
201+ void ChannelSearchManager::releaseNameServerTransport (bool forceRelease)
202+ {
203+ bool releaseAllConnections = true ;
204+ if (m_channels.size () == 0 )
205+ {
206+ // No more channels to search for, release all connections
207+ m_nsSearchCounter = 0 ;
208+ m_context.lock ()->releaseNameServerSearchTransport (m_nsTransport, releaseAllConnections);
209+ m_nsTransport.reset ();
210+ }
211+ else if (forceRelease)
212+ {
213+ // There are channels to search for, release only connection
214+ // that is currently used
215+ releaseAllConnections = false ;
216+ m_nsSearchCounter = 0 ;
217+ m_context.lock ()->releaseNameServerSearchTransport (m_nsTransport, releaseAllConnections);
218+ m_nsTransport.reset ();
219+ }
191220}
192221
193222void ChannelSearchManager::newServerDetected ()
@@ -196,6 +225,46 @@ void ChannelSearchManager::newServerDetected()
196225 callback ();
197226}
198227
228+ void ChannelSearchManager::send (epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
229+ {
230+ control->startMessage (CMD_SEARCH, 4 +1 +3 +16 +2 +1 +4 +2 );
231+ buffer->putInt (m_sequenceNumber);
232+ buffer->putByte ((int8_t )QOS_REPLY_REQUIRED); // CAST_POSITION
233+ buffer->putByte ((int8_t )0 ); // reserved
234+ buffer->putShort ((int16_t )0 ); // reserved
235+
236+ osiSockAddr anyAddress;
237+ memset (&anyAddress, 0 , sizeof (anyAddress));
238+ anyAddress.ia .sin_family = AF_INET;
239+ anyAddress.ia .sin_port = htons (0 );
240+ anyAddress.ia .sin_addr .s_addr = htonl (INADDR_ANY);
241+ encodeAsIPv6Address (buffer, &anyAddress);
242+ buffer->putShort ((int16_t )ntohs (anyAddress.ia .sin_port ));
243+ buffer->putByte ((int8_t )1 );
244+ SerializeHelper::serializeString (" tcp" , buffer, control);
245+ buffer->putShort ((int16_t )0 ); // initial channel count
246+
247+ vector<SearchInstance::shared_pointer> toSend;
248+ {
249+ Lock guard (m_channelMutex);
250+ toSend.reserve (m_channels.size ());
251+
252+ for (m_channels_t ::iterator channelsIter = m_channels.begin ();
253+ channelsIter != m_channels.end (); channelsIter++)
254+ {
255+ SearchInstance::shared_pointer inst (channelsIter->second .lock ());
256+ if (!inst) continue ;
257+ toSend.push_back (inst);
258+ }
259+ }
260+
261+ vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin ();
262+ for (; siter != toSend.end (); siter++)
263+ {
264+ generateSearchRequestMessage (*siter, buffer, control);
265+ }
266+ }
267+
199268void ChannelSearchManager::initializeSendBuffer ()
200269{
201270 // for now OK, since it is only set here
@@ -236,15 +305,41 @@ void ChannelSearchManager::flushSendBuffer()
236305{
237306 Lock guard (m_mutex);
238307
308+ // UDP transport
239309 Transport::shared_pointer tt = m_context.lock ()->getSearchTransport ();
240310 BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
241311
312+ // UDP search
242313 m_sendBuffer.putByte (CAST_POSITION, (int8_t )0x80 ); // unicast, no reply required
243314 ut->send (&m_sendBuffer, inetAddressType_unicast);
244315
245316 m_sendBuffer.putByte (CAST_POSITION, (int8_t )0x00 ); // b/m-cast, no reply required
246317 ut->send (&m_sendBuffer, inetAddressType_broadcast_multicast);
247318
319+ // Name server transport
320+ if (m_nsTransport)
321+ {
322+ // Reset transport (current connection only)
323+ // after max. number of attempts is reached.
324+ if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT)
325+ {
326+ LOG (logLevelDebug, " Resetting name server transport after %d search attempts" , m_nsSearchCounter);
327+ releaseNameServerTransport (true );
328+ }
329+ }
330+
331+ if (!m_nsTransport)
332+ {
333+ m_nsTransport = m_context.lock ()->getNameServerSearchTransport ();
334+ }
335+
336+ // Name server search
337+ if (m_nsTransport)
338+ {
339+ m_nsSearchCounter++;
340+ LOG (logLevelDebug, " Initiating name server search for %d channels, search attempt %d" , int (m_channels.size ()), m_nsSearchCounter);
341+ m_nsTransport->enqueueSendRequest (shared_from_this ());
342+ }
248343 initializeSendBuffer ();
249344}
250345
@@ -253,7 +348,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253348 ByteBuffer* requestMessage, TransportSendControl* control)
254349{
255350 epics::pvData::int16 dataCount = requestMessage->getShort (DATA_COUNT_POSITION);
256-
257351 dataCount++;
258352
259353 /*
@@ -262,6 +356,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262356 */
263357
264358 const std::string& name (channel->getSearchInstanceName ());
359+ LOG (logLevelDebug, " Searching for channel: %s" , name.c_str ());
360+
265361 // not nice...
266362 const int addedPayloadSize = sizeof (int32)/sizeof (int8) + (1 + sizeof (int32)/sizeof (int8) + name.length ());
267363 if (((int )requestMessage->getRemaining ()) < addedPayloadSize)
0 commit comments