Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client diagnostics - Add sync APIs to RemoteFetcher #18807

Merged
merged 11 commits into from
Feb 16, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,39 @@
</EmbeddedResource>
</ItemGroup>

<!-- Common source from Azure.Core -->
<ItemGroup>
<Compile Include="$(AzureCoreSharedSources)ArrayBufferWriter.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)ClientDiagnostics.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)ContentTypeUtilities.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)DiagnosticScope.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)DiagnosticScopeFactory.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)HttpMessageSanitizer.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)OperationHelpers.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)Argument.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)AzureResourceProviderNamespaceAttribute.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
</ItemGroup>

<Import Project="$(RepoRoot)\sdk\core\Azure.Core\src\Azure.Core.props" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ public enum DependencyResolutionOption
/// Do not process external dependencies.
/// </summary>
Disabled,

/// <summary>
/// Enable external dependencies.
/// </summary>
Enabled,

/// <summary>
/// Try to get external dependencies using .expanded.json.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
using System.Collections.Generic;
using System.Threading;
using System.Globalization;
using Azure.Core.Pipeline;

namespace Azure.Iot.ModelsRepository.Fetchers
{
internal class LocalModelFetcher : IModelFetcher
{
private readonly bool _tryExpanded;
private readonly ClientDiagnostics _clientDiagnostics;

public LocalModelFetcher(ResolverClientOptions clientOptions)
public LocalModelFetcher(ClientDiagnostics clientDiagnostics, ResolverClientOptions clientOptions)
{
_clientDiagnostics = clientDiagnostics;
_tryExpanded = clientOptions.DependencyResolution == DependencyResolutionOption.TryFromExpanded;
}

Expand All @@ -27,35 +30,48 @@ public Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, Cancellation

public FetchResult Fetch(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
var work = new Queue<string>();
using DiagnosticScope scope = _clientDiagnostics.CreateScope("LocalModelFetcher.Fetch");
scope.Start();

if (_tryExpanded)
try
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}
var work = new Queue<string>();

work.Enqueue(GetPath(dtmi, repositoryUri, false));
if (_tryExpanded)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}

string fnfError = string.Empty;
while (work.Count != 0 && !cancellationToken.IsCancellationRequested)
{
string tryContentPath = work.Dequeue();
ResolverEventSource.Shared.FetchingModelContent(tryContentPath);
work.Enqueue(GetPath(dtmi, repositoryUri, false));

if (File.Exists(tryContentPath))
string fnfError = string.Empty;
while (work.Count != 0 && !cancellationToken.IsCancellationRequested)
azabbasi marked this conversation as resolved.
Show resolved Hide resolved
{
return new FetchResult()
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

if (File.Exists(tryContentPath))
{
Definition = File.ReadAllText(tryContentPath, Encoding.UTF8),
Path = tryContentPath
};
return new FetchResult
{
Definition = File.ReadAllText(tryContentPath, Encoding.UTF8),
Path = tryContentPath
};
}

ResolverEventSource.Instance.ErrorFetchingModelContent(tryContentPath);
fnfError = string.Format(CultureInfo.CurrentCulture, ServiceStrings.ErrorFetchingModelContent, tryContentPath);
}

ResolverEventSource.Shared.ErrorFetchingModelContent(tryContentPath);
fnfError = string.Format(CultureInfo.InvariantCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
throw new RequestFailedException(fnfError, new FileNotFoundException(fnfError));
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}

throw new FileNotFoundException(fnfError);
}

private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,110 @@ namespace Azure.Iot.ModelsRepository.Fetchers
internal class RemoteModelFetcher : IModelFetcher
{
private readonly HttpPipeline _pipeline;
private readonly ClientDiagnostics _clientDiagnostics;
private readonly bool _tryExpanded;

public RemoteModelFetcher(ResolverClientOptions clientOptions)
public RemoteModelFetcher(ClientDiagnostics clientDiagnostics, ResolverClientOptions clientOptions)
{
_pipeline = CreatePipeline(clientOptions);
_tryExpanded = clientOptions.DependencyResolution == DependencyResolutionOption.TryFromExpanded;
_clientDiagnostics = clientDiagnostics;
}

public FetchResult Fetch(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.Fetch");
scope.Start();
try
{
Queue<string> work = PrepareWork(dtmi, repositoryUri);

public async Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
Queue<string> work = new Queue<string>();
string remoteFetchError = string.Empty;

if (_tryExpanded)
while (work.Count != 0)
{
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

try
{
string content = EvaluatePath(tryContentPath, cancellationToken);
return new FetchResult
{
Definition = content,
Path = tryContentPath
};
}
catch (Exception)
{
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
}
}

throw new RequestFailedException(remoteFetchError);
}
catch (Exception ex)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
scope.Failed(ex);
throw;
}
}

work.Enqueue(GetPath(dtmi, repositoryUri, false));

string remoteFetchError = string.Empty;
while (work.Count != 0 && !cancellationToken.IsCancellationRequested)
public async Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.Fetch");
scope.Start();
try
{
string tryContentPath = work.Dequeue();
ResolverEventSource.Shared.FetchingModelContent(tryContentPath);
Queue<string> work = PrepareWork(dtmi, repositoryUri);

string remoteFetchError = string.Empty;

string content = await EvaluatePathAsync(tryContentPath, cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrEmpty(content))
while (work.Count != 0)
{
return new FetchResult()
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

try
{
Definition = content,
Path = tryContentPath
};
string content = await EvaluatePathAsync(tryContentPath, cancellationToken).ConfigureAwait(false);
return new FetchResult()
{
Definition = content,
Path = tryContentPath
};
}
catch (Exception)
{
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
}
}

ResolverEventSource.Shared.ErrorFetchingModelContent(tryContentPath);
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
throw new RequestFailedException(remoteFetchError);
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
}

private Queue<string> PrepareWork(string dtmi, Uri repositoryUri)
{
Queue<string> work = new Queue<string>();

throw new RequestFailedException(remoteFetchError);
if (_tryExpanded)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}

work.Enqueue(GetPath(dtmi, repositoryUri, false));

return work;
}

private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = false)
Expand All @@ -69,32 +128,89 @@ private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = fa
return DtmiConventions.DtmiToQualifiedPath(dtmi, absoluteUri, expanded);
}

private async Task<string> EvaluatePathAsync(string path, CancellationToken cancellationToken)
private string EvaluatePath(string path, CancellationToken cancellationToken = default)
{
Request request = _pipeline.CreateRequest();
request.Method = RequestMethod.Get;
request.Uri = new RequestUriBuilder();
request.Uri.Reset(new Uri(path));
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.EvaluatePath");
scope.Start();

try
{
using HttpMessage message = CreateGetRequest(path);

Response response = await _pipeline.SendRequestAsync(request, cancellationToken).ConfigureAwait(false);
_pipeline.Send(message, cancellationToken);

if (response.Status >= 200 && response.Status <= 299)
switch (message.Response.Status)
{
case 200:
{
return GetContent(message.Response.ContentStream);
}
default:
throw _clientDiagnostics.CreateRequestFailedException(message.Response);
}
}
catch (Exception ex)
{
return await GetContentAsync(response.ContentStream, cancellationToken).ConfigureAwait(false);
scope.Failed(ex);
throw;
}

return null;
}

private static async Task<string> GetContentAsync(Stream content, CancellationToken cancellationToken)
private async Task<string> EvaluatePathAsync(string path, CancellationToken cancellationToken = default)
{
using (JsonDocument json = await JsonDocument.ParseAsync(content, default, cancellationToken).ConfigureAwait(false))
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.EvaluatePath");
scope.Start();

try
{
using HttpMessage message = CreateGetRequest(path);

await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);

switch (message.Response.Status)
{
case 200:
{
return await GetContentAsync(message.Response.ContentStream, cancellationToken).ConfigureAwait(false);
}
default:
throw _clientDiagnostics.CreateRequestFailedException(message.Response);
}
}
catch (Exception ex)
{
JsonElement root = json.RootElement;
return root.GetRawText();
scope.Failed(ex);
throw;
}
}

private HttpMessage CreateGetRequest(string path)
{
HttpMessage message = _pipeline.CreateMessage();
Request request = message.Request;
request.Method = RequestMethod.Get;
var uri = new RequestUriBuilder();
uri.Reset(new Uri(path));
request.Uri = uri;

return message;
}

private static string GetContent(Stream content)
{
using JsonDocument json = JsonDocument.Parse(content);
JsonElement root = json.RootElement;
return root.GetRawText();
}

private static async Task<string> GetContentAsync(Stream content, CancellationToken cancellationToken)
{
using JsonDocument json = await JsonDocument.ParseAsync(content, default, cancellationToken).ConfigureAwait(false);

JsonElement root = json.RootElement;
return root.GetRawText();
}

private static HttpPipeline CreatePipeline(ResolverClientOptions options)
{
return HttpPipelineBuilder.Build(options);
Expand Down
Loading