-
-
Notifications
You must be signed in to change notification settings - Fork 157
Implement the Happy Eye Balls RFC's #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,322 @@ | ||
<?php | ||
|
||
namespace React\Socket; | ||
|
||
use React\Dns\Model\Message; | ||
use React\Dns\Resolver\ResolverInterface; | ||
use React\EventLoop\LoopInterface; | ||
use React\EventLoop\TimerInterface; | ||
use React\Promise; | ||
use React\Promise\CancellablePromiseInterface; | ||
|
||
/** | ||
* @internal | ||
*/ | ||
final class HappyEyeBallsConnectionBuilder | ||
{ | ||
const CONNECT_INTERVAL = 0.1; | ||
const RESOLVE_WAIT = 0.5; | ||
|
||
public $loop; | ||
public $connector; | ||
public $resolver; | ||
public $uri; | ||
public $host; | ||
public $resolved = array( | ||
Message::TYPE_A => false, | ||
Message::TYPE_AAAA => false, | ||
); | ||
public $resolverPromises = array(); | ||
public $connectionPromises = array(); | ||
public $connectQueue = array(); | ||
public $timer; | ||
public $parts; | ||
public $ipsCount = 0; | ||
public $failureCount = 0; | ||
public $resolve; | ||
public $reject; | ||
|
||
public function __construct(LoopInterface $loop, ConnectorInterface $connector, ResolverInterface $resolver, $uri, $host, $parts) | ||
{ | ||
$this->loop = $loop; | ||
$this->connector = $connector; | ||
$this->resolver = $resolver; | ||
$this->uri = $uri; | ||
$this->host = $host; | ||
$this->parts = $parts; | ||
} | ||
|
||
public function connect() | ||
{ | ||
$that = $this; | ||
return new Promise\Promise(function ($resolve, $reject) use ($that) { | ||
$lookupResolve = function ($type) use ($that, $resolve, $reject) { | ||
return function (array $ips) use ($that, $type, $resolve, $reject) { | ||
unset($that->resolverPromises[$type]); | ||
$that->resolved[$type] = true; | ||
|
||
$that->mixIpsIntoConnectQueue($ips); | ||
|
||
if ($that->timer instanceof TimerInterface) { | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check causes the bug discussed above. The timer will only be started when more IPs are available when the first connection attempt (IPv6) is started. If the IPv4 address resolves afterwards, then this timer is not running and it will immediately try the IPv4 connection instead of waiting first. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you take a look at this again? I can't reproduce this locally anymore, but I don't see any change here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was solved in the check method. |
||
|
||
$that->check($resolve, $reject); | ||
}; | ||
}; | ||
|
||
$ipv4Deferred = null; | ||
$timer = null; | ||
$that->resolverPromises[Message::TYPE_AAAA] = $that->resolve(Message::TYPE_AAAA, $reject)->then($lookupResolve(Message::TYPE_AAAA))->then(function () use (&$ipv4Deferred) { | ||
if ($ipv4Deferred instanceof Promise\Deferred) { | ||
$ipv4Deferred->resolve(); | ||
} | ||
}); | ||
clue marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://tools.ietf.org/html/rfc8305#section-3 appears to be a bit unclear, but to me it suggests both queries should be started simultaneously. If This appears to differ from the current implementation which always waits for 50ms to start the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, my intention was to add that later on. But I suppose we can just as easily add it now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO I read that as send them both out as fast as possible after each other. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented |
||
$that->resolverPromises[Message::TYPE_A] = $that->resolve(Message::TYPE_A, $reject)->then(function ($ips) use ($that, &$ipv4Deferred, &$timer) { | ||
if ($that->resolved[Message::TYPE_AAAA] === true) { | ||
return Promise\resolve($ips); | ||
} | ||
|
||
/** | ||
* Delay A lookup by 50ms sending out connection to IPv4 addresses when IPv6 records haven't | ||
* resolved yet as per RFC. | ||
* | ||
* @link https://tools.ietf.org/html/rfc8305#section-3 | ||
*/ | ||
$ipv4Deferred = new Promise\Deferred(); | ||
$deferred = new Promise\Deferred(); | ||
|
||
$timer = $that->loop->addTimer($that::RESOLVE_WAIT, function () use ($deferred, $ips) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Promise cancellation doesn't seem to cancel this timer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed that |
||
$ipv4Deferred = null; | ||
$deferred->resolve($ips); | ||
}); | ||
|
||
$ipv4Deferred->promise()->then(function () use ($that, &$timer, $deferred, $ips) { | ||
$that->loop->cancelTimer($timer); | ||
$deferred->resolve($ips); | ||
}); | ||
clue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return $deferred->promise(); | ||
})->then($lookupResolve(Message::TYPE_A)); | ||
}, function ($_, $reject) use ($that, &$timer) { | ||
$that->cleanUp(); | ||
|
||
if ($timer instanceof TimerInterface) { | ||
$that->loop->cancelTimer($timer); | ||
} | ||
|
||
$reject(new \RuntimeException('Connection to ' . $that->uri . ' cancelled during DNS lookup')); | ||
|
||
$_ = $reject = null; | ||
}); | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function resolve($type, $reject) | ||
{ | ||
$that = $this; | ||
return $that->resolver->resolveAll($that->host, $type)->then(null, function () use ($type, $reject, $that) { | ||
unset($that->resolverPromises[$type]); | ||
$that->resolved[$type] = true; | ||
|
||
if ($that->hasBeenResolved() === false) { | ||
return; | ||
} | ||
|
||
if ($that->ipsCount === 0) { | ||
$that->resolved = null; | ||
$that->resolverPromises = null; | ||
$reject(new \RuntimeException('Connection to ' . $that->uri . ' failed during DNS lookup: DNS error')); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function check($resolve, $reject) | ||
{ | ||
if (\count($this->connectQueue) === 0 && $this->resolved[Message::TYPE_A] === true && $this->resolved[Message::TYPE_AAAA] === true && $this->timer instanceof TimerInterface) { | ||
$this->loop->cancelTimer($this->timer); | ||
$this->timer = null; | ||
} | ||
|
||
if (\count($this->connectQueue) === 0) { | ||
return; | ||
} | ||
|
||
$ip = \array_shift($this->connectQueue); | ||
|
||
$that = $this; | ||
$that->connectionPromises[$ip] = $this->attemptConnection($ip)->then(function ($connection) use ($that, $ip, $resolve) { | ||
unset($that->connectionPromises[$ip]); | ||
|
||
$that->cleanUp(); | ||
|
||
$resolve($connection); | ||
}, function () use ($that, $ip, $resolve, $reject) { | ||
unset($that->connectionPromises[$ip]); | ||
|
||
$that->failureCount++; | ||
|
||
if ($that->hasBeenResolved() === false) { | ||
return; | ||
} | ||
|
||
if ($that->ipsCount === $that->failureCount) { | ||
$that->cleanUp(); | ||
|
||
$reject(new \RuntimeException('All attempts to connect to "' . $that->host . '" have failed')); | ||
} | ||
}); | ||
|
||
/** | ||
* As long as we haven't connected yet keep popping an IP address of the connect queue until one of them | ||
* succeeds or they all fail. We will wait 100ms between connection attempts as per RFC. | ||
* | ||
* @link https://tools.ietf.org/html/rfc8305#section-5 | ||
*/ | ||
if ((\count($this->connectQueue) > 0 || ($this->resolved[Message::TYPE_A] === false || $this->resolved[Message::TYPE_AAAA] === false)) && $this->timer === null) { | ||
$this->timer = $this->loop->addPeriodicTimer(self::CONNECT_INTERVAL, function () use ($that, $resolve, $reject) { | ||
$that->check($resolve, $reject); | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function attemptConnection($ip) | ||
{ | ||
$promise = null; | ||
$that = $this; | ||
|
||
return new Promise\Promise( | ||
function ($resolve, $reject) use (&$promise, $that, $ip) { | ||
$uri = ''; | ||
|
||
// prepend original scheme if known | ||
if (isset($that->parts['scheme'])) { | ||
$uri .= $that->parts['scheme'] . '://'; | ||
} | ||
|
||
if (\strpos($ip, ':') !== false) { | ||
// enclose IPv6 addresses in square brackets before appending port | ||
$uri .= '[' . $ip . ']'; | ||
} else { | ||
$uri .= $ip; | ||
} | ||
|
||
// append original port if known | ||
if (isset($that->parts['port'])) { | ||
$uri .= ':' . $that->parts['port']; | ||
} | ||
|
||
// append orignal path if known | ||
if (isset($that->parts['path'])) { | ||
$uri .= $that->parts['path']; | ||
} | ||
|
||
// append original query if known | ||
if (isset($that->parts['query'])) { | ||
$uri .= '?' . $that->parts['query']; | ||
} | ||
|
||
// append original hostname as query if resolved via DNS and if | ||
// destination URI does not contain "hostname" query param already | ||
$args = array(); | ||
\parse_str(isset($that->parts['query']) ? $that->parts['query'] : '', $args); | ||
if ($that->host !== $ip && !isset($args['hostname'])) { | ||
$uri .= (isset($that->parts['query']) ? '&' : '?') . 'hostname=' . \rawurlencode($that->host); | ||
} | ||
|
||
// append original fragment if known | ||
if (isset($that->parts['fragment'])) { | ||
$uri .= '#' . $that->parts['fragment']; | ||
} | ||
|
||
$promise = $that->connector->connect($uri); | ||
$promise->then($resolve, $reject); | ||
}, | ||
function ($_, $reject) use (&$promise, $that) { | ||
// cancellation should reject connection attempt | ||
// (try to) cancel pending connection attempt | ||
$reject(new \RuntimeException('Connection to ' . $that->uri . ' cancelled during connection attempt')); | ||
|
||
if ($promise instanceof CancellablePromiseInterface) { | ||
// overwrite callback arguments for PHP7+ only, so they do not show | ||
// up in the Exception trace and do not cause a possible cyclic reference. | ||
$_ = $reject = null; | ||
|
||
$promise->cancel(); | ||
$promise = null; | ||
} | ||
} | ||
); | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function cleanUp() | ||
{ | ||
/** @var CancellablePromiseInterface $promise */ | ||
foreach ($this->connectionPromises as $index => $connectionPromise) { | ||
if ($connectionPromise instanceof CancellablePromiseInterface) { | ||
$connectionPromise->cancel(); | ||
} | ||
} | ||
|
||
/** @var CancellablePromiseInterface $promise */ | ||
foreach ($this->resolverPromises as $index => $resolverPromise) { | ||
if ($resolverPromise instanceof CancellablePromiseInterface) { | ||
$resolverPromise->cancel(); | ||
} | ||
} | ||
|
||
if ($this->timer instanceof TimerInterface) { | ||
$this->loop->cancelTimer($this->timer); | ||
$this->timer = null; | ||
} | ||
} | ||
|
||
/** | ||
* @internal | ||
*/ | ||
public function hasBeenResolved() | ||
{ | ||
foreach ($this->resolved as $typeHasBeenResolved) { | ||
if ($typeHasBeenResolved === false) { | ||
return false; | ||
} | ||
} | ||
|
||
return true; | ||
} | ||
|
||
/** | ||
* Mixes an array of IP addresses into the connect queue in such a way they alternate when attempting to connect. | ||
* The goal behind it is first attempt to connect to IPv6, then to IPv4, then to IPv6 again until one of those | ||
* attempts succeeds. | ||
* | ||
* @link https://tools.ietf.org/html/rfc8305#section-4 | ||
* | ||
* @internal | ||
*/ | ||
public function mixIpsIntoConnectQueue(array $ips) | ||
{ | ||
$this->ipsCount += \count($ips); | ||
$connectQueueStash = $this->connectQueue; | ||
$this->connectQueue = array(); | ||
while (\count($connectQueueStash) > 0 || \count($ips) > 0) { | ||
if (\count($ips) > 0) { | ||
$this->connectQueue[] = \array_shift($ips); | ||
} | ||
if (\count($connectQueueStash) > 0) { | ||
$this->connectQueue[] = \array_shift($connectQueueStash); | ||
} | ||
} | ||
clue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.