Skip to content

Latest commit

 

History

History
452 lines (341 loc) · 13.7 KB

modelbuilder.md

File metadata and controls

452 lines (341 loc) · 13.7 KB

Model builder

v5.0.0

Using the fluent API with POCOs for generated classes that cannot be changed due to code regeneration offers a structured and maintainable approach to configuring your domain model, while keeping your codebase clean and flexible.

The ModelBuilder class provides functionalities to define mappings for entities. The configurations made using the fluent API will override any attributes defined on the entity or its properties. Use the Entity<T>() method of ModelBuilder to configure entities and define their properties.

using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;

public static async Task InitModelAndCreateTableAsync(CancellationToken cancellationToken = default)
{
  ModelBuilder builder = new();

  builder.Entity<Account>()
    .HasKey(c => c.Id)
    .Property(b => b.Balance);

  builder.Entity<Account>()
    .Property(b => b.Secret)
    .Ignore();

  builder.Entity<Payment>()
    .HasKey(c => c.Id)
    .Property(b => b.Amount)
    .Decimal(precision: 10, scale: 2);

  var httpClient = new HttpClient
  {
    BaseAddress = new Uri("http://localhost:8088")
  };
  var httpClientFactory = new HttpClientFactory(httpClient);
  var restApiClient = new KSqlDbRestApiClient(httpClientFactory, builder);
  
  var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Payment), partitions: 3);
  
  var responseMessage = await restApiClient.CreateTableAsync<Payment>(entityCreationMetadata, true, cancellationToken);
  var content = await responseMessage.Content.ReadAsStringAsync(cancellationToken);

  entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Account), partitions: 1)
  {
    Replicas = 3
  };
  responseMessage = await restApiClient.CreateTableAsync<Account>(entityCreationMetadata, true, cancellationToken);
}
private record Payment
{
  public string Id { get; set; } = null!;
  public decimal Amount { get; set; }
  public string Description { get; set; } = null!;
}

private record Account
{
  public string Id { get; set; } = null!;
  public decimal Balance { get; set; }
  public string Secret { get; set; } = null!;
}

builder: An instance of ModelBuilder used to configure the model.

Entity<T>: A method that configures an entity of type T.

HasKey(expression): A method used to specify the primary key for the entity. It takes a lambda expression that specifies the property that make up the primary key.

Property(expression): A method used to specify a property of the entity. It takes a lambda expression that specifies the property.

Decimal(precision, scale): A method used to specify precision and scale for a decimal property. Precision specifies the maximum number of digits, and scale specifies the number of digits to the right of the decimal point.

Ignore(): A method used to specify that a particular property of an entity should be ignored during code generation.

The Id column was designated as the primary key for the Payments table, reflecting its configuration through the model builder's HasKey method for the entity:

CREATE TABLE IF NOT EXISTS Payments (
	Id VARCHAR PRIMARY KEY,
	Amount DECIMAL(10,2),
	Description VARCHAR
) WITH ( KAFKA_TOPIC='Payment', VALUE_FORMAT='Json', PARTITIONS='3' );

The Secret column was excluded from the generated DDL statement due to its configuration using the model builder's Ignore method:

CREATE TABLE IF NOT EXISTS Accounts (
	Id VARCHAR PRIMARY KEY,
	Balance DECIMAL
) WITH ( KAFKA_TOPIC='Account', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='3' );

Dependency injection

This setup ensures that the ksqlDB client, context, and model configuration are properly injected throughout your application.

using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDB.RestApi.Client.KSql.RestApi.Enums;

var builder = WebApplication.CreateBuilder(args);

ModelBuilder modelBuilder = new();

builder.Services.ConfigureKSqlDb(
  builder.Configuration.GetConnectionString("KafkaConnection")!,
  parameters =>
  {
    parameters
      .SetAutoOffsetReset(AutoOffsetReset.Latest)
      .SetIdentifierEscaping(IdentifierEscaping.Always)
      .SetJsonSerializerOptions(options =>
      {
        options.PropertyNameCaseInsensitive = true;
      });
  }
).AddSingleton(modelBuilder);

IFromItemTypeConfiguration

v5.0.0

To apply configurations using the provided ModelBuilder, follow these steps:

  • Define Configuration Class: Create a class that implements the appropriate configuration interface. In this case, PaymentConfiguration implements IFromItemTypeConfiguration<Payment>. This class contains configuration logic for the Payment entity.

  • Configure Properties: Within the Configure method of your configuration class, use the provided IEntityTypeBuilder to configure the properties of the entity. For instance, the code snippet provided configures the Amount property of the Payment entity to have a precision of 14 digits and a scale of 14.

  • Apply Configuration: Instantiate a ModelBuilder object. Then, use the Apply method to apply the configuration defined in the PaymentConfiguration class to the ModelBuilder.

Here's the code snippet demonstrating the application of configurations:

using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;

public class PaymentConfiguration : IFromItemTypeConfiguration<Payment>
{
  public void Configure(IEntityTypeBuilder<Payment> builder)
  {
    builder.Property(b => b.Amount)
      .Decimal(precision: 14, scale: 14);
  }
}
using ksqlDb.RestApi.Client.FluentAPI.Builders;

ModelBuilder builder = new();
builder.Apply(new PaymentConfiguration());

ModelBuilder conventions

v5.0.0

You can add a global convention to the model builder for the decimal type in the following way.

using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;

var modelBuilder = new ModelBuilder();
var decimalTypeConvention = new DecimalTypeConvention(14, 14);
modelBuilder.AddConvention(decimalTypeConvention);

WithHeader

v5.1.0

Properties of an entity can be marked as a HEADER with the model builder's fluent API as demonstrated below:

using ksqlDb.RestApi.Client.FluentAPI.Builders;

string header = "abc";
modelBuilder.Entity<PocoWithHeader>()
  .Property(c => c.Header)
  .WithHeader(header);

var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(PocoWithHeader), partitions: 1)
{
  Replicas = 3
};
var responseMessage = await restApiClient.CreateStreamAsync<PocoWithHeader>(entityCreationMetadata, true);
    
private record PocoWithHeader
{
  public byte[] Header { get; init; } = null!;
}

Here's the equivalent KSQL statement for the described scenario:

CREATE STREAM IF NOT EXISTS PocoWithHeaders (
	Header BYTES HEADER('abc')
) WITH ( KAFKA_TOPIC='PocoWithHeader', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='3' );

The WithHeader function within the fluent API takes precedence over the HeadersAttribute.

WithHeaders

v5.1.0

Properties of an entity can be marked as a HEADERS with the model builder's fluent API as demonstrated below:

using ksqlDb.RestApi.Client.Metadata;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

modelBuilder.Entity<Movie>()
  .Property(c => c.Headers)
  .WithHeaders();

var statementContext = new StatementContext
{
  CreationType = CreationType.Create,
  KSqlEntityType = KSqlEntityType.Stream
};

var statement = new CreateEntity(modelBuilder).Print<PocoWithHeaders>(statementContext, creationMetadata, null);

[Struct]
private record KeyValuePair
{
  public string Key { get; set; }
  public byte[] Value { get; set; }
}

private record Movie
{
  public KeyValuePair[] Headers { get; init; } = null!;
}

The StructAttribute in ksqlDb.RestApi.Client marks a class as a STRUCT type in ksqlDB a strongly typed structured data type org.apache.kafka.connect.data.Struct. Without the annotation, the type's name, KeyValuePair, would be used in code generation.

The value in the above statement variable is equivalent to:

CREATE STREAM Movie (
	Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>> HEADERS
) WITH ( KAFKA_TOPIC='MyMovie', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' );

The WithHeaders function within the fluent API takes precedence over the HeadersAttribute.

HasColumnName

v6.1.0

The HasColumnName function is employed during JSON deserialization and code generation, particularly in tasks like crafting CREATE STREAM or INSERT INTO statements.

The below code demonstrates how to use the HasColumnName method in the fluent API to override the property name Description to Desc during code generation:

using ksqlDB.RestApi.Client.KSql.RestApi.Enums;

modelBuilder.Entity<Payment>()
  .Property(b => b.Description)
  .HasColumnName("Desc");

var statement = new CreateInsert(modelBuilder)
    .Generate(payment, new InsertProperties { IdentifierEscaping = IdentifierEscaping.Keywords });

The KSQL snippet illustrates an example INSERT statement with the overridden column name, showing how it corresponds to the fluent API configuration:

INSERT INTO Payments (Id, Amount, Desc)
VALUES ('1', 33, 'Purchase');

AsStruct

v6.3.0

The AsStruct function designates fields in entity types as ksqlDB struct types.

The following code showcases how to use the AsStruct method in the fluent API to infer the underlying ksqlDB type as a struct during code generation:

private record KeyValuePair
{
  public string Key { get; set; } = null!;
  public byte[] Value { get; set; } = null!;
}

private record Record
{
  public KeyValuePair[] Headers { get; init; } = null!;
}
ModelBuilder builder = new();
 
builder.Entity<Record>()
        .Property(b => b.Headers)
        .AsStruct();
        
var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);

var ksql = new StatementGenerator(builder).CreateTable<Record>(creationMetadata, ifNotExists: true);

The KSQL snippet illustrates an example CREATE TABLE statement with the injected STRUCT type, showing how it corresponds to the fluent API configuration:

CREATE TABLE IF NOT EXISTS Records (
	  Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>>
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3' );

For array fields that contain a type that is a struct and that again contains a type that is a struct, it's necessary to mark the inner type as a struct type as well:

private record Child : Record
{
  public First[] Firsts { get; init; } = null!;
}

private record First
{
  public Second[] Seconds { get; init; } = null!;
}

private record Second
{
  public string Test { get; init; } = null!;
}
ModelBuilder builder = new();

builder.Entity<Child>()
        .Property(b => b.Headers)
        .AsStruct();
builder.Entity<Child>()
        .Property(b => b.Firsts)
        .AsStruct();
builder.Entity<Child>()
        .Property(b => b.Firsts.FirstOrDefault()!.Seconds)
        .AsStruct();

IgnoreInDML

v6.4.0

The purpose of the IgnoreInDML function is to exclude specific fields from INSERT statements.

using System.ComponentModel.DataAnnotations;

public class Actor
{
  [Key]
  public int Id { get; set; }
  public string Name { get; set; } = null!;
}
using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;

var ksqlDbUrl = "http://localhost:8088";

var httpClientFactory = new HttpClientFactory(new HttpClient() { BaseAddress = new Uri(ksqlDbUrl) });

var restApiClientOptions = new KSqlDBRestApiClientOptions
{
  ShouldPluralizeFromItemName = false,
};

var modelBuilder = new ModelBuilder();
modelBuilder.Entity<Actor>()
  .Property(c => c.Name)
  .IgnoreInDML();

var restApiClient = new KSqlDbRestApiClient(httpClientFactory, modelBuilder, restApiClientOptions);

var actor = new Actor { Id = 1, Name = "Matthew David McConaughey" };

var ksql = restApiClient.ToInsertStatement(actor);

Generated KSQL:

INSERT INTO Actor (Id) VALUES (1);

WithHeaders internally automatically marks the property to be ignored in DML statements.

RowTime

v6.4.0

The RowTime property in the registered Record entity will be automatically excluded from CREATE statements based on its name convention and long type.

using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

public class Record
{
  /// <summary>
  /// Row timestamp, inferred from the underlying Kafka record if not overridden.
  /// </summary>
  [PseudoColumn]
  public long RowTime { get; set; }
}
var modelBuilder = new ModelBuilder();
modelBuilder.Entity<Record>();

AsPseudoColumn

v6.5.0

The AsPseudoColumn function designates fields or properties in entity types as ksqlDB pseudocolumns.. Pseudocolumn identifiers are not backticked in case of IdentifierEscaping.Always or IdentifierEscaping.Keywords.

builder.Entity<Record>()
  .Property(c => c.RowTime)
  .AsPseudoColumn();
public class Record
{
  public long RowTime { get; }
}

Valid pseudocolumn names are:

  • Headers
  • RowOffset
  • RowPartition
  • RowTime