-
Notifications
You must be signed in to change notification settings - Fork 60
Test homeserver restarts during a partial state join #378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
squahtx
wants to merge
9
commits into
main
from
squah/faster_room_joins_resume_state_resyncing_on_restart
Closed
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
fb73074
Split out function to wait until a homeserver deployment is ready to …
6eee854
Explicitly specify host ports for homeserver deployments, otherwise t…
3147a3f
Add a function to restart a deployment
9d9bf8e
Add test for restarting a homeserver during a partial state join
6f221ed
Merge remote-tracking branch 'origin/main' into squah/faster_room_joi…
c0caa5e
Don't use AwaitStateIdsRequest a second time
1771a88
Merge branch 'main' into squah/faster_room_joins_resume_state_resynci…
squahtx 9103037
Merge branch 'main' into squah/faster_room_joins_resume_state_resynci…
ec228c1
Fix style: use camelcase
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,12 @@ import ( | |
"crypto/tls" | ||
"fmt" | ||
"log" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"runtime" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
@@ -217,6 +219,11 @@ func deployImage( | |
log.Printf("Sharing %v host environment variables with container", env) | ||
} | ||
|
||
port1, port2, err := allocateHostPorts() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
body, err := docker.ContainerCreate(ctx, &container.Config{ | ||
Image: imageID, | ||
Env: env, | ||
|
@@ -232,12 +239,14 @@ func deployImage( | |
PortBindings: nat.PortMap{ | ||
nat.Port("8008/tcp"): []nat.PortBinding{ | ||
{ | ||
HostIP: "127.0.0.1", | ||
HostIP: "127.0.0.1", | ||
HostPort: strconv.Itoa(port1), | ||
}, | ||
}, | ||
nat.Port("8448/tcp"): []nat.PortBinding{ | ||
{ | ||
HostIP: "127.0.0.1", | ||
HostIP: "127.0.0.1", | ||
HostPort: strconv.Itoa(port2), | ||
}, | ||
}, | ||
}, | ||
|
@@ -328,61 +337,6 @@ func deployImage( | |
) | ||
} | ||
|
||
var lastErr error | ||
|
||
// Inspect health status of container to check it is up | ||
stopTime := time.Now().Add(cfg.SpawnHSTimeout) | ||
iterCount := 0 | ||
if inspect.State.Health != nil { | ||
// If the container has a healthcheck, wait for it first | ||
for { | ||
iterCount += 1 | ||
if time.Now().After(stopTime) { | ||
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) | ||
break | ||
} | ||
inspect, err = docker.ContainerInspect(ctx, containerID) | ||
if err != nil { | ||
lastErr = fmt.Errorf("inspect container %s => error: %s", containerID, err) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
if inspect.State.Health.Status != "healthy" { | ||
lastErr = fmt.Errorf("inspect container %s => health: %s", containerID, inspect.State.Health.Status) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
lastErr = nil | ||
break | ||
|
||
} | ||
} | ||
|
||
// Having optionally waited for container to self-report healthy | ||
// hit /versions to check it is actually responding | ||
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL) | ||
|
||
for { | ||
iterCount += 1 | ||
if time.Now().After(stopTime) { | ||
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) | ||
break | ||
} | ||
res, err := http.Get(versionsURL) | ||
if err != nil { | ||
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
if res.StatusCode != 200 { | ||
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
lastErr = nil | ||
break | ||
} | ||
|
||
d := &HomeserverDeployment{ | ||
BaseURL: baseURL, | ||
FedBaseURL: fedBaseURL, | ||
|
@@ -391,8 +345,11 @@ func deployImage( | |
ApplicationServices: asIDToRegistrationFromLabels(inspect.Config.Labels), | ||
DeviceIDs: deviceIDsFromLabels(inspect.Config.Labels), | ||
} | ||
if lastErr != nil { | ||
return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, lastErr) | ||
|
||
stopTime := time.Now().Add(cfg.SpawnHSTimeout) | ||
iterCount, err := waitForContainer(ctx, docker, d, stopTime) | ||
if err != nil { | ||
return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, err) | ||
} else { | ||
if cfg.DebugLoggingEnabled { | ||
log.Printf("%s: Server is responding after %d iterations", contextStr, iterCount) | ||
|
@@ -401,6 +358,39 @@ func deployImage( | |
return d, nil | ||
} | ||
|
||
// allocateHostPorts picks two free TCP ports on localhost. It does not
// reserve them in any way: the OS assigns the ports to two short-lived
// listeners which are closed again before returning. The returned ports
// must be used before the next call to `allocateHostPorts`, otherwise the
// same pair of ports may be returned. Note that another process may still
// grab a port between the time it is returned and the time it is actually
// bound — this is an inherent race in this approach.
func allocateHostPorts() (int, int, error) {
	// Port 0 asks the kernel to pick any free port.
	localhostAnyPort := net.TCPAddr{
		IP:   net.ParseIP("127.0.0.1"),
		Port: 0,
	}

	listener1, err := net.ListenTCP("tcp", &localhostAnyPort)
	if err != nil {
		return 0, 0, err
	}
	listener2, err := net.ListenTCP("tcp", &localhostAnyPort)
	if err != nil {
		// Fix: close listener1 so it is not leaked when listener2 fails.
		listener1.Close()
		return 0, 0, err
	}

	// Both listeners are open simultaneously, so the two ports are
	// guaranteed to be distinct.
	port1 := listener1.Addr().(*net.TCPAddr).Port
	port2 := listener2.Addr().(*net.TCPAddr).Port

	if err = listener1.Close(); err != nil {
		// Fix: close listener2 so it is not leaked on this error path.
		listener2.Close()
		return 0, 0, err
	}
	if err = listener2.Close(); err != nil {
		return 0, 0, err
	}

	return port1, port2, nil
}
|
||
func copyToContainer(docker *client.Client, containerID, path string, data []byte) error { | ||
// Create a fake/virtual file in memory that we can copy to the container | ||
// via https://stackoverflow.com/a/52131297/796832 | ||
|
@@ -427,6 +417,99 @@ func copyToContainer(docker *client.Client, containerID, path string, data []byt | |
return nil | ||
} | ||
|
||
// Waits until a homeserver deployment is ready to serve requests. | ||
func waitForContainer(ctx context.Context, docker *client.Client, hsDep *HomeserverDeployment, stopTime time.Time) (iterCount int, err error) { | ||
var lastErr error = nil | ||
|
||
iterCount = 0 | ||
|
||
// If the container has a healthcheck, wait for it first | ||
for { | ||
iterCount += 1 | ||
if time.Now().After(stopTime) { | ||
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) | ||
break | ||
} | ||
inspect, err := docker.ContainerInspect(ctx, hsDep.ContainerID) | ||
if err != nil { | ||
lastErr = fmt.Errorf("inspect container %s => error: %s", hsDep.ContainerID, err) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
if inspect.State.Health != nil && | ||
inspect.State.Health.Status != "healthy" { | ||
lastErr = fmt.Errorf("inspect container %s => health: %s", hsDep.ContainerID, inspect.State.Health.Status) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
|
||
// The container is healthy or has no health check. | ||
lastErr = nil | ||
break | ||
} | ||
|
||
// Having optionally waited for container to self-report healthy | ||
// hit /versions to check it is actually responding | ||
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", hsDep.BaseURL) | ||
|
||
for { | ||
iterCount += 1 | ||
if time.Now().After(stopTime) { | ||
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) | ||
break | ||
} | ||
res, err := http.Get(versionsURL) | ||
if err != nil { | ||
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
if res.StatusCode != 200 { | ||
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status) | ||
time.Sleep(50 * time.Millisecond) | ||
continue | ||
} | ||
lastErr = nil | ||
break | ||
} | ||
|
||
return iterCount, lastErr | ||
} | ||
|
||
// Restart a deployment. | ||
func (dep *Deployment) Restart() error { | ||
ctx := context.Background() | ||
|
||
for _, hsDep := range dep.HS { | ||
err := dep.Deployer.Docker.ContainerStop(ctx, hsDep.ContainerID, &dep.Config.SpawnHSTimeout) | ||
if err != nil { | ||
return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) | ||
} | ||
|
||
// Remove the container from the network. If we don't do this, | ||
// (re)starting the container fails with an error like | ||
// "Error response from daemon: endpoint with name complement_fed_1_fed.alice.hs1_1 already exists in network complement_fed_alice". | ||
kegsay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err = dep.Deployer.Docker.NetworkDisconnect(ctx, dep.Deployer.networkID, hsDep.ContainerID, false) | ||
if err != nil { | ||
return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) | ||
} | ||
|
||
err = dep.Deployer.Docker.ContainerStart(ctx, hsDep.ContainerID, types.ContainerStartOptions{}) | ||
if err != nil { | ||
return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) | ||
} | ||
|
||
// Wait for the container to be ready. | ||
stopTime := time.Now().Add(dep.Config.SpawnHSTimeout) | ||
_, err = waitForContainer(ctx, dep.Deployer.Docker, &hsDep, stopTime) | ||
if err != nil { | ||
return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// RoundTripper is a round tripper that maps https://hs1 to the federation port of the container | ||
// e.g https://localhost:35352 | ||
type RoundTripper struct { | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We must provide an explicit port, otherwise we'll get a different random port when the container is restarted. Which breaks everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does it break everything?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`BaseURL`
and `FedBaseURL`
of `HomeserverDeployment`
become incorrect because they include the port number. We can fix up those URLs. But any `CSAPI`
clients will continue to use the previous port number after the restart, and that's not so easy to fix up. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems like a reasonably easy thing to fix? There's several possible solutions:
Restart()
which explicitly states that any clients created with this deployment need to be recreated. Or...client.CSAPI
to update the base URL and get test authors to call that callback with the updated URL. Or...client.CSAPI
instances created viafunc (d *Deployment) Client(t *testing.T, hsName, userID string) *client.CSAPI
and automatically call the callback hook when the deployment is restarted to re-point port numbers.The last option is the most preferable because:
deployment.Restart()
.In addition, I would change the function signature of restart from:
to
which will fail the test if the restart fails (returns an error).
At present, this PR cannot be accepted because it will introduce flakiness because these port numbers will race: we have to let the underlying OS decide the ports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've split the PR and made the changes in #396. Let me know if I've missed anything.