From 77410c7ec912f90bc111ff85a59bb2eaf82689ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hofman?= Date: Mon, 23 Sep 2024 22:56:25 +0200 Subject: [PATCH] SNOW-1629635 fix connection pool allocation (#1024) --- .../ConnectionMultiplePoolsAsyncIT.cs | 2 +- .../ConnectionMultiplePoolsIT.cs | 2 +- .../ConnectionPoolChangedSessionIT.cs | 2 +- .../ConnectionPoolCommonIT.cs | 2 +- .../ConnectionSinglePoolCacheAsyncIT.cs | 2 +- .../ConnectionSinglePoolCacheIT.cs | 2 +- .../UnitTests/ConnectionCacheManagerTest.cs | 2 +- .../UnitTests/ConnectionPoolManagerTest.cs | 2 +- Snowflake.Data.Tests/Util/PoolConfig.cs | 2 +- .../Client/SnowflakeDbConnectionPool.cs | 15 +- .../Core/SFBlockingChunkDownloaderV3.cs | 476 +++++++++--------- .../Core/Session/SFSessionProperty.cs | 4 +- Snowflake.Data/Core/Tools/Diagnostics.cs | 62 +++ 13 files changed, 324 insertions(+), 251 deletions(-) create mode 100644 Snowflake.Data/Core/Tools/Diagnostics.cs diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs index aa5d431ed..e263cb9d3 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsAsyncIT.cs @@ -22,7 +22,7 @@ public class ConnectionMultiplePoolsAsyncIT: SFBaseTestAsync [SetUp] public new void BeforeTest() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); SnowflakeDbConnectionPool.ClearAllPools(); } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs index 4b7ec61f0..7cfad7d91 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionMultiplePoolsIT.cs @@ -22,7 +22,7 @@ public class ConnectionMultiplePoolsIT: SFBaseTest [SetUp] public new void BeforeTest() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); SnowflakeDbConnectionPool.ClearAllPools(); } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolChangedSessionIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolChangedSessionIT.cs index 801916cb0..6a484252d 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolChangedSessionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolChangedSessionIT.cs @@ -48,7 +48,7 @@ public class ConnectionPoolChangedSessionIT : SFBaseTest public static void BeforeAllTests() { s_previousPoolConfigRestorer = new PoolConfig(); - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); } [SetUp] diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs index 6a0745b23..9bf78ffbe 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionPoolCommonIT.cs @@ -32,7 +32,7 @@ public ConnectionPoolCommonIT(ConnectionPoolType connectionPoolTypeUnderTest) [SetUp] public new void BeforeTest() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(_connectionPoolTypeUnderTest); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(_connectionPoolTypeUnderTest); SnowflakeDbConnectionPool.ClearAllPools(); if (_connectionPoolTypeUnderTest == ConnectionPoolType.SingleConnectionCache) { diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheAsyncIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheAsyncIT.cs index 2adb1a1e0..5d4365159 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheAsyncIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheAsyncIT.cs @@ -20,7 +20,7 @@ public class ConnectionSinglePoolCacheAsyncIT: SFBaseTestAsync [SetUp] public new void BeforeTest() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); SnowflakeDbConnectionPool.ClearAllPools(); } diff --git a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs index 956f7f00c..1d73d2c81 100644 --- a/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/ConnectionSinglePoolCacheIT.cs @@ -21,7 +21,7 @@ public class ConnectionSinglePoolCacheIT: SFBaseTest [SetUp] public new void BeforeTest() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); SnowflakeDbConnectionPool.ClearAllPools(); SnowflakeDbConnectionPool.SetPooling(true); } diff --git a/Snowflake.Data.Tests/UnitTests/ConnectionCacheManagerTest.cs b/Snowflake.Data.Tests/UnitTests/ConnectionCacheManagerTest.cs index 589565ddf..0a991ee79 100644 --- a/Snowflake.Data.Tests/UnitTests/ConnectionCacheManagerTest.cs +++ b/Snowflake.Data.Tests/UnitTests/ConnectionCacheManagerTest.cs @@ -16,7 +16,7 @@ public class ConnectionCacheManagerTest public static void BeforeAllTests() { s_poolConfig = new PoolConfig(); - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); SessionPool.SessionFactory = new MockSessionFactory(); } diff --git a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs index 70efa47fb..b53487d60 100644 --- a/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs +++ b/Snowflake.Data.Tests/UnitTests/ConnectionPoolManagerTest.cs @@ -30,7 +30,7 @@ class ConnectionPoolManagerTest public static void BeforeAllTests() { s_poolConfig = new PoolConfig(); - SnowflakeDbConnectionPool.SetConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(ConnectionPoolType.MultipleConnectionPool); SessionPool.SessionFactory = new MockSessionFactory(); } diff --git a/Snowflake.Data.Tests/Util/PoolConfig.cs b/Snowflake.Data.Tests/Util/PoolConfig.cs index 4291c2f81..e8c56cdc0 100644 --- a/Snowflake.Data.Tests/Util/PoolConfig.cs +++ b/Snowflake.Data.Tests/Util/PoolConfig.cs @@ -25,7 +25,7 @@ public PoolConfig() public void Reset() { - SnowflakeDbConnectionPool.SetConnectionPoolVersion(_connectionPoolType); + SnowflakeDbConnectionPool.ForceConnectionPoolVersion(_connectionPoolType); if (_connectionPoolType == ConnectionPoolType.MultipleConnectionPool) return; // for multiple connection pool setting parameters for all the pools doesn't work by design SnowflakeDbConnectionPool.SetMaxPoolSize(_maxPoolSize); diff --git a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs index 617c07ebd..fcee66e1a 100644 --- a/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnectionPool.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Snowflake.Data.Core; using Snowflake.Data.Core.Session; +using Snowflake.Data.Core.Tools; using Snowflake.Data.Log; namespace Snowflake.Data.Client @@ -25,7 +26,7 @@ private static IConnectionManager ConnectionManager { if (s_connectionManager != null) return s_connectionManager; - SetConnectionPoolVersion(DefaultConnectionPoolType); + SetConnectionPoolVersion(DefaultConnectionPoolType, false); return s_connectionManager; } } @@ -122,13 +123,16 @@ public static bool GetPooling() public static void SetOldConnectionPoolVersion() { - SetConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); + ForceConnectionPoolVersion(ConnectionPoolType.SingleConnectionCache); } - internal static void SetConnectionPoolVersion(ConnectionPoolType requestedPoolType) + private static void SetConnectionPoolVersion(ConnectionPoolType requestedPoolType, bool force) { lock (s_connectionManagerInstanceLock) { + if (s_connectionManager != null && !force) + return; + Diagnostics.LogDiagnostics(); s_connectionManager?.ClearAllPools(); if (requestedPoolType == ConnectionPoolType.MultipleConnectionPool) { @@ -143,6 +147,11 @@ internal static void SetConnectionPoolVersion(ConnectionPoolType requestedPoolTy } } + internal static void ForceConnectionPoolVersion(ConnectionPoolType requestedPoolType) + { + SetConnectionPoolVersion(requestedPoolType, true); + } + internal static ConnectionPoolType GetConnectionPoolVersion() { if (ConnectionManager != null) diff --git a/Snowflake.Data/Core/SFBlockingChunkDownloaderV3.cs b/Snowflake.Data/Core/SFBlockingChunkDownloaderV3.cs index 52fc754b5..282c502b1 100755 --- a/Snowflake.Data/Core/SFBlockingChunkDownloaderV3.cs +++ b/Snowflake.Data/Core/SFBlockingChunkDownloaderV3.cs @@ -1,237 +1,239 @@ -/* - * Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved. - */ - -using System; -using System.IO.Compression; -using System.IO; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using System.Net.Http; -using Newtonsoft.Json; -using System.Diagnostics; -using Newtonsoft.Json.Serialization; -using Snowflake.Data.Log; - -namespace Snowflake.Data.Core -{ - class SFBlockingChunkDownloaderV3 : IChunkDownloader - { - static private SFLogger logger = SFLoggerFactory.GetLogger(); - - private List chunkDatas = new List(); - - private string qrmk; - - private int nextChunkToDownloadIndex; - - private int nextChunkToConsumeIndex; - - // External cancellation token, used to stop donwload - private CancellationToken externalCancellationToken; - - private readonly int prefetchSlot; - - private readonly IRestRequester _RestRequester; - - private readonly SFSessionProperties sessionProperies; - - private Dictionary chunkHeaders; - - private readonly SFBaseResultSet ResultSet; - - private readonly List chunkInfos; - - private readonly List> taskQueues; - - public SFBlockingChunkDownloaderV3(int colCount, - List chunkInfos, string qrmk, - Dictionary chunkHeaders, - CancellationToken cancellationToken, - SFBaseResultSet ResultSet, - ResultFormat resultFormat) - { - this.qrmk = qrmk; - this.chunkHeaders = chunkHeaders; - this.nextChunkToDownloadIndex = 0; - this.ResultSet = ResultSet; - this._RestRequester = ResultSet.sfStatement.SfSession.restRequester; - this.sessionProperies = ResultSet.sfStatement.SfSession.properties; - this.prefetchSlot = Math.Min(chunkInfos.Count, GetPrefetchThreads(ResultSet)); - this.chunkInfos = chunkInfos; - this.nextChunkToConsumeIndex = 0; - this.taskQueues = new List>(); - externalCancellationToken = cancellationToken; - - for (int i=0; i sessionParameters = resultSet.sfStatement.SfSession.ParameterMap; - String val = (String)sessionParameters[SFSessionParameter.CLIENT_PREFETCH_THREADS]; - return Int32.Parse(val); - } - - public async Task GetNextChunkAsync() - { - logger.Info($"NextChunkToConsume: {nextChunkToConsumeIndex}, NextChunkToDownload: {nextChunkToDownloadIndex}"); - if (nextChunkToConsumeIndex < chunkInfos.Count) - { - Task chunk = taskQueues[nextChunkToConsumeIndex % prefetchSlot]; - - if (nextChunkToDownloadIndex < chunkInfos.Count && nextChunkToConsumeIndex > 0) - { - BaseResultChunk reusableChunk = chunkDatas[nextChunkToDownloadIndex % prefetchSlot]; - reusableChunk.Reset(chunkInfos[nextChunkToDownloadIndex], nextChunkToDownloadIndex); - - taskQueues[nextChunkToDownloadIndex % prefetchSlot] = DownloadChunkAsync(new DownloadContextV3() - { - chunk = reusableChunk, - qrmk = this.qrmk, - chunkHeaders = this.chunkHeaders, - cancellationToken = externalCancellationToken - }); - nextChunkToDownloadIndex++; - - // in case of one slot we need to return the chunk already downloaded - if (prefetchSlot == 1) - { - chunk = taskQueues[0]; - } - } - nextChunkToConsumeIndex++; - return await chunk; - } - else - { - return await Task.FromResult(null); - } - } - - private async Task DownloadChunkAsync(DownloadContextV3 downloadContext) - { - BaseResultChunk chunk = downloadContext.chunk; - int backOffInSec = 1; - bool retry = false; - int retryCount = 0; - int maxRetry = int.Parse(sessionProperies[SFSessionProperty.MAXHTTPRETRIES]); - - do - { - retry = false; - - S3DownloadRequest downloadRequest = - new S3DownloadRequest() - { - Url = new UriBuilder(chunk.Url).Uri, - qrmk = downloadContext.qrmk, - // s3 download request timeout to one hour - RestTimeout = TimeSpan.FromHours(1), - HttpTimeout = Timeout.InfiniteTimeSpan, // Disable timeout for each request - chunkHeaders = downloadContext.chunkHeaders, - sid = ResultSet.sfStatement.SfSession.sessionId - }; - - using (var httpResponse = await _RestRequester.GetAsync(downloadRequest, downloadContext.cancellationToken) - .ConfigureAwait(continueOnCapturedContext: false)) - using (Stream stream = await httpResponse.Content.ReadAsStreamAsync() - .ConfigureAwait(continueOnCapturedContext: false)) - { - // retry on chunk downloading since the retry logic in HttpClient.RetryHandler - // doesn't cover this. The GET request could be succeeded but network error - // still could happen during reading chunk data from stream and that needs - // retry as well. - try - { - IEnumerable encoding; - if (httpResponse.Content.Headers.TryGetValues("Content-Encoding", out encoding)) - { - if (String.Compare(encoding.First(), "gzip", true) == 0) - { - using (Stream streamGzip = new GZipStream(stream, CompressionMode.Decompress)) - { - await ParseStreamIntoChunk(streamGzip, chunk).ConfigureAwait(false); - } - } - else - { - await ParseStreamIntoChunk(stream, chunk).ConfigureAwait(false); - } - } - else - { - await ParseStreamIntoChunk(stream, chunk).ConfigureAwait(false); - } - } - catch (Exception e) - { - if ((maxRetry <= 0) || (retryCount < maxRetry)) - { - retry = true; - // reset the chunk before retry in case there could be garbage - // data left from last attempt - chunk.ResetForRetry(); - await Task.Delay(TimeSpan.FromSeconds(backOffInSec), downloadContext.cancellationToken).ConfigureAwait(false); - ++retryCount; - // Set next backoff time - backOffInSec = backOffInSec * 2; - if (backOffInSec > HttpUtil.MAX_BACKOFF) - { - backOffInSec = HttpUtil.MAX_BACKOFF; - } - } - else - { - //parse error - throw new Exception("parse stream to Chunk error. " + e); - } - } - } - } while (retry); - logger.Info($"Succeed downloading chunk #{chunk.ChunkIndex}"); - return chunk; - } - - private async Task ParseStreamIntoChunk(Stream content, BaseResultChunk resultChunk) - { - IChunkParser parser = ChunkParserFactory.Instance.GetParser(resultChunk.ResultFormat, content); - await parser.ParseChunk(resultChunk); - } - } - - class DownloadContextV3 - { - public BaseResultChunk chunk { get; set; } - - public string qrmk { get; set; } - - public Dictionary chunkHeaders { get; set; } - - public CancellationToken cancellationToken { get; set; } - } -} +/* + * Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved. + */ + +using System; +using System.IO.Compression; +using System.IO; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Http; +using Newtonsoft.Json; +using System.Diagnostics; +using Newtonsoft.Json.Serialization; +using Snowflake.Data.Log; + +namespace Snowflake.Data.Core +{ + class SFBlockingChunkDownloaderV3 : IChunkDownloader + { + static private SFLogger logger = SFLoggerFactory.GetLogger(); + + private List chunkDatas = new List(); + + private string qrmk; + + private int nextChunkToDownloadIndex; + + private int nextChunkToConsumeIndex; + + // External cancellation token, used to stop donwload + private CancellationToken externalCancellationToken; + + private readonly int prefetchSlot; + + private readonly IRestRequester _RestRequester; + + private readonly SFSessionProperties sessionProperies; + + private Dictionary chunkHeaders; + + private readonly SFBaseResultSet ResultSet; + + private readonly List chunkInfos; + + private readonly List> taskQueues; + + public SFBlockingChunkDownloaderV3(int colCount, + List chunkInfos, string qrmk, + Dictionary chunkHeaders, + CancellationToken cancellationToken, + SFBaseResultSet ResultSet, + ResultFormat resultFormat) + { + this.qrmk = qrmk; + this.chunkHeaders = chunkHeaders; + this.nextChunkToDownloadIndex = 0; + this.ResultSet = ResultSet; + this._RestRequester = ResultSet.sfStatement.SfSession.restRequester; + this.sessionProperies = ResultSet.sfStatement.SfSession.properties; + this.prefetchSlot = Math.Min(chunkInfos.Count, GetPrefetchThreads(ResultSet)); + this.chunkInfos = chunkInfos; + this.nextChunkToConsumeIndex = 0; + this.taskQueues = new List>(); + externalCancellationToken = cancellationToken; + + for (int i=0; i sessionParameters = resultSet.sfStatement.SfSession.ParameterMap; + String val = (String)sessionParameters[SFSessionParameter.CLIENT_PREFETCH_THREADS]; + return Int32.Parse(val); + } + + public async Task GetNextChunkAsync() + { + logger.Info($"NextChunkToConsume: {nextChunkToConsumeIndex}, NextChunkToDownload: {nextChunkToDownloadIndex}"); + if (nextChunkToConsumeIndex < chunkInfos.Count) + { + Task chunk = taskQueues[nextChunkToConsumeIndex % prefetchSlot]; + + if (nextChunkToDownloadIndex < chunkInfos.Count && nextChunkToConsumeIndex > 0) + { + BaseResultChunk reusableChunk = chunkDatas[nextChunkToDownloadIndex % prefetchSlot]; + reusableChunk.Reset(chunkInfos[nextChunkToDownloadIndex], nextChunkToDownloadIndex); + + taskQueues[nextChunkToDownloadIndex % prefetchSlot] = DownloadChunkAsync(new DownloadContextV3() + { + chunk = reusableChunk, + qrmk = this.qrmk, + chunkHeaders = this.chunkHeaders, + cancellationToken = externalCancellationToken + }); + nextChunkToDownloadIndex++; + + // in case of one slot we need to return the chunk already downloaded + if (prefetchSlot == 1) + { + chunk = taskQueues[0]; + } + } + nextChunkToConsumeIndex++; + return await chunk; + } + else + { + return await Task.FromResult(null); + } + } + + private async Task DownloadChunkAsync(DownloadContextV3 downloadContext) + { + BaseResultChunk chunk = downloadContext.chunk; + int backOffInSec = 1; + bool retry = false; + int retryCount = 0; + int maxRetry = int.Parse(sessionProperies[SFSessionProperty.MAXHTTPRETRIES]); + + do + { + retry = false; + + S3DownloadRequest downloadRequest = + new S3DownloadRequest() + { + Url = new UriBuilder(chunk.Url).Uri, + qrmk = downloadContext.qrmk, + // s3 download request timeout to one hour + RestTimeout = TimeSpan.FromHours(1), + HttpTimeout = Timeout.InfiniteTimeSpan, // Disable timeout for each request + chunkHeaders = downloadContext.chunkHeaders, + sid = ResultSet.sfStatement.SfSession.sessionId + }; + + using (var httpResponse = await _RestRequester.GetAsync(downloadRequest, downloadContext.cancellationToken) + .ConfigureAwait(continueOnCapturedContext: false)) + using (Stream stream = await httpResponse.Content.ReadAsStreamAsync() + .ConfigureAwait(continueOnCapturedContext: false)) + { + // retry on chunk downloading since the retry logic in HttpClient.RetryHandler + // doesn't cover this. The GET request could be succeeded but network error + // still could happen during reading chunk data from stream and that needs + // retry as well. + try + { + IEnumerable encoding; + if (httpResponse.Content.Headers.TryGetValues("Content-Encoding", out encoding)) + { + if (String.Compare(encoding.First(), "gzip", true) == 0) + { + using (Stream streamGzip = new GZipStream(stream, CompressionMode.Decompress)) + { + await ParseStreamIntoChunk(streamGzip, chunk).ConfigureAwait(false); + } + } + else + { + await ParseStreamIntoChunk(stream, chunk).ConfigureAwait(false); + } + } + else + { + await ParseStreamIntoChunk(stream, chunk).ConfigureAwait(false); + } + } + catch (Exception e) + { + if ((maxRetry <= 0) || (retryCount < maxRetry)) + { + logger.Debug($"Retry {retryCount}/{maxRetry} of parse stream to chunk error: " + e.Message); + retry = true; + // reset the chunk before retry in case there could be garbage + // data left from last attempt + chunk.ResetForRetry(); + await Task.Delay(TimeSpan.FromSeconds(backOffInSec), downloadContext.cancellationToken).ConfigureAwait(false); + ++retryCount; + // Set next backoff time + backOffInSec = backOffInSec * 2; + if (backOffInSec > HttpUtil.MAX_BACKOFF) + { + backOffInSec = HttpUtil.MAX_BACKOFF; + } + } + else + { + //parse error + logger.Error("Failed retries of parse stream to chunk error: " + e.Message); + throw new Exception("Parse stream to chunk error: " + e.Message); + } + } + } + } while (retry); + logger.Info($"Succeed downloading chunk #{chunk.ChunkIndex}"); + return chunk; + } + + private async Task ParseStreamIntoChunk(Stream content, BaseResultChunk resultChunk) + { + IChunkParser parser = ChunkParserFactory.Instance.GetParser(resultChunk.ResultFormat, content); + await parser.ParseChunk(resultChunk); + } + } + + class DownloadContextV3 + { + public BaseResultChunk chunk { get; set; } + + public string qrmk { get; set; } + + public Dictionary chunkHeaders { get; set; } + + public CancellationToken cancellationToken { get; set; } + } +} diff --git a/Snowflake.Data/Core/Session/SFSessionProperty.cs b/Snowflake.Data/Core/Session/SFSessionProperty.cs index bfbe71a2a..07896ae14 100644 --- a/Snowflake.Data/Core/Session/SFSessionProperty.cs +++ b/Snowflake.Data/Core/Session/SFSessionProperty.cs @@ -213,9 +213,9 @@ internal static SFSessionProperties ParseConnectionString(string connectionStrin typeof(SFSessionProperty), keys[i].ToUpper()); properties.Add(p, values[i]); } - catch (ArgumentException e) + catch (ArgumentException) { - logger.Warn($"Property {keys[i]} not found ignored.", e); + logger.Debug($"Property {keys[i]} not found ignored."); } } diff --git a/Snowflake.Data/Core/Tools/Diagnostics.cs b/Snowflake.Data/Core/Tools/Diagnostics.cs new file mode 100644 index 000000000..dc0b19593 --- /dev/null +++ b/Snowflake.Data/Core/Tools/Diagnostics.cs @@ -0,0 +1,62 @@ +using System; +using System.Diagnostics; +using System.Reflection; +using System.Runtime; +using System.Runtime.InteropServices; +using System.Text; +using Snowflake.Data.Log; + +namespace Snowflake.Data.Core.Tools +{ + internal class Diagnostics + { + private const int PadRight = -25; + private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger(); + + public static void LogDiagnostics() => s_logger.Info(GetDiagnosticInfo()); + + private static string GetDiagnosticInfo() + { + StringBuilder info = new StringBuilder("System Diagnostics:\n"); + info.AppendLine($"{"OS", PadRight}: {OsName()}"); + info.AppendLine($"{"OS Description", PadRight}: {RuntimeInformation.OSDescription}"); + info.AppendLine($"{"OS Architecture", PadRight}: {RuntimeInformation.OSArchitecture}"); + info.AppendLine($"{"OS Version", PadRight}: {Environment.OSVersion}"); + info.AppendLine($"{"OS x64", PadRight}: {Environment.Is64BitOperatingSystem}"); + info.AppendLine($"{"Processor Architecture", PadRight}: {RuntimeInformation.ProcessArchitecture}"); + info.AppendLine($"{"Processor Count", PadRight}: {Environment.ProcessorCount}"); + info.AppendLine($"{".NET Framework", PadRight}: {RuntimeInformation.FrameworkDescription}"); + info.AppendLine($"{"CLR Runtime Version", PadRight}: {Environment.Version}"); + info.AppendLine($"{"App x64", PadRight}: {Environment.Is64BitProcess}"); + info.AppendLine($"{"GC Server Mode", PadRight}: {GCSettings.IsServerGC}"); + info.AppendLine($"{"GC LOH Compaction Mode", PadRight}: {GCSettings.LargeObjectHeapCompactionMode}"); + info.AppendLine($"{"GC Latency Mode", PadRight}: {GCSettings.LatencyMode}"); + info.AppendLine($"{"GC Total Memory", PadRight}: {GC.GetTotalMemory(false)}"); + AppendAssemblyInfo(info, Assembly.GetEntryAssembly(), "App"); + AppendAssemblyInfo(info, Assembly.GetExecutingAssembly(), "Driver"); + return info.ToString(); + } + + private static void AppendAssemblyInfo(StringBuilder info, Assembly assembly, string assemblyTag) + { + if (assembly != null) + { + var assemblyVersion = FileVersionInfo.GetVersionInfo(assembly.Location); + info.AppendLine($"{assemblyTag + " Name", PadRight}: {assemblyVersion.InternalName}"); + info.AppendLine($"{assemblyTag + " File", PadRight}: {assemblyVersion.FileName}"); + info.AppendLine($"{assemblyTag + " Version", PadRight}: {assemblyVersion.FileVersion}"); + } + } + + private static string OsName() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + return "UNIX"; + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return "WINDOWS"; + if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + return "OSX"; + return "Unknown"; + } + } +}