Skip to content

Commit 52c57cd

Browse files
committed
Add support for PortForward.
1 parent 39bb616 commit 52c57cd

File tree

3 files changed

+290
-1
lines changed

3 files changed

+290
-1
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.examples;
14+
15+
import io.kubernetes.client.ApiClient;
16+
import io.kubernetes.client.ApiException;
17+
import io.kubernetes.client.Configuration;
18+
import io.kubernetes.client.PortForward;
19+
import io.kubernetes.client.apis.CoreV1Api;
20+
import io.kubernetes.client.models.V1Pod;
21+
import io.kubernetes.client.models.V1PodList;
22+
import io.kubernetes.client.util.Config;
23+
24+
import com.google.common.io.ByteStreams;
25+
26+
import java.io.BufferedReader;
27+
import java.io.InputStreamReader;
28+
import java.io.IOException;
29+
import java.io.OutputStream;
30+
import java.net.ServerSocket;
31+
import java.net.Socket;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
35+
36+
/**
37+
* A simple example of how to use the Java API
38+
*
39+
* Easiest way to run this:
40+
* mvn exec:java -Dexec.mainClass="io.kubernetes.client.examples.PortForwardExample"
41+
* from inside $REPO_DIR/examples
42+
*
43+
* Then:
44+
* curl localhost:8080
45+
* from a different terminal (but be quick about it, the socket times out pretty fast...)
46+
*
47+
*/
48+
public class PortForwardExample {
49+
public static void main(String[] args) throws IOException, ApiException, InterruptedException {
50+
ApiClient client = Config.defaultClient();
51+
Configuration.setDefaultApiClient(client);
52+
53+
PortForward forward = new PortForward();
54+
List<Integer> ports = new ArrayList<>();
55+
ports.add(80);
56+
final PortForward.PortForwardResult result =
57+
forward.forward("default", "nginx-4217019353-fg6zx", ports);
58+
59+
ServerSocket ss = new ServerSocket(8080);
60+
61+
final Socket s = ss.accept();
62+
System.out.println("Connected!");
63+
64+
new Thread(new Runnable() {
65+
public void run() {
66+
try {
67+
ByteStreams.copy(result.getInputStream(80), s.getOutputStream());
68+
} catch (IOException ex) {
69+
ex.printStackTrace();
70+
} catch (Exception ex) {
71+
ex.printStackTrace();
72+
}
73+
}
74+
}).start();
75+
76+
new Thread(new Runnable() {
77+
public void run() {
78+
try {
79+
ByteStreams.copy(s.getInputStream(), result.getOutboundStream(80));
80+
} catch (IOException ex) {
81+
ex.printStackTrace();
82+
} catch (Exception ex) {
83+
ex.printStackTrace();
84+
}
85+
}
86+
}).start();
87+
88+
Thread.sleep(10 * 1000);
89+
90+
System.exit(0);
91+
}
92+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package io.kubernetes.client;
2+
3+
import io.kubernetes.client.Configuration;
4+
import io.kubernetes.client.models.V1Pod;
5+
import io.kubernetes.client.util.WebSockets;
6+
import io.kubernetes.client.util.WebSocketStreamHandler;
7+
8+
import java.io.InputStream;
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.util.ArrayList;
12+
import java.util.HashMap;
13+
import java.util.List;
14+
15+
/**
16+
* Utility class for setting up port-forwarding connections.
17+
* Uses the WebSockets API, not the SPDY API (which the Go client uses)
18+
*
19+
* The protocol is undocumented as far as I can tell, but the PR that added
20+
* it is here:
21+
* https://github.com/kubernetes/kubernetes/pull/33684
22+
*
23+
* And the protocol is:
24+
*
25+
* ws://server/api/v1/namespaces/<namespace>/pods/<pod>/portforward?ports=80&ports=8080
26+
*
27+
* I/O for first port (80) is on Channel 0
28+
* Err for first port (80) is on Channel 1
29+
* I/O for second port (8080) is on Channel 2
30+
* Err for second port (8080) is on Channel 2
31+
* <and so on for remaining ports>
32+
*
33+
* The first two bytes of each output stream is the port that is being forwarded
34+
* in little-endian format.
35+
*/
36+
public class PortForward {
37+
private ApiClient apiClient;
38+
39+
/**
40+
* Simple PortForward API constructor, uses default configuration
41+
*/
42+
public PortForward() {
43+
this(Configuration.getDefaultApiClient());
44+
}
45+
46+
/**
47+
* PortForward API Constructor
48+
* @param apiClient The api client to use.
49+
*/
50+
public PortForward(ApiClient apiClient) {
51+
this.apiClient = apiClient;
52+
}
53+
54+
/**
55+
* Get the API client for these PortForward operations.
56+
* @returns The API client that will be used.
57+
*/
58+
public ApiClient getApiClient() {
59+
return apiClient;
60+
}
61+
62+
/**
63+
* Set the API client for subsequent PortForward operations.
64+
* @param apiClient The new API client to use.
65+
*/
66+
public void setApiClient(ApiClient apiClient) {
67+
this.apiClient = apiClient;
68+
}
69+
70+
private String makePath(String namespace, String name) {
71+
return "/api/v1/namespaces/" +
72+
namespace +
73+
"/pods/" +
74+
name +
75+
"/portforward";
76+
}
77+
78+
/**
79+
* PortForward to a container
80+
*
81+
* @param pod The pod where the port forward is run.
82+
* @param ports The ports to forward
83+
*/
84+
public PortForwardResult forward(V1Pod pod, List<Integer> ports) throws ApiException, IOException {
85+
return forward(pod.getMetadata().getNamespace(), pod.getMetadata().getNamespace(), ports);
86+
}
87+
88+
/**
89+
* PortForward to a container.
90+
*
91+
* @param namespace The namespace of the Pod
92+
* @param name The name of the Pod
93+
* @param ports The ports to forward
94+
*/
95+
public PortForwardResult forward(String namespace, String name, List<Integer> ports) throws ApiException, IOException {
96+
String path = makePath(namespace, name);
97+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
98+
PortForwardResult result = new PortForwardResult(handler, ports);
99+
List<Pair> queryParams = new ArrayList<>();
100+
queryParams.add(new Pair("ports", "80"));
101+
WebSockets.stream(path, "GET", queryParams, apiClient, handler);
102+
103+
// Wait for streams to start.
104+
result.init();
105+
106+
return result;
107+
}
108+
109+
/**
110+
* PortForwardResult contains the result of an Attach call, it includes streams for stdout
111+
* stderr and stdin.
112+
*/
113+
public static class PortForwardResult {
114+
private WebSocketStreamHandler handler;
115+
private HashMap<Integer, Integer> streams;
116+
private List<Integer> ports;
117+
118+
public PortForwardResult(WebSocketStreamHandler handler, List<Integer> ports) throws IOException {
119+
this.handler = handler;
120+
this.streams = new HashMap<>();
121+
this.ports = ports;
122+
}
123+
124+
public void init() throws IOException {
125+
for (int i = 0; i < ports.size(); i++) {
126+
InputStream is = handler.getInputStream(i);
127+
byte[] data = new byte[2];
128+
is.read(data);
129+
int port = data[0] + data[1] * 256;
130+
streams.put(port, i);
131+
}
132+
}
133+
134+
private int findPortIndex(int portNumber) {
135+
Integer ix = streams.get(portNumber);
136+
if (ix == null) {
137+
return -1;
138+
}
139+
return ix.intValue();
140+
}
141+
142+
/**
143+
* Get the output stream for the specified port number (e.g. 80)
144+
* @param port The port number to get the stream for.
145+
*/
146+
public OutputStream getOutboundStream(int port) {
147+
int portIndex = findPortIndex(port);
148+
if (portIndex == -1) {
149+
throw new IllegalArgumentException("No such port!");
150+
}
151+
return handler.getOutputStream(portIndex * 2);
152+
}
153+
154+
/**
155+
* Get the error stream for a port number (e.g. 80)
156+
* @param port The port number to get the stream for.
157+
* @return The error stream
158+
*/
159+
public OutputStream getErrorStream(int port) {
160+
int portIndex = findPortIndex(port);
161+
if (portIndex == -1) {
162+
throw new IllegalArgumentException("No such port!");
163+
}
164+
return handler.getOutputStream(portIndex * 2 + 1);
165+
}
166+
167+
/**
168+
* Get the input stream for a port number (e.g. 80)
169+
* @param port The port number to get the stream for.
170+
* @return The error stream
171+
*/
172+
public InputStream getInputStream(int port) throws IOException {
173+
int portIndex = findPortIndex(port);
174+
if (portIndex == -1) {
175+
throw new IllegalArgumentException("No such port!");
176+
}
177+
return handler.getInputStream(portIndex * 2);
178+
}
179+
}
180+
}

util/src/main/java/io/kubernetes/client/util/WebSockets.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package io.kubernetes.client.util;
1414

1515
import com.google.common.net.HttpHeaders;
16+
import com.squareup.okhttp.Call;
1617
import com.squareup.okhttp.Request;
1718
import com.squareup.okhttp.Response;
1819
import com.squareup.okhttp.ResponseBody;
@@ -30,6 +31,7 @@
3031
import java.io.InputStream;
3132
import java.io.OutputStream;
3233
import java.io.Reader;
34+
import java.util.List;
3335
import java.util.ArrayList;
3436
import java.util.HashMap;
3537

@@ -81,13 +83,28 @@ public interface SocketListener {
8183
* @param listener The socket listener to handle socket events
8284
*/
8385
public static void stream(String path, String method, ApiClient client, SocketListener listener) throws ApiException, IOException {
86+
stream(path, method, new ArrayList<Pair>(), client, listener);
87+
}
88+
89+
public static void stream(String path, String method, List<Pair> queryParams, ApiClient client, SocketListener listener) throws ApiException, IOException {
90+
8491
HashMap<String, String> headers = new HashMap<String, String>();
8592
String allProtocols = String.format("%s,%s,%s,%s", V4_STREAM_PROTOCOL, V3_STREAM_PROTOCOL, V2_STREAM_PROTOCOL, V1_STREAM_PROTOCOL);
8693
headers.put(STREAM_PROTOCOL_HEADER, allProtocols);
8794
headers.put(HttpHeaders.CONNECTION, HttpHeaders.UPGRADE);
8895
headers.put(HttpHeaders.UPGRADE, SPDY_3_1);
8996

90-
Request request = client.buildRequest(path, method, new ArrayList<Pair>(), new ArrayList<Pair>(), null, headers, new HashMap<String, Object>(), new String[0], null);
97+
Request request = client.buildRequest(path, method, queryParams, new ArrayList<Pair>(), null, headers, new HashMap<String, Object>(), new String[0], null);
98+
streamRequest(request, client, listener);
99+
}
100+
101+
/** If we ever upgrade to okhttp 3...
102+
public static void stream(Call call, ApiClient client, SocketListener listener) {
103+
streamRequest(call.request(), client, listener);
104+
}
105+
*/
106+
107+
private static void streamRequest(Request request, ApiClient client, SocketListener listener) {
91108
WebSocketCall.create(client.getHttpClient(), request).enqueue(new Listener(listener));
92109
}
93110

0 commit comments

Comments
 (0)