Skip to content

Commit c66a516

Browse files
committed
[INTERNAL][CORE] Prepares Stream access
This patch introduces an new class / interface which is just used for the ITransmitter and IReceiver. This is necessary to distinguish between plan stream connections and internal stuff.
1 parent 6c77832 commit c66a516

18 files changed

+458
-335
lines changed

core/src/saros/net/internal/BinaryChannelConnection.java

+14-18
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author coezbek
3131
* @author srossbach
3232
*/
33-
public class BinaryChannelConnection implements IByteStreamConnection {
33+
public class BinaryChannelConnection implements IPacketConnection {
3434

3535
private static final Logger LOG = Logger.getLogger(BinaryChannelConnection.class);
3636

@@ -111,24 +111,16 @@ public void run() {
111111

112112
private IBinaryXMPPExtensionReceiver receiver;
113113

114-
public BinaryChannelConnection(
115-
JID localAddress,
116-
JID remoteAddress,
117-
String connectionID,
118-
ByteStream stream,
119-
StreamMode mode,
120-
IByteStreamConnectionListener listener)
121-
throws IOException {
114+
public BinaryChannelConnection(ByteStream stream, IByteStreamConnectionListener listener) {
122115
this.listener = listener;
123-
this.localAddress = localAddress;
124-
this.remoteAddress = remoteAddress;
125-
this.connectionID = connectionID;
126-
this.stream = stream;
127-
this.stream.setReadTimeout(0); // keep connection alive
128-
this.mode = mode;
116+
// FIXME
117+
this.localAddress = (JID) stream.getLocalAddress();
118+
// FIXME
119+
this.remoteAddress = (JID) stream.getRemoteAddress();
129120

130-
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
131-
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
121+
this.connectionID = stream.getId();
122+
this.mode = stream.getMode();
123+
this.stream = stream;
132124
}
133125

134126
@Override
@@ -139,13 +131,17 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver
139131
}
140132

141133
@Override
142-
public synchronized void initialize() {
134+
public synchronized void initialize() throws IOException {
143135
if (initialized) return;
144136

145137
/*
146138
* it is ok to start the receiver a bit later because the data will be
147139
* already buffered by SMACK or the OS
148140
*/
141+
stream.setReadTimeout(0); // keep connection alive
142+
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
143+
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
144+
149145
receiveThread = new ReceiverThread();
150146
receiveThread.setName("BinaryChannel-" + remoteAddress.getName());
151147
receiveThread.start();

core/src/saros/net/internal/DataTransferManager.java

+52-16
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import saros.context.IContextKeyBindings.Socks5StreamService;
1919
import saros.net.ConnectionState;
2020
import saros.net.IConnectionManager;
21+
import saros.net.stream.ByteStream;
2122
import saros.net.stream.IStreamService;
23+
import saros.net.stream.IStreamServiceListener;
2224
import saros.net.stream.StreamMode;
2325
import saros.net.xmpp.IConnectionListener;
2426
import saros.net.xmpp.JID;
@@ -28,17 +30,14 @@
2830
/**
2931
* This class is responsible for handling all transfers of binary data. It maintains a map of
3032
* established connections and tries to reuse them.
31-
*
32-
* @author srossbach
33-
* @author coezbek
34-
* @author jurke
3533
*/
3634
@Component(module = "net")
3735
public class DataTransferManager implements IConnectionListener, IConnectionManager {
3836

3937
private static final Logger LOG = Logger.getLogger(DataTransferManager.class);
4038

41-
private static final String DEFAULT_CONNECTION_ID = "default";
39+
// package private for testing purposes
40+
static final String DEFAULT_CONNECTION_ID = "default";
4241

4342
private static final String IN = "in";
4443

@@ -65,6 +64,12 @@ public class DataTransferManager implements IConnectionListener, IConnectionMana
6564
private final CopyOnWriteArrayList<IByteStreamConnectionListener> connectionListeners =
6665
new CopyOnWriteArrayList<>();
6766

67+
private final CopyOnWriteArrayList<IPacketConnectionListener> packetConnectionListeners =
68+
new CopyOnWriteArrayList<>();
69+
70+
private final IStreamServiceListener streamServiceListener =
71+
(stream) -> createAndAnnouncePacketConnection(stream, true);
72+
6873
private final IByteStreamConnectionListener byteStreamConnectionListener =
6974
new IByteStreamConnectionListener() {
7075

@@ -81,11 +86,6 @@ public void connectionChanged(
8186
toConnectionIDToken(
8287
connectionId, incomingRequest ? IN : OUT, connection.getRemoteAddress());
8388

84-
/// TODO we currently have to announce not initialized connections otherwise the IReceiver
85-
// will miss updates
86-
87-
notfiyconnectionChanged(id, connection, incomingRequest);
88-
8989
LOG.debug(
9090
"bytestream connection changed "
9191
+ connection
@@ -123,7 +123,16 @@ public void connectionChanged(
123123
}
124124
}
125125

126-
connection.initialize();
126+
try {
127+
connection.initialize();
128+
} catch (IOException e) {
129+
LOG.error("failed to initialize connection: " + connection);
130+
connection.close();
131+
connectionPool.remove(id);
132+
return;
133+
}
134+
135+
notfiyconnectionChanged(id, connection, incomingRequest);
127136
}
128137

129138
@Override
@@ -234,6 +243,14 @@ public void removeConnectionListener(final IByteStreamConnectionListener listene
234243
connectionListeners.remove(listener);
235244
}
236245

246+
public void addPacketConnectionListener(final IPacketConnectionListener listener) {
247+
packetConnectionListeners.addIfAbsent(listener);
248+
}
249+
250+
public void removePacketConnectionListener(final IPacketConnectionListener listener) {
251+
packetConnectionListeners.remove(listener);
252+
}
253+
237254
private IByteStreamConnection connectInternal(String connectionID, JID peer) throws IOException {
238255

239256
IByteStreamConnection connection = null;
@@ -265,6 +282,8 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr
265282
final ArrayList<IStreamService> currentStreamServices =
266283
new ArrayList<IStreamService>(streamServices);
267284

285+
ByteStream byteStream = null;
286+
268287
for (IStreamService streamService : currentStreamServices) {
269288
LOG.info(
270289
"establishing connection to "
@@ -274,7 +293,7 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr
274293
+ " using stream service "
275294
+ streamService);
276295
try {
277-
connection = streamService.connect(connectionID, peer);
296+
byteStream = streamService.connect(connectionID, peer);
278297
break;
279298
} catch (IOException e) {
280299
LOG.warn("failed to connect to " + peer + " using stream service: " + streamService, e);
@@ -298,9 +317,8 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr
298317
}
299318
}
300319

301-
if (connection != null) {
302-
byteStreamConnectionListener.connectionChanged(connectionID, connection, false);
303-
320+
if (byteStream != null) {
321+
connection = createAndAnnouncePacketConnection(byteStream, false);
304322
return connection;
305323
}
306324

@@ -347,7 +365,7 @@ private void prepareConnection(final Connection connection) {
347365
connectionPool.open();
348366

349367
for (IStreamService streamService : streamServices)
350-
streamService.initialize(xmppConnection, byteStreamConnectionListener);
368+
streamService.initialize(xmppConnection, streamServiceListener);
351369
}
352370

353371
private void disposeConnection() {
@@ -431,4 +449,22 @@ private void notfiyconnectionChanged(
431449
}
432450
}
433451
}
452+
453+
private IByteStreamConnection createAndAnnouncePacketConnection(
454+
final ByteStream byteStream, final boolean isIncoming) {
455+
final IPacketConnection connection =
456+
new BinaryChannelConnection(byteStream, byteStreamConnectionListener);
457+
458+
for (final IPacketConnectionListener listener : packetConnectionListeners) {
459+
try {
460+
listener.connectionEstablished(connection);
461+
} catch (RuntimeException e) {
462+
LOG.error("invoking connectionEstablished() on listener: " + listener + " failed", e);
463+
}
464+
}
465+
466+
byteStreamConnectionListener.connectionChanged(byteStream.getId(), connection, isIncoming);
467+
468+
return connection;
469+
}
434470
}

core/src/saros/net/internal/IByteStreamConnection.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,7 @@ public interface IByteStreamConnection {
1717
* Initializes the byte stream connection. After the initialization is performed the byte stream
1818
* connection must be able to send and receive data.
1919
*/
20-
public void initialize();
21-
22-
/**
23-
* If this call returns the data has been send successfully, otherwise an IOException is thrown
24-
* with the reason why the transfer failed.
25-
*
26-
* @param data The data to be sent.
27-
* @throws IOException if the send failed
28-
* @blocking Send the given data as a blocking operation.
29-
*/
30-
public void send(TransferDescription data, byte[] content) throws IOException;
20+
public void initialize() throws IOException;
3121

3222
/**
3323
* Returns the connection id of this connection.
@@ -37,6 +27,4 @@ public interface IByteStreamConnection {
3727
public String getConnectionID();
3828

3929
public StreamMode getMode();
40-
41-
public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver);
4230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package saros.net.internal;
2+
3+
import java.io.IOException;
4+
import saros.net.IReceiver;
5+
import saros.net.ITransmitter;
6+
7+
/**
8+
* A packet connection is internally used as an abstraction layer to coordinate the transmission of
9+
* and receiving of packets between the {@link IReceiver} and {@link ITransmitter}.
10+
*/
11+
public interface IPacketConnection extends IByteStreamConnection {
12+
13+
/** Sends the given data along with the given description. */
14+
public void send(TransferDescription description, byte[] data) throws IOException;
15+
16+
/**
17+
* Sets the receiver for incoming {@link BinaryXMPPExtension packet extensions}.
18+
*
19+
* @param receiver
20+
*/
21+
public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver);
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package saros.net.internal;
2+
3+
/** Listener interface for {@link IPacketConnection packet connections}. */
4+
public interface IPacketConnectionListener {
5+
6+
/**
7+
* Gets called when a new packet connection was established. The connection is <b>not</b>
8+
* initialized at this point.
9+
*
10+
* @param connection
11+
*/
12+
public void connectionEstablished(IPacketConnection connection);
13+
}

core/src/saros/net/internal/XMPPReceiver.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,10 @@ public XMPPReceiver(
8989
this.parser = new MXParser();
9090

9191
connectionService.addListener(connectionListener);
92-
dataTransferManager.addConnectionListener(
93-
new IByteStreamConnectionListener() {
92+
dataTransferManager.addPacketConnectionListener(
93+
new IPacketConnectionListener() {
9494
@Override
95-
public void connectionChanged(
96-
final String connectionId,
97-
final IByteStreamConnection connection,
98-
final boolean incomingRequest) {
95+
public void connectionEstablished(final IPacketConnection connection) {
9996
connection.setBinaryXMPPExtensionReceiver(XMPPReceiver.this);
10097
}
10198
});

core/src/saros/net/internal/XMPPTransmitter.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public void send(String connectionID, JID recipient, PacketExtension extension)
9191

9292
if (connection == null) connection = dataManager.connect(recipient);
9393

94+
if (!(connection instanceof IPacketConnection))
95+
throw new IOException(
96+
"connection id '" + connectionID + "' references an unmanaged stream connection");
97+
9498
/*
9599
* The TransferDescription can be created out of the session, the name
96100
* and namespace of the packet extension and standard values and thus
@@ -109,7 +113,7 @@ public void send(String connectionID, JID recipient, PacketExtension extension)
109113
transferDescription.setCompressContent(true);
110114
}
111115

112-
sendPacketExtension(connection, transferDescription, data);
116+
sendPacketExtension((IPacketConnection) connection, transferDescription, data);
113117
}
114118

115119
@Override
@@ -189,7 +193,7 @@ private synchronized boolean isConnectionInvalid() {
189193
}
190194

191195
private void sendPacketExtension(
192-
final IByteStreamConnection connection, final TransferDescription description, byte[] payload)
196+
final IPacketConnection connection, final TransferDescription description, byte[] payload)
193197
throws IOException {
194198

195199
boolean sendPacket = true;

core/src/saros/net/stream/ByteStream.java

+8
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,12 @@ public interface ByteStream {
1616
public int getReadTimeout() throws IOException;
1717

1818
public void setReadTimeout(int timeout) throws IOException;
19+
20+
public Object getLocalAddress();
21+
22+
public Object getRemoteAddress();
23+
24+
public StreamMode getMode();
25+
26+
public String getId();
1927
}

0 commit comments

Comments
 (0)