v5.0.0
Using the fluent API with POCOs offers a structured and maintainable way to configure your domain model. It is particularly useful for generated classes that cannot be modified because any changes would be lost when the code is regenerated.
The ModelBuilder class provides functionality to define mappings for entities.
The configurations made using the fluent API will override any attributes defined on the entity or its properties.
Use the Entity<T>() method of ModelBuilder to configure entities and define their properties.
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;

public static async Task InitModelAndCreateTableAsync(CancellationToken cancellationToken = default)
{
  ModelBuilder builder = new();

  builder.Entity<Account>()
    .HasKey(c => c.Id)
    .Property(b => b.Balance);

  builder.Entity<Account>()
    .Property(b => b.Secret)
    .Ignore();

  builder.Entity<Payment>()
    .HasKey(c => c.Id)
    .Property(b => b.Amount)
    .Decimal(precision: 10, scale: 2);

  var httpClient = new HttpClient
  {
    BaseAddress = new Uri("http://localhost:8088")
  };
  var httpClientFactory = new HttpClientFactory(httpClient);
  var restApiClient = new KSqlDbRestApiClient(httpClientFactory, builder);

  var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Payment), partitions: 3);
  var responseMessage = await restApiClient.CreateTableAsync<Payment>(entityCreationMetadata, true, cancellationToken);
  var content = await responseMessage.Content.ReadAsStringAsync(cancellationToken);

  entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Account), partitions: 1)
  {
    Replicas = 3
  };
  responseMessage = await restApiClient.CreateTableAsync<Account>(entityCreationMetadata, true, cancellationToken);
}

private record Payment
{
  public string Id { get; set; } = null!;
  public decimal Amount { get; set; }
  public string Description { get; set; } = null!;
}

private record Account
{
  public string Id { get; set; } = null!;
  public decimal Balance { get; set; }
  public string Secret { get; set; } = null!;
}
- builder: An instance of ModelBuilder used to configure the model.
- Entity<T>: A method that configures an entity of type T.
- HasKey(expression): A method used to specify the primary key for the entity. It takes a lambda expression that specifies the property that makes up the primary key.
- Property(expression): A method used to specify a property of the entity. It takes a lambda expression that specifies the property.
- Decimal(precision, scale): A method used to specify precision and scale for a decimal property. Precision specifies the maximum total number of digits, and scale specifies the number of digits to the right of the decimal point.
- Ignore(): A method used to specify that a particular property of an entity should be ignored during code generation.
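To make precision and scale concrete, here is a small, library-independent sketch. FitsDecimal is a hypothetical helper (it is not part of ksqlDb.RestApi.Client) that checks whether a .NET decimal value fits a DECIMAL(precision, scale) column under the usual SQL reading of those two numbers: at most precision - scale digits before the decimal point and at most scale digits after it.

```csharp
using System;

// Hypothetical helper, not part of ksqlDb.RestApi.Client.
// Returns true when value can be stored in DECIMAL(precision, scale) without loss.
static bool FitsDecimal(decimal value, int precision, int scale)
{
    // The integer part may use at most (precision - scale) digits,
    // so it must be strictly below 10^(precision - scale).
    decimal integerLimit = 1m;
    for (int i = 0; i < precision - scale; i++)
    {
        integerLimit *= 10m;
    }

    bool integerPartFits = Math.Abs(Math.Truncate(value)) < integerLimit;

    // The fractional part may use at most 'scale' digits:
    // rounding to that many digits must not change the value.
    bool fractionFits = decimal.Round(value, scale) == value;

    return integerPartFits && fractionFits;
}

Console.WriteLine(FitsDecimal(12345678.90m, 10, 2)); // True: 8 integer digits, 2 fractional digits
Console.WriteLine(FitsDecimal(123456789m, 10, 2));   // False: 9 integer digits exceed 10 - 2
Console.WriteLine(FitsDecimal(1.234m, 10, 2));       // False: 3 fractional digits exceed the scale
Console.WriteLine(FitsDecimal(0.5m, 14, 14));        // True: no integer digits are needed
Console.WriteLine(FitsDecimal(1.5m, 14, 14));        // False: no room for an integer digit
```

Under this reading, DECIMAL(10,2) holds values up to 99999999.99, while a type whose scale equals its precision, such as DECIMAL(14,14), leaves no digits for the integer part and can only hold values with an absolute value below 1.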
The Id column was designated as the primary key for the Payments table, reflecting its configuration through the model builder's HasKey method for the entity:
CREATE TABLE IF NOT EXISTS Payments (
  Id VARCHAR PRIMARY KEY,
  Amount DECIMAL(10,2),
  Description VARCHAR
) WITH ( KAFKA_TOPIC='Payment', VALUE_FORMAT='Json', PARTITIONS='3' );
The Secret column was excluded from the generated DDL statement due to its configuration using the model builder's Ignore method:
CREATE TABLE IF NOT EXISTS Accounts (
  Id VARCHAR PRIMARY KEY,
  Balance DECIMAL
) WITH ( KAFKA_TOPIC='Account', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='3' );
The following setup registers the ksqlDB client, context, and model configuration with the dependency injection container so that they can be injected throughout your application:
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDB.RestApi.Client.KSql.RestApi.Enums;

var builder = WebApplication.CreateBuilder(args);

ModelBuilder modelBuilder = new();

builder.Services.ConfigureKSqlDb(
  builder.Configuration.GetConnectionString("KafkaConnection")!,
  parameters =>
  {
    parameters
      .SetAutoOffsetReset(AutoOffsetReset.Latest)
      .SetIdentifierEscaping(IdentifierEscaping.Always)
      .SetJsonSerializerOptions(options =>
      {
        options.PropertyNameCaseInsensitive = true;
      });
  }
).AddSingleton(modelBuilder);
v5.0.0
To apply configurations using the provided ModelBuilder, follow these steps:
- Define Configuration Class: Create a class that implements the appropriate configuration interface. In this case, PaymentConfiguration implements IFromItemTypeConfiguration<Payment>. This class contains configuration logic for the Payment entity.
- Configure Properties: Within the Configure method of your configuration class, use the provided IEntityTypeBuilder to configure the properties of the entity. For instance, the code snippet provided configures the Amount property of the Payment entity to have a precision of 14 digits and a scale of 14.
- Apply Configuration: Instantiate a ModelBuilder object. Then, use the Apply method to apply the configuration defined in the PaymentConfiguration class to the ModelBuilder.
Here's the code snippet demonstrating the application of configurations:
using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;

public class PaymentConfiguration : IFromItemTypeConfiguration<Payment>
{
  public void Configure(IEntityTypeBuilder<Payment> builder)
  {
    builder.Property(b => b.Amount)
      .Decimal(precision: 14, scale: 14);
  }
}

using ksqlDb.RestApi.Client.FluentAPI.Builders;

ModelBuilder builder = new();
builder.Apply(new PaymentConfiguration());
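The same pattern can be used for other entities. As a minimal sketch, the inline Account configuration from the first example could be moved into its own class. AccountConfiguration is a hypothetical name, and the sketch assumes that IEntityTypeBuilder<T> exposes the same HasKey and Property methods that the result of ModelBuilder.Entity<T>() is shown using earlier; check this against the library version you use.

```csharp
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;

// Register the configuration class with the model builder.
ModelBuilder builder = new();
builder.Apply(new AccountConfiguration());

// Hypothetical configuration class for the Account entity.
public class AccountConfiguration : IFromItemTypeConfiguration<Account>
{
  public void Configure(IEntityTypeBuilder<Account> builder)
  {
    // Assumption: HasKey is available on IEntityTypeBuilder<T>,
    // as it is on the builder returned by ModelBuilder.Entity<T>().
    builder.HasKey(c => c.Id);

    // Exclude Secret from the generated statements.
    builder.Property(b => b.Secret)
      .Ignore();
  }
}

// Same shape as the Account record in the first example.
public record Account
{
  public string Id { get; set; } = null!;
  public decimal Balance { get; set; }
  public string Secret { get; set; } = null!;
}
```

Keeping one configuration class per entity keeps the mapping for each type in a single place, which can be easier to review than a long sequence of inline Entity<T>() calls.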
v5.0.0
You can add a global convention for the decimal type to the model builder as follows:
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;
var modelBuilder = new ModelBuilder();
var decimalTypeConvention = new DecimalTypeConvention(14, 14);
modelBuilder.AddConvention(decimalTypeConvention);
v5.1.0
Properties of an entity can be marked as a HEADER with the model builder's fluent API as demonstrated below:
using ksqlDb.RestApi.Client.FluentAPI.Builders;

string header = "abc";

modelBuilder.Entity<PocoWithHeader>()
  .Property(c => c.Header)
  .WithHeader(header);

var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(PocoWithHeader), partitions: 1)
{
  Replicas = 3
};
var responseMessage = await restApiClient.CreateStreamAsync<PocoWithHeader>(entityCreationMetadata, true);

private record PocoWithHeader
{
  public byte[] Header { get; init; } = null!;
}
Here's the equivalent KSQL statement for the described scenario:
CREATE STREAM IF NOT EXISTS PocoWithHeaders (
  Header BYTES HEADER('abc')
) WITH ( KAFKA_TOPIC='PocoWithHeader', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='3' );
The WithHeader function within the fluent API takes precedence over the HeadersAttribute.
v5.1.0
Properties of an entity can be marked as HEADERS with the model builder's fluent API as demonstrated below:
using ksqlDb.RestApi.Client.Metadata;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

modelBuilder.Entity<Movie>()
  .Property(c => c.Headers)
  .WithHeaders();

var statementContext = new StatementContext
{
  CreationType = CreationType.Create,
  KSqlEntityType = KSqlEntityType.Stream
};

// Topic, partition, and replica values match the generated statement shown further down.
var creationMetadata = new EntityCreationMetadata(kafkaTopic: "MyMovie", partitions: 1)
{
  Replicas = 1
};

// The type argument is the configured entity, Movie.
var statement = new CreateEntity(modelBuilder).Print<Movie>(statementContext, creationMetadata, null);

[Struct]
private record KeyValuePair
{
  public string Key { get; set; } = null!;
  public byte[] Value { get; set; } = null!;
}

private record Movie
{
  public KeyValuePair[] Headers { get; init; } = null!;
}
The StructAttribute in ksqlDb.RestApi.Client marks a class as a STRUCT type in ksqlDB, a strongly typed structured data type (org.apache.kafka.connect.data.Struct).
Without the annotation, the type's name, KeyValuePair, would be used in code generation.
The value of the statement variable above is equivalent to:
CREATE STREAM Movie (
  Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>> HEADERS
) WITH ( KAFKA_TOPIC='MyMovie', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' );
The WithHeaders function within the fluent API takes precedence over the HeadersAttribute.
v6.1.0
The HasColumnName function is applied during JSON deserialization and code generation, for example when generating CREATE STREAM or INSERT INTO statements.
The code below demonstrates how to use the HasColumnName method in the fluent API to override the property name Description with Desc during code generation:
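using ksqlDB.RestApi.Client.KSql.RestApi.Enums;

modelBuilder.Entity<Payment>()
  .Property(b => b.Description)
  .HasColumnName("Desc");

// Sample instance whose values match the INSERT statement shown after this snippet.
var payment = new Payment { Id = "1", Amount = 33, Description = "Purchase" };

var statement = new CreateInsert(modelBuilder)
  .Generate(payment, new InsertProperties { IdentifierEscaping = IdentifierEscaping.Keywords });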
The KSQL snippet illustrates an example INSERT statement with the overridden column name, showing how it corresponds to the fluent API configuration:
INSERT INTO Payments (Id, Amount, Desc)
VALUES ('1', 33, 'Purchase');
v6.3.0
The AsStruct function designates fields in entity types as ksqlDB struct types.
The following code showcases how to use the AsStruct method in the fluent API to infer the underlying ksqlDB type as a struct during code generation:
private record KeyValuePair
{
  public string Key { get; set; } = null!;
  public byte[] Value { get; set; } = null!;
}

private record Record
{
  public KeyValuePair[] Headers { get; init; } = null!;
}

ModelBuilder builder = new();

builder.Entity<Record>()
  .Property(b => b.Headers)
  .AsStruct();

var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);
var ksql = new StatementGenerator(builder).CreateTable<Record>(creationMetadata, ifNotExists: true);
The KSQL snippet illustrates an example CREATE TABLE statement with the injected STRUCT type, showing how it corresponds to the fluent API configuration:
CREATE TABLE IF NOT EXISTS Records (
  Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>>
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3' );
When the element type of an array field is a struct that itself contains a struct-typed member, the inner type must be marked as a struct type as well:
private record Child : Record
{
  public First[] Firsts { get; init; } = null!;
}

private record First
{
  public Second[] Seconds { get; init; } = null!;
}

private record Second
{
  public string Test { get; init; } = null!;
}

ModelBuilder builder = new();

builder.Entity<Child>()
  .Property(b => b.Headers)
  .AsStruct();

builder.Entity<Child>()
  .Property(b => b.Firsts)
  .AsStruct();

builder.Entity<Child>()
  .Property(b => b.Firsts.FirstOrDefault()!.Seconds)
  .AsStruct();
v6.4.0
The IgnoreInDML function excludes specific fields from INSERT statements.
using System.ComponentModel.DataAnnotations;

public class Actor
{
  [Key]
  public int Id { get; set; }
  public string Name { get; set; } = null!;
}

using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;

var ksqlDbUrl = "http://localhost:8088";
var httpClientFactory = new HttpClientFactory(new HttpClient() { BaseAddress = new Uri(ksqlDbUrl) });
var restApiClientOptions = new KSqlDBRestApiClientOptions
{
  ShouldPluralizeFromItemName = false,
};

var modelBuilder = new ModelBuilder();

modelBuilder.Entity<Actor>()
  .Property(c => c.Name)
  .IgnoreInDML();

var restApiClient = new KSqlDbRestApiClient(httpClientFactory, modelBuilder, restApiClientOptions);

var actor = new Actor { Id = 1, Name = "Matthew David McConaughey" };
var ksql = restApiClient.ToInsertStatement(actor);
Generated KSQL:
INSERT INTO Actor (Id) VALUES (1);
WithHeaders automatically marks the property to be ignored in DML statements.
v6.4.0
The RowTime property in the registered Record entity will be automatically excluded from CREATE statements based on its name and its long type.
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

public class Record
{
  /// <summary>
  /// Row timestamp, inferred from the underlying Kafka record if not overridden.
  /// </summary>
  [PseudoColumn]
  public long RowTime { get; set; }
}

var modelBuilder = new ModelBuilder();
modelBuilder.Entity<Record>();
v6.5.0
The AsPseudoColumn function designates fields or properties in entity types as ksqlDB pseudocolumns.
Pseudocolumn identifiers are not backticked when IdentifierEscaping.Always or IdentifierEscaping.Keywords is used.
builder.Entity<Record>()
  .Property(c => c.RowTime)
  .AsPseudoColumn();

public class Record
{
  public long RowTime { get; }
}
Valid pseudocolumn names are:
- Headers
- RowOffset
- RowPartition
- RowTime
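As a rough sketch, several of these names can be configured on one entity by repeating the AsPseudoColumn call per property. SensorReading is a hypothetical entity used only for illustration. The .NET property types are assumptions based on ksqlDB's documented pseudocolumn types (ROWTIME and ROWOFFSET as BIGINT, ROWPARTITION as INTEGER); Headers is omitted because its array-of-struct shape needs additional mapping.

```csharp
using ksqlDb.RestApi.Client.FluentAPI.Builders;

ModelBuilder builder = new();

// One call per property, mirroring the RowTime example above.
builder.Entity<SensorReading>()
  .Property(c => c.RowTime)
  .AsPseudoColumn();

builder.Entity<SensorReading>()
  .Property(c => c.RowPartition)
  .AsPseudoColumn();

builder.Entity<SensorReading>()
  .Property(c => c.RowOffset)
  .AsPseudoColumn();

// Hypothetical entity; the property types are assumptions (see the note above).
public class SensorReading
{
  public string Id { get; set; } = null!;

  public long RowTime { get; }      // assumed to map to ROWTIME (BIGINT)
  public int RowPartition { get; }  // assumed to map to ROWPARTITION (INTEGER)
  public long RowOffset { get; }    // assumed to map to ROWOFFSET (BIGINT)
}
```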