Skip to content

Commit

Permalink
EdgeAgent: Add logs provider (#930)
Browse files Browse the repository at this point in the history
* Add LogsProvider

* Fix build

* Fix build issue

* Change return type to IReadOnlyList instead of IEnumerable

* Fix bad merge
  • Loading branch information
varunpuranik authored Mar 12, 2019
1 parent f7e820f commit 6bc92d2
Show file tree
Hide file tree
Showing 16 changed files with 956 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

/// <summary>
/// This interface provides the module runtime information.
Expand All @@ -21,23 +20,4 @@ public interface IRuntimeInfoProvider

Task<SystemInfo> GetSystemInfo();
}

public class SystemInfo
{
[JsonConstructor]
public SystemInfo(string operatingSystemType, string architecture, string version)
{
this.OperatingSystemType = operatingSystemType;
this.Architecture = architecture;
this.Version = version;
}

public string OperatingSystemType { get; }

public string Architecture { get; }

public string Version { get; }

static SystemInfo Empty { get; } = new SystemInfo(string.Empty, string.Empty, string.Empty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Streams" Version="1.3.11" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="System.Collections.Immutable" Version="1.5.0" />
<PackageReference Include="System.IO.FileSystem.Watcher" Version="4.3.0" />
Expand All @@ -40,4 +41,12 @@
<CodeAnalysisRuleSet>..\..\..\stylecop.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<Import Project="..\..\..\stylecop.props" />

<Target Name="ChangeAliasOfAkkaNet" BeforeTargets="FindReferenceAssembliesForReferences;ResolveReferences">
<ItemGroup>
<ReferencePath Condition="'%(FileName)' == 'Akka'">
<Aliases>akka</Aliases>
</ReferencePath>
</ItemGroup>
</Target>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Core
{
using Newtonsoft.Json;

public class SystemInfo
{
[JsonConstructor]
public SystemInfo(string operatingSystemType, string architecture, string version)
{
this.OperatingSystemType = operatingSystemType;
this.Architecture = architecture;
this.Version = version;
}

public string OperatingSystemType { get; }

public string Architecture { get; }

public string Version { get; }

static SystemInfo Empty { get; } = new SystemInfo(string.Empty, string.Empty, string.Empty);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using akka::Akka.IO;

public interface ILogMessageParser
{
ModuleLogMessage Parse(ByteString byteString, string moduleId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public interface ILogsProcessor
{
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId);

Task<IReadOnlyList<string>> GetText(Stream stream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System.Threading;
using System.Threading.Tasks;

public interface ILogsProvider
{
Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using System;
using System.Text;
using System.Text.RegularExpressions;
using akka::Akka.IO;
using Microsoft.Azure.Devices.Edge.Util;

// Parses logs into message objects.
//
// Expected format -
// Each input payload should contain one frame in Docker format -
// 01 00 00 00 00 00 00 1f 52 6f 73 65 73 20 61 72 65 ...
// │ ─────┬── ─────┬───── R o s e s a r e...
// │ │ │
// └stdout │ │
// │ └─ 0x0000001f = 31 bytes (including the \n at the end)
// unused
//
// The payload itself is expected to be in this format -
// <logLevel> TimeStamp log text
// For example, this log line will be parsed as follows -
// <6> 2019-02-14 16:15:35.243 -08:00 [INF] [EdgeHub] - Version - 1.0.7-dev.BUILDNUMBER (COMMITID)
// LogLevel = 6
// TimeStamp = 2019-02-14 16:15:35.243 -08:00
// Text = [INF] [EdgeHub] - Version - 1.0.7-dev.BUILDNUMBER (COMMITID)
public class LogMessageParser : ILogMessageParser
{
const int DefaultLogLevel = 6;
const string LogRegexPattern = @"^(<(?<logLevel>\d)>)?\s*((?<timestamp>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}\s[+-]\d{2}:\d{2})\s)?\s*(?<logtext>.*)";

readonly string iotHubName;
readonly string deviceId;

public LogMessageParser(string iotHubName, string deviceId)
{
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName));
this.deviceId = Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId));
}

public ModuleLogMessage Parse(ByteString byteString, string moduleId) =>
GetLogMessage(byteString, this.iotHubName, this.deviceId, moduleId);

internal static ModuleLogMessage GetLogMessage(ByteString arg, string iotHubName, string deviceId, string moduleId)
{
string stream = GetStream(arg[0]);
ByteString payload = arg.Slice(8);
string payloadString = payload.ToString(Encoding.UTF8);
(int logLevel, Option<DateTime> timeStamp, string logText) = ParseLogText(payloadString);
var moduleLogMessage = new ModuleLogMessage(iotHubName, deviceId, moduleId, stream, logLevel, timeStamp, logText);
return moduleLogMessage;
}

internal static string GetStream(byte streamByte) => streamByte == 2 ? "stderr" : "stdout";

internal static (int logLevel, Option<DateTime> timeStamp, string text) ParseLogText(string value)
{
var regex = new Regex(LogRegexPattern);
var match = regex.Match(value);
int logLevel = DefaultLogLevel;
string text = value;
Option<DateTime> timeStamp = Option.None<DateTime>();
if (match.Success)
{
var tsg = match.Groups["timestamp"];
if (tsg?.Length > 0)
{
if (DateTime.TryParse(tsg.Value, out DateTime dt))
{
timeStamp = Option.Some(dt);
}
}

var llg = match.Groups["logLevel"];
if (llg?.Length > 0)
{
string ll = llg.Value;
int.TryParse(ll, out logLevel);
}

var textGroup = match.Groups["logtext"];
if (textGroup?.Length > 0)
{
text = textGroup.Value;
}
}

return (logLevel, timeStamp, text);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using akka::Akka;
using akka::Akka.Actor;
using akka::Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;
using Microsoft.Azure.Devices.Edge.Util;

// Processes incoming logs stream and converts to the required format
//
// Docker format -
// Each input payload should contain one frame in Docker format -
// 01 00 00 00 00 00 00 1f 52 6f 73 65 73 20 61 72 65 ...
// │ ─────┬── ─────┬───── R o s e s a r e...
// │ │ │
// └stdout │ │
// │ └─ 0x0000001f = 31 bytes (including the \n at the end)
// unused
public class LogsProcessor : ILogsProcessor, IDisposable
{
static readonly Flow<ByteString, ByteString, NotUsed> FramingFlow
= Framing.LengthField(4, int.MaxValue, 4, ByteOrder.BigEndian);

readonly ActorSystem system;
readonly ActorMaterializer materializer;
readonly ILogMessageParser logMessageParser;

public LogsProcessor(ILogMessageParser logMessageParser)
{
this.logMessageParser = Preconditions.CheckNotNull(logMessageParser, nameof(logMessageParser));
this.system = ActorSystem.Create("LogsProcessor");
this.materializer = this.system.Materializer();
}

public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId)
{
Preconditions.CheckNotNull(stream, nameof(stream));
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));

var source = StreamConverters.FromInputStream(() => stream);
var seqSink = Sink.Seq<ModuleLogMessage>();
IRunnableGraph<Task<IImmutableList<ModuleLogMessage>>> graph = source
.Via(FramingFlow)
.Select(b => this.logMessageParser.Parse(b, moduleId))
.ToMaterialized(seqSink, Keep.Right);

IImmutableList<ModuleLogMessage> result = await graph.Run(this.materializer);
return result;
}

public async Task<IReadOnlyList<string>> GetText(Stream stream)
{
Preconditions.CheckNotNull(stream, nameof(stream));
var source = StreamConverters.FromInputStream(() => stream);
var seqSink = Sink.Seq<string>();
IRunnableGraph<Task<IImmutableList<string>>> graph = source
.Via(FramingFlow)
.Select(b => b.Slice(8))
.Select(b => b.ToString(Encoding.UTF8))
.ToMaterialized(seqSink, Keep.Right);

IImmutableList<string> result = await graph.Run(this.materializer);
return result;
}

public void Dispose()
{
this.system?.Dispose();
this.materializer?.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;

public class LogsProvider : ILogsProvider
{
readonly IRuntimeInfoProvider runtimeInfoProvider;
readonly ILogsProcessor logsProcessor;

public LogsProvider(IRuntimeInfoProvider runtimeInfoProvider, ILogsProcessor logsProcessor)
{
this.runtimeInfoProvider = Preconditions.CheckNotNull(runtimeInfoProvider, nameof(runtimeInfoProvider));
this.logsProcessor = Preconditions.CheckNotNull(logsProcessor, nameof(logsProcessor));
}

public async Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken)
{
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, Option.None<int>(), cancellationToken);
byte[] logBytes = await this.GetProcessedLogs(logsStream, logOptions);
return logBytes;
}

static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) =>
contentEncoding == LogsContentEncoding.Gzip
? Compression.CompressToGzip(bytes)
: bytes;

async Task<byte[]> GetProcessedLogs(Stream logsStream, ModuleLogOptions logOptions)
{
byte[] logBytes = await this.ProcessByContentType(logsStream, logOptions);
logBytes = ProcessByContentEncoding(logBytes, logOptions.ContentEncoding);
return logBytes;
}

async Task<byte[]> ProcessByContentType(Stream logsStream, ModuleLogOptions logOptions)
{
switch (logOptions.ContentType)
{
case LogsContentType.Json:
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id);
return logMessages.ToBytes();

default:
IEnumerable<string> logTexts = await this.logsProcessor.GetText(logsStream);
string logTextString = logTexts.Join(string.Empty);
return logTextString.ToBytes();
}
}
}
}
Loading

0 comments on commit 6bc92d2

Please sign in to comment.