|
87 | 87 | import org.apache.hadoop.hbase.client.LogEntry;
|
88 | 88 | import org.apache.hadoop.hbase.client.Mutation;
|
89 | 89 | import org.apache.hadoop.hbase.client.OnlineLogRecord;
|
| 90 | +import org.apache.hadoop.hbase.client.Operation; |
90 | 91 | import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
|
91 | 92 | import org.apache.hadoop.hbase.client.Put;
|
92 | 93 | import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
|
129 | 130 | import org.apache.hadoop.hbase.util.VersionInfo;
|
130 | 131 | import org.apache.hadoop.ipc.RemoteException;
|
131 | 132 | import org.apache.yetus.audience.InterfaceAudience;
|
| 133 | +import org.slf4j.Logger; |
| 134 | +import org.slf4j.LoggerFactory; |
132 | 135 |
|
133 | 136 | import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
|
134 | 137 | import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
|
|
229 | 232 | @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class
|
230 | 233 | public final class ProtobufUtil {
|
231 | 234 |
|
| 235 | + private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName()); |
| 236 | + |
232 | 237 | private ProtobufUtil() {
|
233 | 238 | }
|
234 | 239 |
|
@@ -1072,6 +1077,22 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
|
1072 | 1077 | return scanBuilder.build();
|
1073 | 1078 | }
|
1074 | 1079 |
|
| 1080 | + public static List<Operation> toMulti(MultiRequest multiRequest) { |
| 1081 | + return multiRequest.getRegionActionList().stream().map(RegionAction::getActionList) |
| 1082 | + .flatMap(Collection::stream).map(action -> { |
| 1083 | + try { |
| 1084 | + if (action.hasGet()) { |
| 1085 | + return ProtobufUtil.toGet(action.getGet()); |
| 1086 | + } else if (action.hasMutation()) { |
| 1087 | + return ProtobufUtil.toMutation(action.getMutation()); |
| 1088 | + } |
| 1089 | + } catch (IOException e) { |
| 1090 | + LOG.warn("Unable to convert RegionAction to client model: {}", action, e); |
| 1091 | + } |
| 1092 | + return null; |
| 1093 | + }).filter(Objects::nonNull).collect(Collectors.toList()); |
| 1094 | + } |
| 1095 | + |
1075 | 1096 | /**
|
1076 | 1097 | * Convert a protocol buffer Scan to a client Scan
|
1077 | 1098 | * @param proto the protocol buffer Scan to convert
|
@@ -2139,41 +2160,62 @@ private static String getStringForByteString(ByteString bs) {
|
2139 | 2160 |
|
2140 | 2161 | /**
|
2141 | 2162 | * Return SlowLogParams to maintain recent online slowlog responses
|
2142 |
| - * @param message Message object {@link Message} |
| 2163 | + * @param message Message object {@link Message} |
| 2164 | + * @param slowLogOperationMessagePayloadEnabled whether to include the {@link Message} in the |
| 2165 | + * payload |
2143 | 2166 | * @return SlowLogParams with regionName(for filter queries) and params
|
2144 | 2167 | */
|
2145 |
| - public static SlowLogParams getSlowLogParams(Message message) { |
| 2168 | + public static SlowLogParams getSlowLogParams(Message message, |
| 2169 | + boolean slowLogOperationMessagePayloadEnabled) { |
2146 | 2170 | if (message == null) {
|
2147 | 2171 | return null;
|
2148 | 2172 | }
|
2149 | 2173 | if (message instanceof ScanRequest) {
|
2150 | 2174 | ScanRequest scanRequest = (ScanRequest) message;
|
2151 | 2175 | String regionName = getStringForByteString(scanRequest.getRegion().getValue());
|
2152 | 2176 | String params = TextFormat.shortDebugString(message);
|
2153 |
| - return new SlowLogParams(regionName, params); |
| 2177 | + if (slowLogOperationMessagePayloadEnabled) { |
| 2178 | + return new SlowLogParams(regionName, params, scanRequest.getScan()); |
| 2179 | + } else { |
| 2180 | + return new SlowLogParams(regionName, params); |
| 2181 | + } |
2154 | 2182 | } else if (message instanceof MutationProto) {
|
2155 | 2183 | MutationProto mutationProto = (MutationProto) message;
|
2156 | 2184 | String params = "type= " + mutationProto.getMutateType().toString();
|
2157 |
| - return new SlowLogParams(params); |
| 2185 | + if (slowLogOperationMessagePayloadEnabled) { |
| 2186 | + return new SlowLogParams(null, params, mutationProto); |
| 2187 | + } else { |
| 2188 | + return new SlowLogParams(params); |
| 2189 | + } |
2158 | 2190 | } else if (message instanceof GetRequest) {
|
2159 | 2191 | GetRequest getRequest = (GetRequest) message;
|
2160 | 2192 | String regionName = getStringForByteString(getRequest.getRegion().getValue());
|
2161 | 2193 | String params =
|
2162 | 2194 | "region= " + regionName + ", row= " + getStringForByteString(getRequest.getGet().getRow());
|
2163 |
| - return new SlowLogParams(regionName, params); |
| 2195 | + if (slowLogOperationMessagePayloadEnabled) { |
| 2196 | + return new SlowLogParams(regionName, params, getRequest.getGet()); |
| 2197 | + } else { |
| 2198 | + return new SlowLogParams(regionName, params); |
| 2199 | + } |
2164 | 2200 | } else if (message instanceof MultiRequest) {
|
2165 | 2201 | MultiRequest multiRequest = (MultiRequest) message;
|
2166 |
| - int actionsCount = multiRequest.getRegionActionList().stream() |
2167 |
| - .mapToInt(ClientProtos.RegionAction::getActionCount).sum(); |
2168 | 2202 | RegionAction actions = multiRequest.getRegionActionList().get(0);
|
2169 | 2203 | String regionName = getStringForByteString(actions.getRegion().getValue());
|
2170 |
| - String params = "region= " + regionName + ", for " + actionsCount + " action(s)"; |
2171 |
| - return new SlowLogParams(regionName, params); |
| 2204 | + String params = getShortTextFormat(multiRequest); |
| 2205 | + if (slowLogOperationMessagePayloadEnabled) { |
| 2206 | + return new SlowLogParams(regionName, params, multiRequest); |
| 2207 | + } else { |
| 2208 | + return new SlowLogParams(regionName, params); |
| 2209 | + } |
2172 | 2210 | } else if (message instanceof MutateRequest) {
|
2173 | 2211 | MutateRequest mutateRequest = (MutateRequest) message;
|
2174 | 2212 | String regionName = getStringForByteString(mutateRequest.getRegion().getValue());
|
2175 | 2213 | String params = "region= " + regionName;
|
2176 |
| - return new SlowLogParams(regionName, params); |
| 2214 | + if (slowLogOperationMessagePayloadEnabled) { |
| 2215 | + return new SlowLogParams(regionName, params, mutateRequest.getMutation()); |
| 2216 | + } else { |
| 2217 | + return new SlowLogParams(regionName, params); |
| 2218 | + } |
2177 | 2219 | } else if (message instanceof CoprocessorServiceRequest) {
|
2178 | 2220 | CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message;
|
2179 | 2221 | String params = "coprocessorService= " + coprocessorServiceRequest.getCall().getServiceName()
|
@@ -3376,10 +3418,23 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
|
3376 | 3418 | .setQueueTime(slowLogPayload.getQueueTime()).setRegionName(slowLogPayload.getRegionName())
|
3377 | 3419 | .setResponseSize(slowLogPayload.getResponseSize())
|
3378 | 3420 | .setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
|
3379 |
| - .setUserName(slowLogPayload.getUserName()).build(); |
| 3421 | + .setUserName(slowLogPayload.getUserName()) |
| 3422 | + .setScan(catchAll(() -> ProtobufUtil.toScan(slowLogPayload.getScan()))) |
| 3423 | + .setMulti(catchAll(() -> ProtobufUtil.toMulti(slowLogPayload.getMulti()))) |
| 3424 | + .setGet(catchAll(() -> ProtobufUtil.toGet(slowLogPayload.getGet()))) |
| 3425 | + .setMutate(catchAll(() -> ProtobufUtil.toMutation(slowLogPayload.getMutate()))).build(); |
3380 | 3426 | return onlineLogRecord;
|
3381 | 3427 | }
|
3382 | 3428 |
|
| 3429 | + private static <T> T catchAll(Callable<T> callable) { |
| 3430 | + try { |
| 3431 | + return callable.call(); |
| 3432 | + } catch (Exception e) { |
| 3433 | + LOG.warn(""); |
| 3434 | + } |
| 3435 | + return null; |
| 3436 | + } |
| 3437 | + |
3383 | 3438 | /**
|
3384 | 3439 | * Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
|
3385 | 3440 | * @param logEntry slowlog response protobuf instance
|
|
0 commit comments