Skip to content

Commit

Permalink
[ksqlDB.RestApi.Client]: separated configuration for pull and push qu…
Browse files Browse the repository at this point in the history
…eries. Added SetupPullQuery configuration option tests.
  • Loading branch information
tomasfabian committed Apr 26, 2024
1 parent 86ab8c5 commit eea7263
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using ksqlDb.RestApi.Client.IntegrationTests.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using UnitTests;

Expand All @@ -10,19 +11,36 @@ public abstract class IntegrationTests : TestBase
{
protected static KSqlDbRestApiProvider RestApiProvider = null!;
protected KSqlDBContextOptions ContextOptions = null!;
protected KSqlDBContext Context = null!;
private KSqlDBContext? context;

protected KSqlDBContext Context
{
get => context ??= (CreateKSqlDbContext(EndpointType.QueryStream));
set
{
context?.Dispose();

context = value;
}
}

[TestInitialize]
public override void TestInitialize()
{
base.TestInitialize();

Context = CreateKSqlDbContext(EndpointType.QueryStream);
}

protected KSqlDBContext CreateKSqlDbContext(EndpointType endpointType)
{
ContextOptions = new KSqlDBContextOptions(KSqlDbRestApiProvider.KsqlDbUrl)
{
ShouldPluralizeFromItemName = false
ShouldPluralizeFromItemName = false,
EndpointType = endpointType
};
Context = new KSqlDBContext(ContextOptions);

return new KSqlDBContext(ContextOptions);
}

[TestCleanup]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using ksqlDb.RestApi.Client.IntegrationTests.Models;
using ksqlDb.RestApi.Client.IntegrationTests.Models.Movies;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using NUnit.Framework;

namespace ksqlDb.RestApi.Client.IntegrationTests.KSql.Linq;
Expand Down Expand Up @@ -70,6 +71,7 @@ private static async Task TestHistogram(IQbservable<Movie> querySource)
[Test]
public async Task Histogram_QueryEndPoint()
{
Context = CreateKSqlDbContext(EndpointType.Query);
await TestHistogram(Context.CreateQueryStream<Movie>(MoviesProvider.MoviesTableName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public async Task FullOuterJoin()
[Test]
public async Task FullOuterJoin_QueryEndPoint()
{
Context = CreateKSqlDbContext(ksqlDB.RestApi.Client.KSql.Query.Options.EndpointType.Query);
await FullOuterJoinTest(Context.CreateQueryStream<Movie2>(MoviesTableName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static async Task ClassInitialize()
{
contextOptions = new KSqlDBContextOptions(KSqlDbRestApiProvider.KsqlDbUrl)
{
EndpointType = EndpointType.Query //TODO: test QueryStream end point
EndpointType = EndpointType.QueryStream
};

context = new KSqlDBContext(contextOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ksqlDb.RestApi.Client.IntegrationTests.Models.Movies;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Functions;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using NUnit.Framework;
using Assert = Microsoft.VisualStudio.TestTools.UnitTesting.Assert;
Expand Down Expand Up @@ -45,6 +46,7 @@ public async Task DateToString()
[Test]
public async Task DateToString_QueryEndPoint()
{
Context = CreateKSqlDbContext(EndpointType.Query);
await DateToStringTest(Context.CreateQueryStream<Movie>(MoviesTableName));
}

Expand Down Expand Up @@ -78,6 +80,7 @@ public async Task Entries()
[Test]
public async Task Entries_QueryEndPoint()
{
Context = CreateKSqlDbContext(EndpointType.Query);
await EntriesTest(Context.CreateQueryStream<Movie>(MoviesTableName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void SetProcessingGuarantee_WasNotSet()

//Assert
Assert.ThrowsException<KeyNotFoundException>(() =>
ClassUnderTest.QueryParameters[parameterName].Should().BeEmpty());
ClassUnderTest.QueryStreamParameters[parameterName].Should().BeEmpty());
}

[Test]
Expand All @@ -89,7 +89,6 @@ public void SetProcessingGuarantee_SetToAtLeastOnce()
//Assert
string expectedValue = "at_least_once";

ClassUnderTest.QueryParameters[parameterName].Should().Be(expectedValue);
ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue);
}

Expand All @@ -106,7 +105,6 @@ public void SetProcessingGuarantee_SetToExactlyOnce()
//Assert
string expectedValue = "exactly_once";

ClassUnderTest.QueryParameters[parameterName].Should().Be(expectedValue);
ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue);
}

Expand All @@ -123,7 +121,6 @@ public void SetProcessingGuarantee_SetToExactlyOnceV2()
//Assert
string expectedValue = "exactly_once_v2";

ClassUnderTest.QueryParameters[parameterName].Should().Be(expectedValue);
ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue);
}

Expand All @@ -139,29 +136,9 @@ public void SetAutoOffsetReset()
//Assert
string expectedValue = autoOffsetReset.ToString().ToLower();

ClassUnderTest.QueryParameters[QueryParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue);
ClassUnderTest.QueryStreamParameters[QueryStreamParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue);
}

[Test]
public void Clone()
{
//Arrange
var processingGuarantee = ProcessingGuarantee.AtLeastOnce;
string parameterName = KSqlDbConfigs.ProcessingGuarantee;
ClassUnderTest.SetProcessingGuarantee(processingGuarantee);

//Act
var clone = ClassUnderTest.Clone();

//Assert
string expectedValue = "at_least_once";

ClassUnderTest.Url.Should().Be(TestParameters.KsqlDbUrl);

clone.QueryParameters[parameterName].Should().Be(expectedValue);
clone.QueryStreamParameters[parameterName].Should().Be(expectedValue);
}

[Test]
public void JsonSerializerOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Parameters;
using ksqlDB.RestApi.Client.KSql.RestApi.Query;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Inserts;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Properties;
using ksqlDb.RestApi.Client.Tests.Models;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using NUnit.Framework;
using UnitTests;
using EndpointType = ksqlDB.RestApi.Client.KSql.Query.Options.EndpointType;
using TestParameters = ksqlDb.RestApi.Client.Tests.Helpers.TestParameters;

namespace ksqlDb.RestApi.Client.Tests.KSql.Query.Context;
Expand Down Expand Up @@ -332,4 +336,58 @@ public async Task NothingWasAdded_SaveChangesAsync_WasNotCalled()
//Assert
response.Should().BeNull();
}

[Test]
public void DependenciesForQueryEndpointTypeWereConfigured()
{
//Arrange
KSqlDBContextOptions contextOptions = new(TestParameters.KsqlDbUrl)
{
EndpointType = EndpointType.Query
};
var context = new KSqlDBContext(contextOptions);

_ = context.CreateQueryStream<int>();

var serviceProvider = context.ServiceCollection
.BuildServiceProvider(new ServiceProviderOptions { ValidateScopes = true });

//Act
var serviceScopeFactory = serviceProvider.GetRequiredService<IServiceScopeFactory>();
var queryDbProvider = serviceScopeFactory.CreateScope().ServiceProvider.GetRequiredService<IKSqlDbProvider>();
var pushQueryParameters = serviceScopeFactory.CreateScope().ServiceProvider.GetService<IKSqlDbParameters>();
var pullQueryParameters = serviceScopeFactory.CreateScope().ServiceProvider.GetService<IPullQueryParameters>();

//Assert
queryDbProvider.Should().BeOfType<KSqlDbQueryProvider>();
pushQueryParameters.Should().BeOfType<QueryParameters>();
pullQueryParameters.Should().BeOfType<PullQueryParameters>();
}

[Test]
public void DependenciesForQueryStreamEndpointTypeWereConfigured()
{
//Arrange
KSqlDBContextOptions contextOptions = new(TestParameters.KsqlDbUrl)
{
EndpointType = EndpointType.QueryStream
};
var context = new KSqlDBContext(contextOptions);

_ = context.CreateQueryStream<int>();

var serviceProvider = context.ServiceCollection
.BuildServiceProvider(new ServiceProviderOptions { ValidateScopes = true });

//Act
var serviceScopeFactory = serviceProvider.GetRequiredService<IServiceScopeFactory>();
var queryDbProvider = serviceScopeFactory.CreateScope().ServiceProvider.GetService<IKSqlDbProvider>();
var pushQueryParameters = serviceScopeFactory.CreateScope().ServiceProvider.GetService<IKSqlDbParameters>();
var pullQueryParameters = serviceScopeFactory.CreateScope().ServiceProvider.GetService<IPullQueryParameters>();

//Assert
queryDbProvider.Should().BeOfType<KSqlDbQueryStreamProvider>();
pushQueryParameters.Should().BeOfType<QueryStreamParameters>();
pullQueryParameters.Should().BeOfType<PullQueryStreamParameters>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public void SetProcessingGuarantee()
var options = setupParameters.SetProcessingGuarantee(ProcessingGuarantee.AtLeastOnce).Options;

//Assert
options.QueryParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once");
options.QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once");
}

Expand All @@ -107,7 +106,6 @@ public void SetProcessingGuarantee_ThenSetupQueryStream()
}).Options;

//Assert
options.QueryParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once");
options.QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once");
}

Expand All @@ -125,7 +123,6 @@ public void SetAutoOffsetReset()

//Assert
string expectedValue = autoOffsetReset.ToString().ToLower();
options.QueryParameters.Properties[QueryParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue);
options.QueryStreamParameters[QueryStreamParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue);
}

Expand Down Expand Up @@ -250,53 +247,22 @@ public void SetEndpointType_QueryWasSet(EndpointType endpointType)

#endregion

#region Query
#region SetupPullQuery

[Test]
public void SetupQuery_OptionsQueryParameters_AutoOffsetResetIsSetToDefault()
public void SetupPullQuery_PropertyWasSet()
{
//Arrange
var setupParameters = ClassUnderTest.UseKSqlDb(TestParameters.KsqlDbUrl);

//Act
var options = setupParameters.SetupQuery(_ =>
var options = setupParameters.SetupPullQuery(opt =>
{
opt[KSqlDbConfigs.KsqlQueryPullTableScanEnabled] = "true";
}).Options;

//Assert
options.QueryParameters.Properties[QueryParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo("earliest");
options.PullQueryParameters.Properties[KSqlDbConfigs.KsqlQueryPullTableScanEnabled].Should().BeEquivalentTo("true");
}

[Test]
public void SetupQueryNotCalled_OptionsQueryParameters_AutoOffsetResetIsSetToDefault()
{
//Arrange
var setupParameters = ClassUnderTest.UseKSqlDb(TestParameters.KsqlDbUrl);
string earliestAtoOffsetReset = AutoOffsetReset.Earliest.ToString().ToLower();

//Act
var options = setupParameters.Options;

//Assert
options.QueryParameters.Properties[QueryParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo(earliestAtoOffsetReset);
}

[Test]
public void SetupQuery_AmendOptionsQueryParametersProperty_AutoOffsetResetWasChanged()
{
//Arrange
var setupParameters = ClassUnderTest.UseKSqlDb(TestParameters.KsqlDbUrl);
string latestAtoOffsetReset = AutoOffsetReset.Latest.ToString().ToLower();

//Act
var options = setupParameters.SetupQuery(c =>
{
c.Properties[QueryParameters.AutoOffsetResetPropertyName] = latestAtoOffsetReset;
}).Options;

//Assert
options.QueryParameters.Properties[QueryParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo(latestAtoOffsetReset);
}

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,42 @@ namespace ksqlDb.RestApi.Client.Tests.KSql.RestApi.Parameters;

public class QueryParametersExtensionsTests
{
[Test]
public void FillFrom_DestinationParametersAreSetFromSource()
{
//Arrange
var source = new QueryStreamParameters
{
Sql = "Select",
["key"] = "value"
};
var destination = new QueryStreamParameters();

//Act
destination.FillFrom(source);

//Assert
destination.Sql.Should().BeEquivalentTo(source.Sql);
destination.Properties.Count.Should().Be(source.Properties.Count);
}

[Test]
public void FillPushQueryParametersFrom_DestinationParametersAreSetFromSource()
{
//Arrange
var source = new QueryStreamParameters()
{
AutoOffsetReset = AutoOffsetReset.Latest
};
var destination = new QueryStreamParameters();

//Act
destination.FillPushQueryParametersFrom(source);

//Assert
destination.AutoOffsetReset.Should().Be(source.AutoOffsetReset);
}

[Test]
public void ToLogInfo_QueryStreamParameters_NoParameters()
{
Expand Down
Loading

0 comments on commit eea7263

Please sign in to comment.