Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Broker Redirects for Session and Messages #3103

Merged
merged 19 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/Runner.Common/BrokerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public interface IBrokerServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
}

public sealed class BrokerServer : RunnerService, IBrokerServer
Expand All @@ -44,11 +45,20 @@ private void CheckConnection()
}
}

public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
public Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentSession>(
async () => await _brokerHttpClient.CreateSessionAsync(session, cancellationToken), cancellationToken);

return jobMessage;
luketomlinson marked this conversation as resolved.
Show resolved Hide resolved
}

public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);

return jobMessage;
}
Expand Down
1 change: 1 addition & 0 deletions src/Runner.Listener/BrokerMessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token,
null,
luketomlinson marked this conversation as resolved.
Show resolved Hide resolved
runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
Expand Down
36 changes: 33 additions & 3 deletions src/Runner.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;

namespace GitHub.Runner.Listener
{
Expand All @@ -33,6 +34,7 @@ public sealed class MessageListener : RunnerService, IMessageListener
private RunnerSettings _settings;
private ITerminal _term;
private IRunnerServer _runnerServer;
private IBrokerServer _brokerServer;
private TaskAgentSession _session;
private TimeSpan _getNextMessageRetryInterval;
private bool _accessTokenRevoked = false;
Expand All @@ -42,13 +44,15 @@ public sealed class MessageListener : RunnerService, IMessageListener
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);

_term = HostContext.GetService<ITerminal>();
_runnerServer = HostContext.GetService<IRunnerServer>();
_brokerServer = hostContext.GetService<IBrokerServer>();
}

public async Task<Boolean> CreateSessionAsync(CancellationToken token)
Expand All @@ -64,7 +68,7 @@ public async Task<Boolean> CreateSessionAsync(CancellationToken token)
// Create connection.
Trace.Info("Loading Credentials");
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
_creds = credMgr.LoadCredentials();

var agent = new TaskAgentReference
{
Expand All @@ -86,7 +90,7 @@ public async Task<Boolean> CreateSessionAsync(CancellationToken token)
try
{
Trace.Info("Connecting to the Runner Server...");
await _runnerServer.ConnectAsync(new Uri(serverUrl), creds);
await _runnerServer.ConnectAsync(new Uri(serverUrl), _creds);
Trace.Info("VssConnection created");

_term.WriteLine();
Expand All @@ -98,6 +102,14 @@ public async Task<Boolean> CreateSessionAsync(CancellationToken token)
taskAgentSession,
token);

// if (_session.SessionMigrationURI != null)
luketomlinson marked this conversation as resolved.
Show resolved Hide resolved
// {
// Trace.Info($"Runner session is in migration mode: Creating Broker session with SessionMigrationURI: {0}", _session.SessionMigrationURI);
// var brokerServer = HostContext.GetService<IBrokerServer>();
// await brokerServer.ConnectAsync(_session.SessionMigrationURI, _creds);
// _session = await brokerServer.CreateSessionAsync(token, taskAgentSession);
// }

Trace.Info($"Session created.");
if (encounteringError)
{
Expand All @@ -124,7 +136,7 @@ public async Task<Boolean> CreateSessionAsync(CancellationToken token)
Trace.Error("Catch exception during create session.");
Trace.Error(ex);

if (ex is VssOAuthTokenRequestException vssOAuthEx && creds.Federated is VssOAuthCredential vssOAuthCred)
if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred)
{
// "invalid_client" means the runner registration has been deleted from the server.
if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -228,6 +240,24 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
// Decrypt the message body if the session is using encryption
message = DecryptMessage(message);


if (message != null && message.MessageType == BrokerMigrationMessage.MessageType)
{
Trace.Info("BrokerMigration message received. Polling Broker for messages...");

var migrationMessage = JsonUtility.FromString<BrokerMigrationMessage>(message.Body);
var brokerServer = HostContext.GetService<IBrokerServer>();

await brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds);
message = await brokerServer.GetRunnerMessageAsync(token,
luketomlinson marked this conversation as resolved.
Show resolved Hide resolved
_session.SessionId,
runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
_settings.DisableUpdate);
}

luketomlinson marked this conversation as resolved.
Show resolved Hide resolved
if (message != null)
{
_lastMessageId = message.MessageId;
Expand Down
38 changes: 38 additions & 0 deletions src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Runtime.Serialization;

namespace GitHub.DistributedTask.WebApi
{
/// <summary>
/// Message that tells the runner to redirect itself to BrokerListener for messages.
/// (Note that we use a special Message instead of a simple 302. This is because
/// the runner will need to apply the runner's token to the request, and it is
/// a security best practice to *not* blindly add sensitive data to redirects
/// 302s.)
/// </summary>
[DataContract]
public class BrokerMigrationMessage
{
public static readonly string MessageType = "BrokerMigration";

public BrokerMigrationMessage()
{
}

public BrokerMigrationMessage(
Uri brokerUrl)
{
this.BrokerBaseUrl = brokerUrl;
}

/// <summary>
/// The base url for the broker listener
/// </summary>
[DataMember]
public Uri BrokerBaseUrl
{
get;
internal set;
}
}
}
7 changes: 7 additions & 0 deletions src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,12 @@ public bool UseFipsEncryption
get;
set;
}

[DataMember(EmitDefaultValue = false, IsRequired = false)]
public BrokerMigrationMessage BrokerMigrationMessage
{
get;
set;
}
}
}
33 changes: 33 additions & 0 deletions src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public BrokerHttpClient(
}

public async Task<TaskAgentMessage> GetRunnerMessageAsync(
Guid? sessionId,
string runnerVersion,
TaskAgentStatus? status,
string os = null,
Expand All @@ -69,6 +70,11 @@ public async Task<TaskAgentMessage> GetRunnerMessageAsync(

List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();

if (sessionId != null)
{
queryParams.Add("sessionId", sessionId.Value.ToString());
}

if (status != null)
{
queryParams.Add("status", status.Value.ToString());
Expand Down Expand Up @@ -111,5 +117,32 @@ public async Task<TaskAgentMessage> GetRunnerMessageAsync(

throw new Exception($"Failed to get job message: {result.Error}");
}

public async Task<TaskAgentSession> CreateSessionAsync(
TaskAgentSession session,
CancellationToken cancellationToken = default)
{

var requestUri = new Uri(Client.BaseAddress, "session");
var requestContent = new ObjectContent<TaskAgentSession>(session, new VssJsonMediaTypeFormatter(true));

var result = await SendAsync<TaskAgentSession>(
new HttpMethod("POST"),
requestUri: requestUri,
content: requestContent,
cancellationToken: cancellationToken);

if (result.IsSuccess)
{
return result.Value;
}
luketomlinson marked this conversation as resolved.
Show resolved Hide resolved

if (result.StatusCode == HttpStatusCode.Forbidden)
{
throw new AccessDeniedException(result.Error);
}

throw new Exception($"Failed to create broker session: {result.Error}");
}
}
}
Loading