Skip to content

[HDFS-14292] Added ExecutorService to DataXceiverServer #495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.http.client.utils.URIBuilder;

/**
* Represents a peer that we communicate with by using a basic Socket
Expand All @@ -35,12 +38,18 @@ public class BasicInetPeer implements Peer {
private final OutputStream out;
private final InputStream in;
private final boolean isLocal;
private final URI localURI;
private final URI remoteURI;

public BasicInetPeer(Socket socket) throws IOException {
this.socket = socket;
this.out = socket.getOutputStream();
this.in = socket.getInputStream();
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
this.localURI = getURI(socket.getLocalAddress().getHostAddress(),
socket.getLocalPort());
this.remoteURI =
getURI(socket.getInetAddress().getHostAddress(), socket.getPort());
}

@Override
Expand Down Expand Up @@ -101,6 +110,16 @@ public String getLocalAddressString() {
return socket.getLocalSocketAddress().toString();
}

@Override
public URI getRemoteURI() {
return this.remoteURI;
}

@Override
public URI getLocalURI() {
return this.localURI;
}

@Override
public InputStream getInputStream() throws IOException {
return in;
Expand All @@ -116,11 +135,6 @@ public boolean isLocal() {
return isLocal;
}

@Override
public String toString() {
return "BasicInetPeer(" + socket.toString() + ")";
}

@Override
public DomainSocket getDomainSocket() {
return null;
Expand All @@ -130,4 +144,29 @@ public DomainSocket getDomainSocket() {
public boolean hasSecureChannel() {
return false;
}

@Override
public String toString() {
return "BasicInetPeer [isLocal=" + isLocal + ", localURI=" + localURI
+ ", remoteURI=" + remoteURI + "]";
}

/**
* Given a host name and port, create a DN URI. Turn checked exception into
* runtime. Exception should never happen because the inputs are captures from
* an exiting socket and not parsed from an external source.
*
* @param host Host for URI
* @param port Port for URI
* @return A URI
*/
private URI getURI(final String host, final int port) {
try {
return new URIBuilder().setScheme("hdfs+dn")
.setHost(host)
.setPort(port).build();
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.net.unix.DomainSocket;

/**
* Represents a peer that we communicate with by using blocking I/O
Expand All @@ -35,12 +37,21 @@ public class DomainPeer implements Peer {
private final OutputStream out;
private final InputStream in;
private final ReadableByteChannel channel;
private final boolean isLocal;
private final URI localURI;
private final URI remoteURI;

public DomainPeer(DomainSocket socket) {
this.socket = socket;
this.out = socket.getOutputStream();
this.in = socket.getInputStream();
this.channel = socket.getChannel();

// For a domain socket, both clients share the same socket file and only
// local communication is supported
this.isLocal = true;
this.localURI = getURI(socket.getPath());
this.remoteURI = this.localURI;
}

@Override
Expand Down Expand Up @@ -89,6 +100,16 @@ public String getLocalAddressString() {
return "<local>";
}

@Override
public URI getRemoteURI() {
return this.remoteURI;
}

@Override
public URI getLocalURI() {
return this.localURI;
}

@Override
public InputStream getInputStream() throws IOException {
return in;
Expand All @@ -101,13 +122,7 @@ public OutputStream getOutputStream() throws IOException {

@Override
public boolean isLocal() {
/* UNIX domain sockets can only be used for local communication. */
return true;
}

@Override
public String toString() {
return "DomainPeer(" + getRemoteAddressString() + ")";
return this.isLocal;
}

@Override
Expand All @@ -129,4 +144,31 @@ public boolean hasSecureChannel() {
//
return true;
}

@Override
public String toString() {
return "DomainPeer [isLocal=" + isLocal + ", localURI=" + localURI
+ ", remoteURI=" + remoteURI + "]";
}

/**
* Given a host name and port, create a DN URI. Turn checked exception into
* runtime. Exception should never happen because the inputs are captures from
* an exiting socket and not parsed from an external source.
*
* Processes reference Unix domain sockets as file system inodes, so two
* processes can communicate by opening the same socket. Therefore the URI
* host is always "localhost" and the URI path is the file path to the socket
* file.
*
* @param path The path to the Unix domain socket file
* @return A URI
*/
private URI getURI(final String path) {
try {
return new URI("hdfs+dn+unix", "127.0.0.1", path, null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.channels.ReadableByteChannel;

/**
Expand Down Expand Up @@ -110,6 +111,16 @@ public String getLocalAddressString() {
return enclosedPeer.getLocalAddressString();
}

@Override
public URI getRemoteURI() {
return enclosedPeer.getRemoteURI();
}

@Override
public URI getLocalURI() {
return enclosedPeer.getLocalURI();
}

@Override
public InputStream getInputStream() throws IOException {
return in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.net.SocketInputStream;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.http.client.utils.URIBuilder;

/**
* Represents a peer that we communicate with by using non-blocking I/O
Expand All @@ -46,11 +49,18 @@ public class NioInetPeer implements Peer {

private final boolean isLocal;

private final URI localURI;
private final URI remoteURI;

public NioInetPeer(Socket socket) throws IOException {
this.socket = socket;
this.in = new SocketInputStream(socket.getChannel(), 0);
this.out = new SocketOutputStream(socket.getChannel(), 0);
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
this.localURI = getURI(socket.getLocalAddress().getHostAddress(),
socket.getLocalPort());
this.remoteURI =
getURI(socket.getInetAddress().getHostAddress(), socket.getPort());
}

@Override
Expand Down Expand Up @@ -104,6 +114,16 @@ public String getLocalAddressString() {
return socket.getLocalSocketAddress().toString();
}

@Override
public URI getRemoteURI() {
return this.remoteURI;
}

@Override
public URI getLocalURI() {
return this.localURI;
}

@Override
public InputStream getInputStream() throws IOException {
return in;
Expand All @@ -119,11 +139,6 @@ public boolean isLocal() {
return isLocal;
}

@Override
public String toString() {
return "NioInetPeer(" + socket.toString() + ")";
}

@Override
public DomainSocket getDomainSocket() {
return null;
Expand All @@ -133,4 +148,29 @@ public DomainSocket getDomainSocket() {
public boolean hasSecureChannel() {
return false;
}

@Override
public String toString() {
return "NioInetPeer [isLocal=" + isLocal + ", localURI=" + localURI
+ ", remoteURI=" + remoteURI + "]";
}

/**
* Given a host name and port, create a DN URI. Turn checked exception into
* runtime. Exception should never happen because the inputs are captures from
* an exiting socket and not parsed from an external source.
*
* @param host Host for URI
* @param port Port for URI
* @return A URI
*/
private URI getURI(final String host, final int port) {
try {
return new URIBuilder().setScheme("hdfs+dn")
.setHost(host)
.setPort(port).build();
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
}
Loading