Skip to content

Commit

Permalink
- Backport from master: first impl of SOS (https://issues.redhat.com/…
Browse files Browse the repository at this point in the history
…browse/JGRP-2402)

- Updated manual
  • Loading branch information
belaban committed Apr 6, 2020
1 parent 878ae6d commit 877340e
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 10 deletions.
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
basedir="${compile.dir}"
manifest="${conf.dir}/MANIFEST.MF"
includes="org/jgroups/**">
<fileset dir="${conf.dir}" includes="*.xml, jg-messages*.properties, JGROUPS_VERSION.properties *.jks">
<fileset dir="${conf.dir}" includes="*.xml, *.properties, *.cfg, *.jks">
<exclude name="log4j*.xml"/>
<exclude name="settings.xml"/>
</fileset>
Expand Down
10 changes: 10 additions & 0 deletions conf/sos.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
jmx=TP.thread_pool_,num_rej
jmx=RED.que,avg_,dropped
jmx=MERGE3.num_
jmx=FD_SOCK.num_sus
jmx=FailureDetection.num_heartbeats,num_suspect
jmx=NAKACK2.xmit_table_undelivered,xmit_table_missing
jmx=UNICAST3.xmit_r,xmit_table_undelivered,xmit_table_missing
op=GMS.printPreviousViews
jmx=UFC.aver,number_of_blo
jmx=MFC.aver,number_of_blo
13 changes: 13 additions & 0 deletions doc/manual/protocols.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2274,6 +2274,19 @@ The `RED` protocol should be placed above the transport.
${RED}


==== SOS
`SOS` is a protocol that periodically dumps a selected set of critical attributes into a file. Examples of such
attributes are the size of the thread pool, the number of retransmissions, or the number of rejected messages.

Looking at these values over time can help a support person diagnose a problem.

`SOS` can be placed anywhere in the stack.

JIRA: https://issues.redhat.com/browse/JGRP-2402

${SOS}


[[LockingProtocols]]
==== Locking protocols

Expand Down
16 changes: 14 additions & 2 deletions src/org/jgroups/JChannelProbeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,17 @@ protected void handleOperation(Map<String, String> map, String operation) throws
if(index == -1)
throw new IllegalArgumentException("operation " + operation + " is missing the protocol name");
String prot_name=operation.substring(0, index);
Protocol prot=ch.getProtocolStack().findProtocol(prot_name);

Protocol prot=null;
try {
Class<? extends Protocol> cl=Util.loadProtocolClass(prot_name, this.getClass());
prot=ch.getProtocolStack().findProtocol(cl);
}
catch(Exception e) {
}

if(prot == null)
prot=ch.getProtocolStack().findProtocol(prot_name);
if(prot == null) {
log.error("protocol %s not found", prot_name);
return; // less drastic than throwing an exception...
Expand All @@ -319,6 +329,8 @@ protected void handleOperation(Map<String, String> map, String operation) throws
}

Method method=findMethod(prot, method_name, args);
if(method == null)
throw new IllegalArgumentException(String.format("method %s not found in %s", method_name, prot.getName()));
MethodCall call=new MethodCall(method);
Object[] converted_args=null;
if(args != null) {
Expand All @@ -329,7 +341,7 @@ protected void handleOperation(Map<String, String> map, String operation) throws
}
Object retval=call.invoke(prot, converted_args);
if(retval != null)
map.put(prot_name + "." + method_name, retval.toString());
map.put(prot.getName() + "." + method_name, retval.toString());
}

protected Method findMethod(Protocol prot, String method_name, String[] args) throws Exception {
Expand Down
10 changes: 8 additions & 2 deletions src/org/jgroups/protocols/FailureDetection.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ protected void startHeartbeatSender() {
lock.lock();
try {
if(!isHeartbeatSenderRunning()) {
heartbeat_sender=timer.scheduleWithFixedDelay(new HeartbeatSender(), 0, interval, TimeUnit.MILLISECONDS,
heartbeat_sender=timer.scheduleWithFixedDelay(new HeartbeatSender(this), 0, interval, TimeUnit.MILLISECONDS,
getTransport() instanceof TCP);
}
}
Expand Down Expand Up @@ -332,6 +332,12 @@ public HeartbeatHeader() {}

/** Class which periodically multicasts a HEARTBEAT message to the cluster */
class HeartbeatSender implements Runnable {
protected final FailureDetection enclosing;

HeartbeatSender(FailureDetection enclosing) {
this.enclosing=enclosing;
}

public void run() {
if(mcast_sent.compareAndSet(true, false))
; // suppress sending of heartbeat
Expand All @@ -345,7 +351,7 @@ public void run() {
}

public String toString() {
return FailureDetection.class.getSimpleName() + ": " + getClass().getSimpleName();
return String.format("%s: %s", enclosing.getClass().getSimpleName(), getClass().getSimpleName());
}
}

Expand Down
133 changes: 133 additions & 0 deletions src/org/jgroups/protocols/SOS.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.jgroups.protocols;

import org.jgroups.JChannelProbeHandler;
import org.jgroups.Version;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

import java.io.*;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.jgroups.stack.DiagnosticsHandler.ProbeHandler;

/**
 * Periodically fetches some attributes and writes them to a file (https://issues.redhat.com/browse/JGRP-2402).
 * <p>
 * The attributes/operations to be dumped are expressed in probe format ({@code jmx=...} / {@code op=...}) and are
 * handed to the registered {@link ProbeHandler}s. {@link #init()} loads them from {@link #config} into {@link #cmd},
 * so the value read from the configuration file replaces whatever {@code cmd} was set to before.
 * <p>
 * Every run of the dumper task opens {@link #filename} without append mode, i.e. the file always contains the most
 * recent dump only.
 * @author Bela Ban
 * @since 4.2.2, 5.0.0
 */
@MBean(description="Periodically fetches some attributes from selected (use-configurable) protocols and writes them to a file")
public class SOS extends Protocol {

    @Property(description="File to which the periodic data is written",writable=false)
    protected String filename="${sos.filename:jgroups.sos}";

    @Property(description="Interval in ms at which the attributes are fetched and written to the file",writable=false)
    protected long interval=60_000 * 15;

    @Property(description="The attributes to be fetched. In probe format ('jmx' or 'op' command)",writable=false)
    protected String cmd="jmx=TP.bind_,thread_pool_ jmx=FD_ALL3.num_s op=TP.printLogicalAddressCache";

    @Property(description="The configuration file containing all protocols and attributes to be dumped")
    protected String config="sos.cfg";

    /** The probe handlers asked to resolve {@link #cmd}; assigned in {@link #start()}, null before that */
    protected Set<ProbeHandler> handlers;

    /** Handle of the scheduled {@link DumperTask}; null while the protocol is not started */
    private Future<?> task;


    public String getFilename() {return filename;}
    public SOS setFileName(String f) {filename=f; return this;}
    public long getInterval() {return interval;}
    public SOS setInterval(long i) {interval=i; return this;}

    /**
     * Replaces {@link #cmd} with the contents of the given file.
     * @param filename the file to read (note: this is the command file, not the output file {@link #filename})
     * @return this instance
     * @throws IOException if the file cannot be read
     */
    @ManagedOperation(description="Reads the contents of the given file and sets cmd")
    public SOS setCommand(String filename) throws IOException {
        cmd=Util.readFile(filename);
        return this;
    }

    /**
     * Replaces {@link #cmd} with the contents of {@link #config}.
     * @return this instance
     * @throws IOException if the configuration cannot be found or read
     */
    @ManagedOperation(description="Reads the attributes to be dumped from the default configuration file")
    public SOS read() throws IOException {
        try(InputStream input=getInput(config)) {
            cmd=Util.readContents(input);
        }
        return this;
    }

    /** Initializes the protocol and loads the commands to be executed from {@link #config} */
    public void init() throws Exception {
        super.init();
        read();
    }

    /**
     * Picks the probe handlers to be used (those of the transport's diagnostics handler if there is one, otherwise
     * a fresh {@link JChannelProbeHandler} for this stack's channel) and schedules the periodic dumper task.
     */
    public void start() throws Exception {
        super.start();
        TP tp=getTransport();
        if(tp.getDiagnosticsHandler() != null)
            handlers=tp.getDiagnosticsHandler().getProbeHandlers();
        else
            handlers=Collections.singleton(new JChannelProbeHandler(stack.getChannel()));
        task=tp.getTimer().scheduleWithFixedDelay(new DumperTask(), interval, interval, TimeUnit.MILLISECONDS, false);
    }

    /** Stops the protocol and cancels the dumper task (if one was scheduled) */
    public void stop() {
        super.stop();
        // start() may never have run (or may have failed before scheduling): there is nothing to cancel then
        Future<?> scheduled=task;
        task=null;
        if(scheduled != null)
            scheduled.cancel(true);
    }

    /**
     * Passes {@link #cmd} (split on whitespace) to every probe handler and renders the results.
     * @return the metadata header followed by one {@code "* key: value"} line per returned entry. If no handlers
     *         are available yet (operation invoked before {@link #start()}) or {@code cmd} is null, only the
     *         header is returned
     */
    @ManagedOperation(description="Dumps attributes / invokes operations from given protocols")
    public String exec() {
        StringBuilder sb=new StringBuilder(getMetadata());
        // snapshot the fields: this is a managed operation and can be invoked before start() has set handlers
        Set<ProbeHandler> probe_handlers=handlers;
        String command=cmd;
        if(probe_handlers != null && command != null) {
            List<String> tokens=new ArrayList<>();
            for(StringTokenizer t=new StringTokenizer(command); t.hasMoreTokens();)
                tokens.add(t.nextToken());
            String[] args=tokens.toArray(new String[0]);

            for(ProbeHandler ph: probe_handlers) {
                Map<String,String> ret=ph.handleProbe(args);
                if(ret == null)
                    continue;
                for(Map.Entry<String,String> e: ret.entrySet())
                    sb.append("\n* ").append(e.getKey()).append(": ").append(e.getValue());
            }
        }
        return sb.append("\n").toString();
    }

    /** Returns the header of a dump: current date, local and physical address, version and view of the transport */
    protected String getMetadata() {
        TP tp=stack.getTransport();
        return String.format("\nDate: %s, member: %s (%s), version: %s\nview: %s\n",
                             new Date(), tp.getLocalAddress(), tp.getPhysicalAddress(),
                             Version.printVersion(), tp.view());
    }

    /**
     * Opens the given configuration for reading: {@code Util.getResourceAsStream()} is tried first and, when it
     * returns null, {@code name} is opened as a regular file.
     * @param name the name of the resource or file
     * @return the open stream, never null; the caller has to close it
     * @throws FileNotFoundException if the fallback file cannot be opened
     */
    protected InputStream getInput(String name) throws FileNotFoundException {
        InputStream input=Util.getResourceAsStream(name, getClass());
        // the FileInputStream constructor either succeeds or throws, so no null check is needed after it
        return input != null? input : new FileInputStream(name);
    }

    /** Task which writes the result of {@link #exec()} to {@link #filename}, replacing the previous contents */
    protected class DumperTask implements Runnable {

        public void run() {
            try {
                // create the dump *before* opening (and thereby truncating) the file, so that a failure in
                // exec() doesn't wipe out the previous dump
                byte[] dump=exec().getBytes();
                try(OutputStream out=new FileOutputStream(filename)) {
                    out.write(dump);
                }
            }
            catch(Exception e) {
                log.error("%s: failed dumping SOS information to %s: %s", getTransport().getLocalAddress(), filename, e);
            }
        }

        public String toString() {
            return String.format("%s: %s (%s)", SOS.class.getSimpleName(), getClass().getSimpleName(),
                                 getTransport().getLocalAddress());
        }
    }
}
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ public int getInternalThreadPoolSizeLargest() {

@ManagedOperation(description="Dumps the contents of the logical address cache")
public String printLogicalAddressCache() {
return logical_addr_cache.size() + " elements:\n" + logical_addr_cache.printCache(print_function);
return logical_addr_cache.printCache(print_function);
}

@ManagedOperation(description="Prints the contents of the who-has cache")
Expand Down
21 changes: 17 additions & 4 deletions src/org/jgroups/stack/ProtocolStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,19 @@ public Map<String,Map<String,Object>> dumpStats() {


public Map<String,Map<String,Object>> dumpStats(final String protocol_name, List<String> attrs) {
List<Protocol> prots=findProtocols(protocol_name);
List<Protocol> prots=null;

try {
Class<? extends Protocol> cl=Util.loadProtocolClass(protocol_name, this.getClass());
Protocol prot=findProtocol(cl);
if(prot != null)
prots=Collections.singletonList(prot);
}
catch(Exception e) {
}

if(prots ==null)
prots=findProtocols(protocol_name);
if(prots == null || prots.isEmpty())
return null;
Map<String,Map<String,Object>> retval=new HashMap<>();
Expand All @@ -296,10 +308,11 @@ public Map<String,Map<String,Object>> dumpStats(final String protocol_name, List
it.remove();
}
}
if(retval.containsKey(protocol_name))
retval.put(protocol_name + "-" + prot.getId(), tmp);
String pname=prot.getName();
if(retval.containsKey(pname))
retval.put(pname + "-" + prot.getId(), tmp);
else
retval.put(protocol_name, tmp);
retval.put(pname, tmp);
}
return retval;
}
Expand Down

0 comments on commit 877340e

Please sign in to comment.