3030import java .util .Set ;
3131import java .util .SortedSet ;
3232import java .util .TreeSet ;
33+ import java .util .stream .Collectors ;
3334import org .apache .hadoop .hbase .Coprocessor ;
3435import org .apache .hadoop .hbase .DoNotRetryIOException ;
3536import org .apache .hadoop .hbase .NamespaceDescriptor ;
@@ -174,8 +175,6 @@ private static final class RSGroupInfoHolder {
174175 private final RSGroupStartupWorker rsGroupStartupWorker ;
175176 // contains list of groups that were last flushed to persistent store
176177 private Set <String > prevRSGroups = new HashSet <>();
177- private final ServerEventsListenerThread serverEventsListenerThread =
178- new ServerEventsListenerThread ();
179178
180179 private RSGroupInfoManagerImpl (MasterServices masterServices ) throws IOException {
181180 this .masterServices = masterServices ;
@@ -184,11 +183,34 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
184183 this .rsGroupStartupWorker = new RSGroupStartupWorker ();
185184 }
186185
186+ private synchronized void updateDefaultServers () {
187+ LOG .info ("Updating default servers." );
188+ Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (holder .groupName2Group );
189+ RSGroupInfo oldDefaultGroupInfo = getRSGroup (RSGroupInfo .DEFAULT_GROUP );
190+ assert oldDefaultGroupInfo != null ;
191+ RSGroupInfo newDefaultGroupInfo =
192+ new RSGroupInfo (RSGroupInfo .DEFAULT_GROUP , getDefaultServers ());
193+ newDefaultGroupInfo .addAllTables (oldDefaultGroupInfo .getTables ());
194+ newGroupMap .put (RSGroupInfo .DEFAULT_GROUP , newDefaultGroupInfo );
195+ // do not need to persist, as we do not persist default group.
196+ resetRSGroupMap (newGroupMap );
197+ LOG .info ("Updated default servers, {} servers" , newDefaultGroupInfo .getServers ().size ());
198+ }
187199
188200 private synchronized void init () throws IOException {
189201 refresh (false );
190- serverEventsListenerThread .start ();
191- masterServices .getServerManager ().registerListener (serverEventsListenerThread );
202+ masterServices .getServerManager ().registerListener (new ServerListener () {
203+
204+ @ Override
205+ public void serverAdded (ServerName serverName ) {
206+ updateDefaultServers ();
207+ }
208+
209+ @ Override
210+ public void serverRemoved (ServerName serverName ) {
211+ updateDefaultServers ();
212+ }
213+ });
192214 migrate ();
193215 }
194216
@@ -225,19 +247,11 @@ private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOEx
225247 }
226248
227249 /**
228- * @param master the master to get online servers for
229250 * @return Set of online Servers named for their hostname and port (not ServerName).
230251 */
231- private static Set <Address > getOnlineServers (final MasterServices master ) {
232- Set <Address > onlineServers = new HashSet <Address >();
233- if (master == null ) {
234- return onlineServers ;
235- }
236-
237- for (ServerName server : master .getServerManager ().getOnlineServers ().keySet ()) {
238- onlineServers .add (server .getAddress ());
239- }
240- return onlineServers ;
252+ private Set <Address > getOnlineServers () {
253+ return masterServices .getServerManager ().getOnlineServers ().keySet ().stream ()
254+ .map (ServerName ::getAddress ).collect (Collectors .toSet ());
241255 }
242256
243257 @ Override
@@ -249,8 +263,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
249263 // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
250264 // rsgroup of dead servers that are to come back later).
251265 Set <Address > onlineServers =
252- dst .getName ().equals (RSGroupInfo .DEFAULT_GROUP ) ? getOnlineServers (this .masterServices )
253- : null ;
266+ dst .getName ().equals (RSGroupInfo .DEFAULT_GROUP ) ? getOnlineServers () : null ;
254267 for (Address el : servers ) {
255268 src .removeServer (el );
256269 if (onlineServers != null ) {
@@ -617,25 +630,8 @@ private void updateCacheOfRSGroups(final Set<String> currentGroups) {
617630 this .prevRSGroups .addAll (currentGroups );
618631 }
619632
620- // Called by getDefaultServers. Presume it has lock in place.
621- private List <ServerName > getOnlineRS () throws IOException {
622- if (masterServices != null ) {
623- return masterServices .getServerManager ().getOnlineServersList ();
624- }
625- LOG .debug ("Reading online RS from zookeeper" );
626- List <ServerName > servers = new ArrayList <>();
627- try {
628- for (String el : ZKUtil .listChildrenNoWatch (watcher , watcher .getZNodePaths ().rsZNode )) {
629- servers .add (ServerName .parseServerName (el ));
630- }
631- } catch (KeeperException e ) {
632- throw new IOException ("Failed to retrieve server list from zookeeper" , e );
633- }
634- return servers ;
635- }
636-
637633 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
638- private SortedSet <Address > getDefaultServers () throws IOException {
634+ private SortedSet <Address > getDefaultServers () {
639635 // Build a list of servers in other groups than default group, from rsGroupMap
640636 Set <Address > serversInOtherGroup = new HashSet <>();
641637 for (RSGroupInfo group : listRSGroups () /* get from rsGroupMap */ ) {
@@ -646,7 +642,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
646642
647643 // Get all online servers from Zookeeper and find out servers in default group
648644 SortedSet <Address > defaultServers = Sets .newTreeSet ();
649- for (ServerName serverName : getOnlineRS ()) {
645+ for (ServerName serverName : masterServices . getServerManager (). getOnlineServers (). keySet ()) {
650646 Address server = Address .fromParts (serverName .getHostname (), serverName .getPort ());
651647 if (!serversInOtherGroup .contains (server )) { // not in other groups
652648 defaultServers .add (server );
@@ -655,76 +651,6 @@ private SortedSet<Address> getDefaultServers() throws IOException {
655651 return defaultServers ;
656652 }
657653
658- // Called by ServerEventsListenerThread. Synchronize on this because redoing
659- // the rsGroupMap then writing it out.
660- private synchronized void updateDefaultServers (SortedSet <Address > servers ) {
661- Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
662- RSGroupInfo info = rsGroupMap .get (RSGroupInfo .DEFAULT_GROUP );
663- RSGroupInfo newInfo = new RSGroupInfo (info .getName (), servers );
664- HashMap <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
665- newGroupMap .put (newInfo .getName (), newInfo );
666- resetRSGroupMap (newGroupMap );
667- }
668-
669- /**
670- * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
671- * servers. Notifications about server changes are received by registering {@link ServerListener}.
672- * As a listener, we need to return immediately, so the real work of updating the servers is done
673- * asynchronously in this thread.
674- */
675- private class ServerEventsListenerThread extends Thread implements ServerListener {
676- private final Logger LOG = LoggerFactory .getLogger (ServerEventsListenerThread .class );
677- private boolean changed = false ;
678-
679- ServerEventsListenerThread () {
680- setDaemon (true );
681- }
682-
683- @ Override
684- public void serverAdded (ServerName serverName ) {
685- serverChanged ();
686- }
687-
688- @ Override
689- public void serverRemoved (ServerName serverName ) {
690- serverChanged ();
691- }
692-
693- private synchronized void serverChanged () {
694- changed = true ;
695- this .notify ();
696- }
697-
698- @ Override
699- public void run () {
700- setName (ServerEventsListenerThread .class .getName () + "-" + masterServices .getServerName ());
701- SortedSet <Address > prevDefaultServers = new TreeSet <>();
702- while (isMasterRunning (masterServices )) {
703- try {
704- LOG .info ("Updating default servers." );
705- SortedSet <Address > servers = RSGroupInfoManagerImpl .this .getDefaultServers ();
706- if (!servers .equals (prevDefaultServers )) {
707- RSGroupInfoManagerImpl .this .updateDefaultServers (servers );
708- prevDefaultServers = servers ;
709- LOG .info ("Updated with servers: " + servers .size ());
710- }
711- try {
712- synchronized (this ) {
713- while (!changed ) {
714- wait ();
715- }
716- changed = false ;
717- }
718- } catch (InterruptedException e ) {
719- LOG .warn ("Interrupted" , e );
720- }
721- } catch (IOException e ) {
722- LOG .warn ("Failed to update default servers" , e );
723- }
724- }
725- }
726- }
727-
728654 private class RSGroupStartupWorker extends Thread {
729655 private final Logger LOG = LoggerFactory .getLogger (RSGroupStartupWorker .class );
730656 private volatile boolean online = false ;
0 commit comments