@@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
7171static const int MAX_FRAMES_AT_ONCE = 10 ;
7272static const int DELAY_BETWEEN_FRAMES_MS = 50 ;
7373
74+ static const int MAX_NAME_SERVER_SEARCH_COUNT = 3 ;
7475
7576ChannelSearchManager::ChannelSearchManager (Context::shared_pointer const & context) :
7677 m_context (context),
7778 m_responseAddress (), // initialized in activate()
7879 m_canceled (),
7980 m_sequenceNumber (0 ),
81+ m_nsSearchCounter (0 ),
8082 m_sendBuffer (MAX_UDP_UNFRAGMENTED_SEND),
8183 m_channels (),
8284 m_lastTimeSent (),
@@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
135137 if (m_canceled.get ())
136138 return ;
137139
140+ LOG (logLevelDebug, " Registering search instance: %s" , channel->getSearchInstanceName ().c_str ());
138141 bool immediateTrigger;
139142 {
140143 Lock guard (m_channelMutex);
@@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
154157
155158void ChannelSearchManager::unregisterSearchInstance (SearchInstance::shared_pointer const & channel)
156159{
160+ LOG (logLevelDebug, " Unregistering search instance: %s" , channel->getSearchInstanceName ().c_str ());
157161 Lock guard (m_channelMutex);
158162 pvAccessID id = channel->getSearchInstanceID ();
159163 m_channels.erase (id);
@@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
180184 SearchInstance::shared_pointer si (channelsIter->second .lock ());
181185
182186 // remove from search list
187+ LOG (logLevelDebug, " Removing cid %d from the channel map" , cid);
183188 m_channels.erase (cid);
184189
185190 guard.unlock ();
@@ -188,6 +193,17 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
188193 if (si)
189194 si->searchResponse (guid, minorRevision, serverAddress);
190195 }
196+ releaseNameServerTransport ();
197+ }
198+
199+ void ChannelSearchManager::releaseNameServerTransport (bool forceRelease)
200+ {
201+ if (m_channels.size () == 0 || forceRelease)
202+ {
203+ m_nsSearchCounter = 0 ;
204+ m_context.lock ()->releaseNameServerSearchTransport (m_nsTransport);
205+ m_nsTransport.reset ();
206+ }
191207}
192208
193209void ChannelSearchManager::newServerDetected ()
@@ -196,6 +212,46 @@ void ChannelSearchManager::newServerDetected()
196212 callback ();
197213}
198214
215+ void ChannelSearchManager::send (epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
216+ {
217+ control->startMessage (CMD_SEARCH, 4 +1 +3 +16 +2 +1 +4 +2 );
218+ buffer->putInt (m_sequenceNumber);
219+ buffer->putByte ((int8_t )QOS_REPLY_REQUIRED); // CAST_POSITION
220+ buffer->putByte ((int8_t )0 ); // reserved
221+ buffer->putShort ((int16_t )0 ); // reserved
222+
223+ osiSockAddr anyAddress;
224+ memset (&anyAddress, 0 , sizeof (anyAddress));
225+ anyAddress.ia .sin_family = AF_INET;
226+ anyAddress.ia .sin_port = htons (0 );
227+ anyAddress.ia .sin_addr .s_addr = htonl (INADDR_ANY);
228+ encodeAsIPv6Address (buffer, &anyAddress);
229+ buffer->putShort ((int16_t )ntohs (anyAddress.ia .sin_port ));
230+ buffer->putByte ((int8_t )1 );
231+ SerializeHelper::serializeString (" tcp" , buffer, control);
232+ buffer->putShort ((int16_t )0 ); // initial channel count
233+
234+ vector<SearchInstance::shared_pointer> toSend;
235+ {
236+ Lock guard (m_channelMutex);
237+ toSend.reserve (m_channels.size ());
238+
239+ for (m_channels_t ::iterator channelsIter = m_channels.begin ();
240+ channelsIter != m_channels.end (); channelsIter++)
241+ {
242+ SearchInstance::shared_pointer inst (channelsIter->second .lock ());
243+ if (!inst) continue ;
244+ toSend.push_back (inst);
245+ }
246+ }
247+
248+ vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin ();
249+ for (; siter != toSend.end (); siter++)
250+ {
251+ generateSearchRequestMessage (*siter, buffer, control);
252+ }
253+ }
254+
199255void ChannelSearchManager::initializeSendBuffer ()
200256{
201257 // for now OK, since it is only set here
@@ -236,15 +292,40 @@ void ChannelSearchManager::flushSendBuffer()
236292{
237293 Lock guard (m_mutex);
238294
295+ // UDP transport
239296 Transport::shared_pointer tt = m_context.lock ()->getSearchTransport ();
240297 BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
241298
299+ // UDP search
242300 m_sendBuffer.putByte (CAST_POSITION, (int8_t )0x80 ); // unicast, no reply required
243301 ut->send (&m_sendBuffer, inetAddressType_unicast);
244302
245303 m_sendBuffer.putByte (CAST_POSITION, (int8_t )0x00 ); // b/m-cast, no reply required
246304 ut->send (&m_sendBuffer, inetAddressType_broadcast_multicast);
247305
306+ // Name server transport
307+ if (m_nsTransport)
308+ {
309+ // Reset transport after max. number of attempts is reached.
310+ if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT)
311+ {
312+ LOG (logLevelDebug, " Resetting name server transport after %d search attempts" , m_nsSearchCounter);
313+ releaseNameServerTransport (true );
314+ }
315+ }
316+
317+ if (!m_nsTransport)
318+ {
319+ m_nsTransport = m_context.lock ()->getNameServerSearchTransport ();
320+ }
321+
322+ // Name server search
323+ if (m_nsTransport)
324+ {
325+ m_nsSearchCounter++;
326+ LOG (logLevelDebug, " Initiating name server search for %d channels, search attempt %d" , int (m_channels.size ()), m_nsSearchCounter);
327+ m_nsTransport->enqueueSendRequest (shared_from_this ());
328+ }
248329 initializeSendBuffer ();
249330}
250331
@@ -253,7 +334,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
253334 ByteBuffer* requestMessage, TransportSendControl* control)
254335{
255336 epics::pvData::int16 dataCount = requestMessage->getShort (DATA_COUNT_POSITION);
256-
257337 dataCount++;
258338
259339 /*
@@ -262,6 +342,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
262342 */
263343
264344 const std::string& name (channel->getSearchInstanceName ());
345+ LOG (logLevelDebug, " Searching for channel: %s" , name.c_str ());
346+
265347 // not nice...
266348 const int addedPayloadSize = sizeof (int32)/sizeof (int8) + (1 + sizeof (int32)/sizeof (int8) + name.length ());
267349 if (((int )requestMessage->getRemaining ()) < addedPayloadSize)
0 commit comments