Skip to content

Commit 6909bfa

Browse files
committed
[HDFS-14292] Added ExecutorService to DataXceiverServer
1 parent 585cebf commit 6909bfa

File tree

16 files changed

+484
-248
lines changed

16 files changed

+484
-248
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import java.io.InputStream;
2222
import java.io.OutputStream;
2323
import java.net.Socket;
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
2426
import java.nio.channels.ReadableByteChannel;
2527

2628
import org.apache.hadoop.net.unix.DomainSocket;
29+
import org.apache.http.client.utils.URIBuilder;
2730

2831
/**
2932
* Represents a peer that we communicate with by using a basic Socket
@@ -35,12 +38,18 @@ public class BasicInetPeer implements Peer {
3538
private final OutputStream out;
3639
private final InputStream in;
3740
private final boolean isLocal;
41+
private final URI localURI;
42+
private final URI remoteURI;
3843

3944
public BasicInetPeer(Socket socket) throws IOException {
4045
this.socket = socket;
4146
this.out = socket.getOutputStream();
4247
this.in = socket.getInputStream();
4348
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
49+
this.localURI = getURI(socket.getLocalAddress().getHostAddress(),
50+
socket.getLocalPort());
51+
this.remoteURI =
52+
getURI(socket.getInetAddress().getHostAddress(), socket.getPort());
4453
}
4554

4655
@Override
@@ -101,6 +110,16 @@ public String getLocalAddressString() {
101110
return socket.getLocalSocketAddress().toString();
102111
}
103112

113+
@Override
114+
public URI getRemoteURI() {
115+
return this.remoteURI;
116+
}
117+
118+
@Override
119+
public URI getLocalURI() {
120+
return this.localURI;
121+
}
122+
104123
@Override
105124
public InputStream getInputStream() throws IOException {
106125
return in;
@@ -116,11 +135,6 @@ public boolean isLocal() {
116135
return isLocal;
117136
}
118137

119-
@Override
120-
public String toString() {
121-
return "BasicInetPeer(" + socket.toString() + ")";
122-
}
123-
124138
@Override
125139
public DomainSocket getDomainSocket() {
126140
return null;
@@ -130,4 +144,29 @@ public DomainSocket getDomainSocket() {
130144
public boolean hasSecureChannel() {
131145
return false;
132146
}
147+
148+
@Override
149+
public String toString() {
150+
return "BasicInetPeer [isLocal=" + isLocal + ", localURI=" + localURI
151+
+ ", remoteURI=" + remoteURI + "]";
152+
}
153+
154+
/**
155+
* Given a host name and port, create a DN URI. Turn checked exception into
156+
* runtime. Exception should never happen because the inputs are captures from
157+
* an exiting socket and not parsed from an external source.
158+
*
159+
* @param host Host for URI
160+
* @param port Port for URI
161+
* @return A URI
162+
*/
163+
private URI getURI(final String host, final int port) {
164+
try {
165+
return new URIBuilder().setScheme("hdfs+dn")
166+
.setHost(host)
167+
.setPort(port).build();
168+
} catch (URISyntaxException e) {
169+
throw new IllegalArgumentException(e);
170+
}
171+
}
133172
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import java.io.IOException;
2121
import java.io.InputStream;
2222
import java.io.OutputStream;
23+
import java.net.URI;
24+
import java.net.URISyntaxException;
2325
import java.nio.channels.ReadableByteChannel;
2426

25-
import org.apache.hadoop.net.unix.DomainSocket;
2627
import org.apache.hadoop.classification.InterfaceAudience;
28+
import org.apache.hadoop.net.unix.DomainSocket;
2729

2830
/**
2931
* Represents a peer that we communicate with by using blocking I/O
@@ -35,12 +37,21 @@ public class DomainPeer implements Peer {
3537
private final OutputStream out;
3638
private final InputStream in;
3739
private final ReadableByteChannel channel;
40+
private final boolean isLocal;
41+
private final URI localURI;
42+
private final URI remoteURI;
3843

3944
public DomainPeer(DomainSocket socket) {
4045
this.socket = socket;
4146
this.out = socket.getOutputStream();
4247
this.in = socket.getInputStream();
4348
this.channel = socket.getChannel();
49+
50+
// For a domain socket, both clients share the same socket file and only
51+
// local communication is supported
52+
this.isLocal = true;
53+
this.localURI = getURI(socket.getPath());
54+
this.remoteURI = this.localURI;
4455
}
4556

4657
@Override
@@ -89,6 +100,16 @@ public String getLocalAddressString() {
89100
return "<local>";
90101
}
91102

103+
@Override
104+
public URI getRemoteURI() {
105+
return this.remoteURI;
106+
}
107+
108+
@Override
109+
public URI getLocalURI() {
110+
return this.localURI;
111+
}
112+
92113
@Override
93114
public InputStream getInputStream() throws IOException {
94115
return in;
@@ -101,13 +122,7 @@ public OutputStream getOutputStream() throws IOException {
101122

102123
@Override
103124
public boolean isLocal() {
104-
/* UNIX domain sockets can only be used for local communication. */
105-
return true;
106-
}
107-
108-
@Override
109-
public String toString() {
110-
return "DomainPeer(" + getRemoteAddressString() + ")";
125+
return this.isLocal;
111126
}
112127

113128
@Override
@@ -129,4 +144,31 @@ public boolean hasSecureChannel() {
129144
//
130145
return true;
131146
}
147+
148+
@Override
149+
public String toString() {
150+
return "DomainPeer [isLocal=" + isLocal + ", localURI=" + localURI
151+
+ ", remoteURI=" + remoteURI + "]";
152+
}
153+
154+
/**
155+
* Given a host name and port, create a DN URI. Turn checked exception into
156+
* runtime. Exception should never happen because the inputs are captures from
157+
* an exiting socket and not parsed from an external source.
158+
*
159+
* Processes reference Unix domain sockets as file system inodes, so two
160+
* processes can communicate by opening the same socket. Therefore the URI
161+
* host is always "localhost" and the URI path is the file path to the socket
162+
* file.
163+
*
164+
* @param path The path to the Unix domain socket file
165+
* @return A URI
166+
*/
167+
private URI getURI(final String path) {
168+
try {
169+
return new URI("hdfs+dn+unix", "127.0.0.1", path, null);
170+
} catch (URISyntaxException e) {
171+
throw new IllegalArgumentException(e);
172+
}
173+
}
132174
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.io.InputStream;
2626
import java.io.OutputStream;
27+
import java.net.URI;
2728
import java.nio.channels.ReadableByteChannel;
2829

2930
/**
@@ -110,6 +111,16 @@ public String getLocalAddressString() {
110111
return enclosedPeer.getLocalAddressString();
111112
}
112113

114+
@Override
115+
public URI getRemoteURI() {
116+
return enclosedPeer.getRemoteURI();
117+
}
118+
119+
@Override
120+
public URI getLocalURI() {
121+
return enclosedPeer.getLocalURI();
122+
}
123+
113124
@Override
114125
public InputStream getInputStream() throws IOException {
115126
return in;

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import java.io.InputStream;
2222
import java.io.OutputStream;
2323
import java.net.Socket;
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
2426
import java.nio.channels.ReadableByteChannel;
2527

2628
import org.apache.hadoop.net.SocketInputStream;
2729
import org.apache.hadoop.net.SocketOutputStream;
2830
import org.apache.hadoop.net.unix.DomainSocket;
31+
import org.apache.http.client.utils.URIBuilder;
2932

3033
/**
3134
* Represents a peer that we communicate with by using non-blocking I/O
@@ -46,11 +49,18 @@ public class NioInetPeer implements Peer {
4649

4750
private final boolean isLocal;
4851

52+
private final URI localURI;
53+
private final URI remoteURI;
54+
4955
public NioInetPeer(Socket socket) throws IOException {
5056
this.socket = socket;
5157
this.in = new SocketInputStream(socket.getChannel(), 0);
5258
this.out = new SocketOutputStream(socket.getChannel(), 0);
5359
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
60+
this.localURI = getURI(socket.getLocalAddress().getHostAddress(),
61+
socket.getLocalPort());
62+
this.remoteURI =
63+
getURI(socket.getInetAddress().getHostAddress(), socket.getPort());
5464
}
5565

5666
@Override
@@ -104,6 +114,16 @@ public String getLocalAddressString() {
104114
return socket.getLocalSocketAddress().toString();
105115
}
106116

117+
@Override
118+
public URI getRemoteURI() {
119+
return this.remoteURI;
120+
}
121+
122+
@Override
123+
public URI getLocalURI() {
124+
return this.localURI;
125+
}
126+
107127
@Override
108128
public InputStream getInputStream() throws IOException {
109129
return in;
@@ -119,11 +139,6 @@ public boolean isLocal() {
119139
return isLocal;
120140
}
121141

122-
@Override
123-
public String toString() {
124-
return "NioInetPeer(" + socket.toString() + ")";
125-
}
126-
127142
@Override
128143
public DomainSocket getDomainSocket() {
129144
return null;
@@ -133,4 +148,29 @@ public DomainSocket getDomainSocket() {
133148
public boolean hasSecureChannel() {
134149
return false;
135150
}
151+
152+
@Override
153+
public String toString() {
154+
return "NioInetPeer [isLocal=" + isLocal + ", localURI=" + localURI
155+
+ ", remoteURI=" + remoteURI + "]";
156+
}
157+
158+
/**
159+
* Given a host name and port, create a DN URI. Turn checked exception into
160+
* runtime. Exception should never happen because the inputs are captures from
161+
* an exiting socket and not parsed from an external source.
162+
*
163+
* @param host Host for URI
164+
* @param port Port for URI
165+
* @return A URI
166+
*/
167+
private URI getURI(final String host, final int port) {
168+
try {
169+
return new URIBuilder().setScheme("hdfs+dn")
170+
.setHost(host)
171+
.setPort(port).build();
172+
} catch (URISyntaxException e) {
173+
throw new IllegalArgumentException(e);
174+
}
175+
}
136176
}

0 commit comments

Comments
 (0)