Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit 96dc411

Browse files
lrhncommit-bot@chromium.org
authored andcommitted
Update _NativeSocket.startConnection to avoid state errors.
The existing code is undocumented and has caused state errors where a future is attempted completed more than once. This rewrites the code, formalizes and documents the expected behavior, which is slightly different from the previous behavior wrt. scheduling new connections when existing connections fail. Change-Id: I369fc5b70920954af4a066a179d751b72da35800 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/174471 Reviewed-by: Nate Bosch <nbosch@google.com> Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
1 parent d4f3c02 commit 96dc411

File tree

1 file changed

+116
-78
lines changed

1 file changed

+116
-78
lines changed

sdk/lib/_internal/vm/bin/socket_patch.dart

Lines changed: 116 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -574,16 +574,28 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
574574

575575
static Future<ConnectionTask<_NativeSocket>> startConnect(
576576
dynamic host, int port, dynamic sourceAddress) {
577+
// Looks up [sourceAddress] to one or more IP addresses,
578+
// then tries connecting to each one until a connection succeeds.
579+
// Attempts are staggered by a minimum delay, so a new
580+
// attempt isn't made until either a previous attempt has *failed*,
581+
// or the delay has passed.
582+
// This ensures that at most *n* uncompleted connections can be
583+
// active after *n* &times; *delay* time has passed.
577584
if (host is String) {
578585
host = escapeLinkLocalAddress(host);
579586
}
580587
_throwOnBadPort(port);
581-
if (sourceAddress != null && sourceAddress is! _InternetAddress) {
582-
if (sourceAddress is String) {
583-
sourceAddress = new InternetAddress(sourceAddress);
584-
}
585-
}
586-
return new Future.value(host).then((host) {
588+
_InternetAddress? source;
589+
if (sourceAddress is _InternetAddress) {
590+
source = sourceAddress;
591+
} else if (sourceAddress is String) {
592+
source = new _InternetAddress.fromString(sourceAddress);
593+
}
594+
// Should we throw if sourceAddress is not one of:
595+
// null, _InternetAddress or String?
596+
// Is it somehow ensured upstream
597+
// that only those three types will reach here?
598+
return new Future.value(host).then<List<InternetAddress>>((host) {
587599
if (host is _InternetAddress) return [host];
588600
return lookup(host).then((addresses) {
589601
if (addresses.isEmpty) {
@@ -592,23 +604,43 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
592604
return addresses;
593605
});
594606
}).then((addresses) {
607+
assert(addresses.isNotEmpty);
608+
// Completer for result.
595609
var completer = new Completer<_NativeSocket>();
596-
var it = (addresses as List<InternetAddress>).iterator;
610+
// Index of next address in [addresses] to try.
611+
var index = 0;
612+
// Error, set if an error occurs.
613+
// Keeps first error if multiple errors occour.
597614
var error = null;
598-
var connecting = new HashMap();
599-
615+
// Active timers for on-going connection attempts.
616+
// Contains all sockets which haven't received and initial
617+
// write or error event.
618+
var connecting = <_NativeSocket>{};
619+
// Timer counting down from the last connection attempt.
620+
// Reset when a new connection is attempted,
621+
// which happens either when a previous timer runs out,
622+
// or when a previous connection attempt fails.
623+
Timer? timer;
624+
625+
// Attempt to connect to the next address in [addresses].
626+
//
627+
// Called initially, then when either a connection attempt fails,
628+
// or an amount of time has passed since the last connection
629+
// was attempted.
600630
void connectNext() {
601-
if (!it.moveNext()) {
631+
timer?.cancel();
632+
if (index >= addresses.length) {
602633
if (connecting.isEmpty) {
603634
assert(error != null);
635+
assert(!completer.isCompleted);
604636
completer.completeError(error);
605637
}
606638
return;
607639
}
608-
final _InternetAddress address = it.current as _InternetAddress;
640+
final address = addresses[index++] as _InternetAddress;
609641
var socket = new _NativeSocket.normal(address);
610642
var result;
611-
if (sourceAddress == null) {
643+
if (source == null) {
612644
if (address.type == InternetAddressType.unix) {
613645
result = socket.nativeCreateUnixDomainConnect(
614646
address.address, _Namespace._namespace);
@@ -617,97 +649,103 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
617649
address._in_addr, port, address._scope_id);
618650
}
619651
} else {
620-
assert(sourceAddress is _InternetAddress);
621652
if (address.type == InternetAddressType.unix) {
622-
assert(sourceAddress.type == InternetAddressType.unix);
653+
assert(source.type == InternetAddressType.unix);
623654
result = socket.nativeCreateUnixDomainBindConnect(
624-
address.address, sourceAddress.address, _Namespace._namespace);
655+
address.address, source.address, _Namespace._namespace);
625656
} else {
626-
result = socket.nativeCreateBindConnect(address._in_addr, port,
627-
sourceAddress._in_addr, address._scope_id);
657+
result = socket.nativeCreateBindConnect(
658+
address._in_addr, port, source._in_addr, address._scope_id);
628659
}
629660
}
630661
if (result is OSError) {
631662
// Keep first error, if present.
632663
if (error == null) {
633664
int errorCode = result.errorCode;
634-
if (sourceAddress != null &&
665+
if (source != null &&
635666
errorCode != null &&
636667
socket.isBindError(errorCode)) {
637-
error = createError(result, "Bind failed", sourceAddress);
668+
error = createError(result, "Bind failed", source);
638669
} else {
639670
error = createError(result, "Connection failed", address, port);
640671
}
641672
}
642-
connectNext();
643-
} else {
644-
// Query the local port for error messages.
645-
try {
646-
socket.port;
647-
} catch (e) {
648-
if (error == null) {
649-
error = createError(e, "Connection failed", address, port);
650-
}
651-
connectNext();
673+
connectNext(); // Try again after failure to connect.
674+
return;
675+
}
676+
// Query the local port for error messages.
677+
try {
678+
socket.port;
679+
} catch (e) {
680+
if (error == null) {
681+
error = createError(e, "Connection failed", address, port);
652682
}
653-
// Set up timer for when we should retry the next address
654-
// (if any).
655-
var duration =
656-
address.isLoopback ? _retryDurationLoopback : _retryDuration;
657-
var timer = new Timer(duration, connectNext);
658-
659-
connecting[socket] = timer;
660-
// Setup handlers for receiving the first write event which
661-
// indicate that the socket is fully connected.
662-
socket.setHandlers(write: () {
663-
timer.cancel();
664-
connecting.remove(socket);
665-
// From 'man 2 connect':
666-
// After select(2) indicates writability, use getsockopt(2) to read
667-
// the SO_ERROR option at level SOL_SOCKET to determine whether
668-
// connect() completed successfully (SO_ERROR is zero) or
669-
// unsuccessfully.
670-
OSError osError = socket.nativeGetError();
671-
if (osError.errorCode != 0) {
672-
socket.close();
673-
if (error == null) error = osError;
674-
if (connecting.isEmpty) connectNext();
675-
return;
676-
}
677-
socket.setListening(read: false, write: false);
678-
completer.complete(socket);
679-
connecting.forEach((s, t) {
680-
t.cancel();
681-
s.close();
682-
s.setHandlers();
683-
s.setListening(read: false, write: false);
684-
});
685-
connecting.clear();
686-
}, error: (e, st) {
687-
timer.cancel();
688-
socket.close();
689-
// Keep first error, if present.
690-
if (error == null) error = e;
691-
connecting.remove(socket);
692-
if (connecting.isEmpty) connectNext();
693-
});
694-
socket.setListening(read: false, write: true);
683+
connectNext(); // Try again after failure to connect.
684+
return;
695685
}
686+
687+
// Try again if no response (failure or success) within a duration.
688+
// If this occurs, the socket is still trying to connect, and might
689+
// succeed or fail later.
690+
var duration =
691+
address.isLoopback ? _retryDurationLoopback : _retryDuration;
692+
timer = new Timer(duration, connectNext);
693+
694+
connecting.add(socket);
695+
// Setup handlers for receiving the first write event which
696+
// indicate that the socket is fully connected.
697+
socket.setHandlers(write: () {
698+
// First remote response on connection.
699+
// If error, drop the socket and go to the next address.
700+
// If success, complete with the socket
701+
// and stop all other open connection attempts.
702+
connecting.remove(socket);
703+
// From 'man 2 connect':
704+
// After select(2) indicates writability, use getsockopt(2) to read
705+
// the SO_ERROR option at level SOL_SOCKET to determine whether
706+
// connect() completed successfully (SO_ERROR is zero) or
707+
// unsuccessfully.
708+
OSError osError = socket.nativeGetError();
709+
if (osError.errorCode != 0) {
710+
socket.close();
711+
error ??= osError;
712+
connectNext(); // Try again after failure to connect.
713+
return;
714+
}
715+
// Connection success!
716+
// Stop all other connecting sockets and timers.
717+
timer!.cancel();
718+
socket.setListening(read: false, write: false);
719+
for (var s in connecting) {
720+
s.close();
721+
s.setHandlers();
722+
s.setListening(read: false, write: false);
723+
}
724+
connecting.clear();
725+
completer.complete(socket);
726+
}, error: (e, st) {
727+
connecting.remove(socket);
728+
socket.close();
729+
socket.setHandlers();
730+
socket.setListening(read: false, write: false);
731+
// Keep first error, if present.
732+
error ??= e;
733+
connectNext(); // Try again after failure to connect.
734+
});
735+
socket.setListening(read: false, write: true);
696736
}
697737

698738
void onCancel() {
699-
connecting.forEach((s, t) {
700-
t.cancel();
739+
timer?.cancel();
740+
for (var s in connecting) {
701741
s.close();
702742
s.setHandlers();
703743
s.setListening(read: false, write: false);
704-
if (error == null) {
705-
error = createError(null,
706-
"Connection attempt cancelled, host: ${host}, port: ${port}");
707-
}
708-
});
744+
}
709745
connecting.clear();
710746
if (!completer.isCompleted) {
747+
error ??= createError(null,
748+
"Connection attempt cancelled, host: ${host}, port: ${port}");
711749
completer.completeError(error);
712750
}
713751
}

0 commit comments

Comments
 (0)