Skip to content

Commit b2a256f

Browse files
committed
HADOOP-17511 restore xfer manager integration
Change-Id: Ifd1b217861c646a9c17b56614bd1f6da570e3d15
1 parent da604b2 commit b2a256f

File tree

4 files changed

+38
-3
lines changed

4 files changed

+38
-3
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@
7979
import com.amazonaws.services.s3.transfer.model.CopyResult;
8080
import com.amazonaws.services.s3.transfer.model.UploadResult;
8181
import com.amazonaws.event.ProgressListener;
82-
83-
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
8482
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
8583
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
8684
import org.slf4j.Logger;
@@ -133,6 +131,7 @@
133131
import org.apache.hadoop.fs.statistics.DurationTracker;
134132
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
135133
import org.apache.hadoop.fs.statistics.IOStatistics;
134+
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
136135
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
137136
import org.apache.hadoop.io.IOUtils;
138137
import org.apache.hadoop.io.Text;
@@ -4105,7 +4104,8 @@ private CopyResult copyFile(String srcKey, String dstKey, long size,
41054104
getRequestFactory().newCopyObjectRequest(srcKey, dstKey, srcom);
41064105
changeTracker.maybeApplyConstraint(copyObjectRequest);
41074106
incrementStatistic(OBJECT_COPY_REQUESTS);
4108-
Copy copy = transfers.copy(copyObjectRequest);
4107+
Copy copy = transfers.copy(copyObjectRequest,
4108+
getAuditManager().createStateChangeListener());
41094109
copy.addProgressListener(progressListener);
41104110
CopyOutcome copyOutcome = CopyOutcome.waitForCopy(copy);
41114111
InterruptedException interruptedException =

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222

2323
import com.amazonaws.handlers.RequestHandler2;
24+
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
2425

2526
import org.apache.hadoop.classification.InterfaceAudience;
2627
import org.apache.hadoop.service.Service;
@@ -52,4 +53,14 @@ public interface AuditManager extends Service, AuditSpanSource,
5253
*/
5354
List<RequestHandler2> createRequestHandlers();
5455

56+
/**
57+
* Return a transfer state change callback which
58+
* fixes the active span context to be that in which
59+
* the state change listener was created.
60+
* This can be used to audit the creation of the multipart
61+
* upload initiation request which the transfer manager
62+
* makes when a file to be copied is split up.
63+
* @return a state change listener.
64+
*/
65+
TransferStateChangeListener createStateChangeListener();
5566
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.amazonaws.Response;
2929
import com.amazonaws.SdkBaseException;
3030
import com.amazonaws.handlers.RequestHandler2;
31+
import com.amazonaws.services.s3.transfer.Transfer;
32+
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
3133
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
3234
import org.slf4j.Logger;
3335
import org.slf4j.LoggerFactory;
@@ -209,6 +211,18 @@ public List<RequestHandler2> createRequestHandlers() {
209211
return requestHandlers;
210212
}
211213

214+
@Override
215+
public TransferStateChangeListener createStateChangeListener() {
216+
final AuditSpan span = getActiveThreadSpan();
217+
return new TransferStateChangeListener() {
218+
@Override
219+
public void transferStateChanged(final Transfer transfer,
220+
final Transfer.TransferState state) {
221+
setActiveThreadSpan(span);
222+
}
223+
};
224+
}
225+
212226
/**
213227
* Attach a reference to the active thread span, then
214228
* invoke the same callback on that active thread.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/NoopAuditManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.List;
2525

2626
import com.amazonaws.handlers.RequestHandler2;
27+
import com.amazonaws.services.s3.transfer.Transfer;
28+
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
2729

2830
import org.apache.hadoop.classification.InterfaceAudience;
2931
import org.apache.hadoop.fs.s3a.audit.AuditManager;
@@ -61,4 +63,12 @@ public List<RequestHandler2> createRequestHandlers() {
6163
return new ArrayList<>();
6264
}
6365

66+
@Override
67+
public TransferStateChangeListener createStateChangeListener() {
68+
return new TransferStateChangeListener() {
69+
public void transferStateChanged(final Transfer transfer,
70+
final Transfer.TransferState state) {
71+
}
72+
};
73+
}
6474
}

0 commit comments

Comments
 (0)