|
39 | 39 | import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; |
40 | 40 | import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus; |
41 | 41 |
|
| 42 | +import java.io.File; |
| 43 | +import java.io.FileInputStream; |
42 | 44 | import java.io.IOException; |
43 | | -import java.nio.ByteBuffer; |
44 | | -import java.nio.channels.FileChannel; |
45 | | -import java.util.Objects; |
| 45 | +import java.nio.file.Files; |
46 | 46 | import java.util.concurrent.TimeUnit; |
47 | 47 |
|
48 | 48 | import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE; |
@@ -87,69 +87,71 @@ public void statusChanged(GRPCChannelStatus status) { |
87 | 87 | this.status = status; |
88 | 88 | } |
89 | 89 |
|
90 | | - public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException { |
91 | | - if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) { |
| 90 | + public void sendData(AsyncProfilerTask task, File dumpFile) throws IOException, InterruptedException { |
| 91 | + if (status != GRPCChannelStatus.CONNECTED) { |
92 | 92 | return; |
93 | 93 | } |
94 | 94 |
|
95 | | - int size = Math.toIntExact(channel.size()); |
96 | | - final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); |
97 | | - StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( |
98 | | - GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS |
99 | | - ).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() { |
100 | | - ClientCallStreamObserver<AsyncProfilerData> requestStream; |
101 | | - |
102 | | - @Override |
103 | | - public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) { |
104 | | - this.requestStream = requestStream; |
105 | | - } |
| 95 | + try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { |
| 96 | + long fileSize = Files.size(dumpFile.toPath()); |
| 97 | + int size = Math.toIntExact(fileSize); |
| 98 | + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); |
| 99 | + StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( |
| 100 | + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS |
| 101 | + ).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() { |
| 102 | + ClientCallStreamObserver<AsyncProfilerData> requestStream; |
| 103 | + |
| 104 | + @Override |
| 105 | + public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) { |
| 106 | + this.requestStream = requestStream; |
| 107 | + } |
106 | 108 |
|
107 | | - @Override |
108 | | - public void onNext(AsyncProfilerCollectionResponse value) { |
109 | | - if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { |
110 | | - LOGGER.warn("JFR is too large to be received by the oap server"); |
111 | | - } else { |
112 | | - ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); |
113 | | - try { |
114 | | - while (channel.read(buf) > 0) { |
115 | | - buf.flip(); |
116 | | - AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() |
117 | | - .setContent(ByteString.copyFrom(buf)) |
118 | | - .build(); |
119 | | - requestStream.onNext(asyncProfilerData); |
120 | | - buf.clear(); |
| 109 | + @Override |
| 110 | + public void onNext(AsyncProfilerCollectionResponse value) { |
| 111 | + if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { |
| 112 | + LOGGER.warn("JFR is too large to be received by the oap server"); |
| 113 | + } else { |
| 114 | + byte[] buf = new byte[DATA_CHUNK_SIZE]; |
| 115 | + try { |
| 116 | + int bytesRead; |
| 117 | + while ((bytesRead = fileInputStream.read(buf)) != -1) { |
| 118 | + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() |
| 119 | + .setContent(ByteString.copyFrom(buf, 0, bytesRead)) |
| 120 | + .build(); |
| 121 | + requestStream.onNext(asyncProfilerData); |
| 122 | + } |
| 123 | + } catch (IOException e) { |
| 124 | + LOGGER.error("Failed to read JFR file and failed to upload to oap", e); |
121 | 125 | } |
122 | | - } catch (IOException e) { |
123 | | - LOGGER.error("Failed to read JFR file and failed to upload to oap", e); |
124 | 126 | } |
125 | | - } |
126 | | - |
127 | | - requestStream.onCompleted(); |
128 | | - } |
129 | 127 |
|
130 | | - @Override |
131 | | - public void onError(Throwable t) { |
132 | | - status.finished(); |
133 | | - LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); |
134 | | - ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); |
135 | | - } |
| 128 | + requestStream.onCompleted(); |
| 129 | + } |
136 | 130 |
|
137 | | - @Override |
138 | | - public void onCompleted() { |
139 | | - status.finished(); |
140 | | - } |
141 | | - }); |
142 | | - AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() |
143 | | - .setService(Config.Agent.SERVICE_NAME) |
144 | | - .setServiceInstance(Config.Agent.INSTANCE_NAME) |
145 | | - .setType(AsyncProfilingStatus.PROFILING_SUCCESS) |
146 | | - .setContentSize(size) |
147 | | - .setTaskId(task.getTaskId()) |
148 | | - .build(); |
149 | | - AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); |
150 | | - dataStreamObserver.onNext(asyncProfilerData); |
| 131 | + @Override |
| 132 | + public void onError(Throwable t) { |
| 133 | + status.finished(); |
| 134 | + LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); |
| 135 | + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); |
| 136 | + } |
151 | 137 |
|
152 | | - status.wait4Finish(); |
| 138 | + @Override |
| 139 | + public void onCompleted() { |
| 140 | + status.finished(); |
| 141 | + } |
| 142 | + }); |
| 143 | + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() |
| 144 | + .setService(Config.Agent.SERVICE_NAME) |
| 145 | + .setServiceInstance(Config.Agent.INSTANCE_NAME) |
| 146 | + .setType(AsyncProfilingStatus.PROFILING_SUCCESS) |
| 147 | + .setContentSize(size) |
| 148 | + .setTaskId(task.getTaskId()) |
| 149 | + .build(); |
| 150 | + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); |
| 151 | + dataStreamObserver.onNext(asyncProfilerData); |
| 152 | + |
| 153 | + status.wait4Finish(); |
| 154 | + } |
153 | 155 | } |
154 | 156 |
|
155 | 157 | public void sendError(AsyncProfilerTask task, String errorMessage) { |
|
0 commit comments