Conversation

@Gezi-lzq (Collaborator)

No description provided.

@Gezi-lzq (Collaborator, Author) commented Nov 14, 2025

❯ just -f append-scenario/justfile create-topic
docker exec automq /opt/automq/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 16 --config automq.table.topic.enable=true --config automq.table.topic.commit.interval.ms=1000 --config automq.table.topic.convert.value.type=by_schema_id --config automq.table.topic.transform.value.type=flatten --config automq.table.topic.namespace=default || true
Created topic orders.

❯ just -f append-scenario/justfile send-auto
Producing 10 Avro messages (auto-register) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
                                                Create Table
------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar NOT NULL,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata'
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)

❯ just -f append-scenario/justfile send-auto-v2
Producing 10 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV2.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)

❯ just -f append-scenario/justfile send-auto-v3
Producing 10 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV3.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar,
    f_v3 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)

❯ just -f append-scenario/justfile send-auto-v4
Producing 10 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV4.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar,
    f_v3 varchar,
    f_v4 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)

❯ just trino-sql "SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default.\"orders\" ORDER BY _kafka_metadata.timestamp"

Executing SQL (Trino): SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp
docker compose -f docker-compose.yml exec trino trino --execute 'SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp' --output-format ALIGNED --catalog iceberg --schema default
 order_id | product_name  | order_description |     f_v2      |     f_v3      |     f_v4
----------+---------------+-------------------+---------------+---------------+---------------
        1 | str_1_xaji0y  | str_1_6dpbhs      | NULL          | NULL          | NULL
        2 | str_2_ahxthv  | str_2_3a3zmf      | NULL          | NULL          | NULL
        4 | str_4_3w5uzb  | str_4_ikcidk      | NULL          | NULL          | NULL
        3 | str_3_8mdd4v  | str_3_30t9nt      | NULL          | NULL          | NULL
        5 | str_5_wnnhj7  | str_5_xvg0fn      | NULL          | NULL          | NULL
        8 | str_8_oh9sdb  | str_8_dw2pcn      | NULL          | NULL          | NULL
        6 | str_6_9xuy41  | str_6_ibljh7      | NULL          | NULL          | NULL
        9 | str_9_9t84az  | str_9_ytjxep      | NULL          | NULL          | NULL
        7 | str_7_5lxo6q  | str_7_jiujv6      | NULL          | NULL          | NULL
       10 | str_10_q85jsg | str_10_65kxvf     | NULL          | NULL          | NULL
        1 | str_1_xaji0y  | str_1_6dpbhs      | str_1_ahxthv  | NULL          | NULL
        2 | str_2_3a3zmf  | str_2_8mdd4v      | str_2_30t9nt  | NULL          | NULL
        4 | str_4_xvg0fn  | str_4_9xuy41      | str_4_ibljh7  | NULL          | NULL
        3 | str_3_3w5uzb  | str_3_ikcidk      | str_3_wnnhj7  | NULL          | NULL
        5 | str_5_5lxo6q  | str_5_jiujv6      | str_5_oh9sdb  | NULL          | NULL
        6 | str_6_dw2pcn  | str_6_9t84az      | str_6_ytjxep  | NULL          | NULL
        7 | str_7_q85jsg  | str_7_65kxvf      | str_7_1t2tal  | NULL          | NULL
        8 | str_8_a753lc  | str_8_58drc1      | str_8_1ertj5  | NULL          | NULL
        9 | str_9_pht0hl  | str_9_9xpsei      | str_9_mvihcw  | NULL          | NULL
       10 | str_10_i64ciy | str_10_he7ur2     | str_10_3gdppq | NULL          | NULL
        1 | str_1_xaji0y  | str_1_6dpbhs      | NULL          | str_1_ahxthv  | NULL
        4 | str_4_xvg0fn  | str_4_9xuy41      | NULL          | str_4_ibljh7  | NULL
        2 | str_2_3a3zmf  | str_2_8mdd4v      | NULL          | str_2_30t9nt  | NULL
        5 | str_5_5lxo6q  | str_5_jiujv6      | NULL          | str_5_oh9sdb  | NULL
        3 | str_3_3w5uzb  | str_3_ikcidk      | NULL          | str_3_wnnhj7  | NULL
        8 | str_8_a753lc  | str_8_58drc1      | NULL          | str_8_1ertj5  | NULL
        6 | str_6_dw2pcn  | str_6_9t84az      | NULL          | str_6_ytjxep  | NULL
        9 | str_9_pht0hl  | str_9_9xpsei      | NULL          | str_9_mvihcw  | NULL
        7 | str_7_q85jsg  | str_7_65kxvf      | NULL          | str_7_1t2tal  | NULL
       10 | str_10_i64ciy | str_10_he7ur2     | NULL          | str_10_3gdppq | NULL
        1 | str_1_xaji0y  | str_1_6dpbhs      | NULL          | str_1_ahxthv  | str_1_3a3zmf
        2 | str_2_8mdd4v  | str_2_30t9nt      | NULL          | str_2_3w5uzb  | str_2_ikcidk
        3 | str_3_wnnhj7  | str_3_xvg0fn      | NULL          | str_3_9xuy41  | str_3_ibljh7
        4 | str_4_5lxo6q  | str_4_jiujv6      | NULL          | str_4_oh9sdb  | str_4_dw2pcn
        8 | str_8_he7ur2  | str_8_3gdppq      | NULL          | str_8_0y9dom  | str_8_5igqpk
        5 | str_5_9t84az  | str_5_ytjxep      | NULL          | str_5_q85jsg  | str_5_65kxvf
        6 | str_6_1t2tal  | str_6_a753lc      | NULL          | str_6_58drc1  | str_6_1ertj5
        7 | str_7_pht0hl  | str_7_9xpsei      | NULL          | str_7_mvihcw  | str_7_i64ciy
        9 | str_9_i7p5tb  | str_9_94874f      | NULL          | str_9_rhocn9  | str_9_j2qp89
       10 | str_10_uzfk8u | str_10_t0cvs4     | NULL          | str_10_f8cgvy | str_10_ie6ivw
(40 rows)

{"name": "order_description", "type": ["null", "string"], "default": null},
{"name": "price", "type": "double", "default": 0.0},
{"name": "quantity", "type": "long", "default": 0}
{"name": "f_v2", "type": ["null", "string"], "default": null}

Nice idea with this field versioning: it is easy to see what is new, and if more changes are needed it is still possible to follow along without diffing the schema files.

{"name": "order_id", "type": "long"},
{"name": "product_name", "type": "string"},
{"name": "order_description", "type": ["null", "string"], "default": null},
{"name": "f_v3", "type": ["null", "string"], "default": null}
@dumbNickname commented Nov 14, 2025

In my example I used an obligatory field. I think I first tried to specify it as optional and later redefine it as obligatory, but that would indeed be an incompatible schema evolution. (Not sure if that matters, but just mentioning it.)

{"name": "order_id", "type": "long"},
{"name": "product_name", "type": "string"},
{"name": "order_description", "type": ["null", "string"], "default": null},
{"name": "f_v2", "type": ["null", "string"], "default": null},


From the schemas it looks correct, so my case is what I see here in v2, v3, and v5.

@dumbNickname

As I wrote above, the use case I have been thinking about and testing is the one you named v5, so the returned data from the SQL statement would be:

        1 | str_1_xaji0y  | str_1_6dpbhs      | str_1_ahxthv  | NULL          | NULL
        2 | str_2_3a3zmf  | str_2_8mdd4v      | str_2_30t9nt  | NULL          | NULL
        4 | str_4_xvg0fn  | str_4_9xuy41      | str_4_ibljh7  | NULL          | NULL
        3 | str_3_3w5uzb  | str_3_ikcidk      | str_3_wnnhj7  | NULL          | NULL
        5 | str_5_5lxo6q  | str_5_jiujv6      | str_5_oh9sdb  | NULL          | NULL
        6 | str_6_dw2pcn  | str_6_9t84az      | str_6_ytjxep  | NULL          | NULL
        7 | str_7_q85jsg  | str_7_65kxvf      | str_7_1t2tal  | NULL          | NULL
        8 | str_8_a753lc  | str_8_58drc1      | str_8_1ertj5  | NULL          | NULL
        9 | str_9_pht0hl  | str_9_9xpsei      | str_9_mvihcw  | NULL          | NULL
       10 | str_10_i64ciy | str_10_he7ur2     | str_10_3gdppq | NULL          | NULL
        1 | str_1_xaji0y  | str_1_6dpbhs      | NULL          | str_1_ahxthv  | NULL
        4 | str_4_xvg0fn  | str_4_9xuy41      | NULL          | str_4_ibljh7  | NULL
        2 | str_2_3a3zmf  | str_2_8mdd4v      | NULL          | str_2_30t9nt  | NULL
        5 | str_5_5lxo6q  | str_5_jiujv6      | NULL          | str_5_oh9sdb  | NULL
        3 | str_3_3w5uzb  | str_3_ikcidk      | NULL          | str_3_wnnhj7  | NULL
        8 | str_8_a753lc  | str_8_58drc1      | NULL          | str_8_1ertj5  | NULL
        6 | str_6_dw2pcn  | str_6_9t84az      | NULL          | str_6_ytjxep  | NULL
        9 | str_9_pht0hl  | str_9_9xpsei      | NULL          | str_9_mvihcw  | NULL
        7 | str_7_q85jsg  | str_7_65kxvf      | NULL          | str_7_1t2tal  | NULL
       10 | str_10_i64ciy | str_10_he7ur2     | NULL          | str_10_3gdppq | NULL
      ---

        1 | str_1_xaji0y  | str_1_6dpbhs      | str_1_3a3zmf          | str_1_ahxthv  | NULL
        2 | str_2_8mdd4v  | str_2_30t9nt      | str_2_ikcidk          | str_2_3w5uzb  | NULL
        3 | str_3_wnnhj7  | str_3_xvg0fn      | str_3_ibljh7          | str_3_9xuy41  | NULL
        4 | str_4_5lxo6q  | str_4_jiujv6      | str_4_dw2pcn          | str_4_oh9sdb  | NULL
        8 | str_8_he7ur2  | str_8_3gdppq      | str_8_5igqpk          | str_8_0y9dom  | NULL
        5 | str_5_9t84az  | str_5_ytjxep      | str_5_65kxvf          | str_5_q85jsg  | NULL
        6 | str_6_1t2tal  | str_6_a753lc      | str_6_1ertj5          | str_6_58drc1  | NULL
        7 | str_7_pht0hl  | str_7_9xpsei      | str_7_i64ciy          | str_7_mvihcw  | NULL
        9 | str_9_i7p5tb  | str_9_94874f      | str_9_j2qp89          | str_9_rhocn9  | NULL
       10 | str_10_uzfk8u | str_10_t0cvs4     | str_10_ie6ivw          | str_10_f8cgvy | NULL


Gezi-lzq marked this pull request as draft November 14, 2025 13:07
@Gezi-lzq (Collaborator, Author)

@dumbNickname This PR is just for demonstration purposes and isn't intended to be merged. In the comments above, I actually tested v5 as well, but the query output was getting too long, so to save some effort, I only copied the results up to the v4 run. Everything passed in my tests.

@Gezi-lzq (Collaborator, Author)

❯ just -f append-scenario/justfile create-topic
docker exec automq /opt/automq/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 16 --config automq.table.topic.enable=true --config automq.table.topic.commit.interval.ms=1000 --config automq.table.topic.convert.value.type=by_schema_id --config automq.table.topic.transform.value.type=flatten --config automq.table.topic.namespace=default || true
Created topic orders.
❯ just -f append-scenario/justfile send-auto
Producing 2 Avro messages (auto-register) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
                                                Create Table
------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar NOT NULL,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata'
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)
❯ just -f append-scenario/justfile send-auto-v2
Producing 2 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV2.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)
❯ just -f append-scenario/justfile send-auto-v3
Producing 2 Avro messages (OrderV3 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV3.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar,
    f_v3 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)
❯ just -f append-scenario/justfile send-auto-v4
error: Justfile does not contain recipe `send-auto-v4`
Did you mean `send-auto-v2`?
❯ just -f append-scenario/justfile send-auto-v4
Producing 2 Avro messages (OrderV4 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV4.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
                                                Create Table
-------------------------------------------------------------------------------------------------------------
 CREATE TABLE iceberg.default.orders (
    order_id bigint NOT NULL,
    product_name varchar NOT NULL,
    order_description varchar,
    _kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
    _kafka_key varchar COMMENT 'Kafka record key',
    _kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
    f_v2 varchar,
    f_v3 varchar
 )
 WITH (
    format = 'PARQUET',
    format_version = 2,
    location = 's3://warehouse/default/orders',
    object_store_layout_enabled = true
 )
(1 row)
❯ just trino-sql "SELECT order_id, product_name, order_description, f_v2, f_v3 FROM iceberg.default.\"orders\" ORDER BY _kafka_metadata.timestamp"

Executing SQL (Trino): SELECT order_id, product_name, order_description, f_v2, f_v3 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp
docker compose -f docker-compose.yml exec trino trino --execute 'SELECT order_id, product_name, order_description, f_v2, f_v3 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
 order_id | product_name | order_description |     f_v2     |     f_v3
----------+--------------+-------------------+--------------+--------------
        1 | str_1_xaji0y | str_1_6dpbhs      | NULL         | NULL
        2 | str_2_ahxthv | str_2_3a3zmf      | NULL         | NULL
        1 | str_1_xaji0y | str_1_6dpbhs      | str_1_ahxthv | NULL
        2 | str_2_3a3zmf | str_2_8mdd4v      | str_2_30t9nt | NULL
        1 | str_1_xaji0y | str_1_6dpbhs      | NULL         | str_1_ahxthv
        2 | str_2_3a3zmf | str_2_8mdd4v      | NULL         | str_2_30t9nt
        1 | str_1_xaji0y | str_1_6dpbhs      | str_1_ahxthv | str_1_3a3zmf
        2 | str_2_8mdd4v | str_2_30t9nt      | str_2_3w5uzb | str_2_ikcidk
(8 rows)

@dumbNickname commented Nov 17, 2025

So I have set up your playground environment and changed the new fields to be obligatory in the Avro schema (only string allowed). Then I see an exception similar to what I had with my own example:

java.io.IOException: kafka.automq.table.process.exception.RecordProcessorException: Data processing failed for record: topic=orders, partition=6, offset=0, timestamp=1763411534516 - [UNKNOW_ERROR] Unexpected error processing record: topic=orders, key=null, offset=0, timestamp=1763411534516: Invalid default for field f_v3: null not a "string" (caused by: AvroTypeException - Invalid default for field f_v3: null not a "string")
	at kafka.automq.table.worker.IcebergWriter.write(IcebergWriter.java:134)
	at kafka.automq.table.worker.PartitionWriteTask.handleReadResult(PartitionWriteTask.java:121)
	at kafka.automq.table.worker.PartitionWriteTask.lambda$run0$0(PartitionWriteTask.java:70)
	at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$0(EventLoops.java:160)
	at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$1(EventLoops.java:172)
	at com.automq.stream.utils.threads.EventLoop.run(EventLoop.java:69)
Caused by: kafka.automq.table.process.exception.RecordProcessorException: Data processing failed for record: topic=orders, partition=6, offset=0, timestamp=1763411534516 - [UNKNOW_ERROR] Unexpected error processing record: topic=orders, key=null, offset=0, timestamp=1763411534516: Invalid default for field f_v3: null not a "string" (caused by: AvroTypeException - Invalid default for field f_v3: null not a "string")
	at kafka.automq.table.worker.IcebergWriter.write0(IcebergWriter.java:158)
	at kafka.automq.table.worker.IcebergWriter.write(IcebergWriter.java:116)
	... 5 more
Caused by: org.apache.avro.AvroTypeException: Invalid default for field f_v3: null not a "string"
	at org.apache.avro.Schema.validateDefault(Schema.java:1635)
	at org.apache.avro.Schema.access$500(Schema.java:94)
	at org.apache.avro.Schema$Field.<init>(Schema.java:561)
	at org.apache.avro.Schema$Field.<init>(Schema.java:572)
	at kafka.automq.table.process.RecordAssembler.buildFinalAssemblerSchema(RecordAssembler.java:148)
	at kafka.automq.table.process.RecordAssembler.getOrCreateAssemblerSchema(RecordAssembler.java:139)
	at kafka.automq.table.process.RecordAssembler.assemble(RecordAssembler.java:112)
	at kafka.automq.table.process.DefaultRecordProcessor.process(DefaultRecordProcessor.java:119)
	at kafka.automq.table.worker.IcebergWriter.write0(IcebergWriter.java:145)
	... 6 more

example schema partial:

{"name": "f_v2", "type": "string", "default": null} and {"name": "f_v3", "type": "string", "default": null}

For me, already submitting send-auto-v3 is enough to get the error. I hope this helps with further analysis.

just up
just -f append-scenario/justfile create-topic
just -f append-scenario/justfile send-auto
just -f append-scenario/justfile send-auto-v2
just -f append-scenario/justfile send-auto-v3
just logs automq 

@Gezi-lzq (Collaborator, Author)

@dumbNickname Hello, I'm a bit confused. I noticed you're using a field definition like {"name": "f_v2", "type": "string", "default": null}. As far as I know, in Avro the type of the default value must match the field's type. Therefore, if the default value is null, you should either change the type to ["null", "string"] or change the default value to "" (an empty string).

Perhaps I've missed something. Could you please clarify? Thank you.
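
To make the rule concrete, here is a minimal stdlib-only Python sketch that approximates Avro's default-validation rule (the default must match the field's declared type; for a union, the first branch). It is only an illustration of the rule, not the org.apache.avro validator that produced the exception above, and the `default_matches` helper is hypothetical:

```python
import json

def default_matches(avro_type, default):
    """Rough approximation of Avro's default-validation rule.

    For union types, Avro requires the default to match the FIRST
    branch of the union, which is why optional fields are written as
    ["null", "string"] with "default": null, never the reverse.
    """
    if isinstance(avro_type, list):                 # union: first branch only
        return default_matches(avro_type[0], default)
    if avro_type == "null":
        return default is None
    if avro_type == "string":
        return isinstance(default, str)
    if avro_type in ("int", "long"):
        return isinstance(default, int) and not isinstance(default, bool)
    if avro_type in ("float", "double"):
        return isinstance(default, (int, float)) and not isinstance(default, bool)
    return True                                     # other types not modelled

# The field shape that triggered "Invalid default for field f_v3":
bad = json.loads('{"name": "f_v3", "type": "string", "default": null}')
# Two valid alternatives: optional union field, or a real string default.
ok_optional = json.loads('{"name": "f_v3", "type": ["null", "string"], "default": null}')
ok_required = json.loads('{"name": "f_v3", "type": "string", "default": ""}')

for field in (bad, ok_optional, ok_required):
    print(field["name"], default_matches(field["type"], field["default"]))
# bad -> False; both alternatives -> True
```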

@dumbNickname commented Nov 19, 2025

hi @Gezi-lzq

Maybe my understanding is wrong, but it is mostly based on observations of how Schema Registry behaves vs. Iceberg. I am still pretty new to Avro, so please correct me if I am wrong.

My intention here would be: I need an obligatory field, and if it is not provided then the event should be rejected by the Kafka queue. I do not really want a default value to be injected there. If that value is missing while the event is produced with a schema that already demands it, I would assume the event is simply not valid and should not even go into Kafka after validation against Schema Registry.

I think the default-value part does not affect the scenario presented above that much, because your send-auto utility produces valid values for all fields anyway and does not leave any of them empty. In that scenario it looks like Iceberg is getting lost on something that Schema Registry considered to be a valid schema evolution.

@Gezi-lzq (Collaborator, Author)

@dumbNickname Regarding your intention to add a required field (i.e., {"name": "f_v2", "type": "string"}), I am certain it will be blocked by the BACKWARD compatibility mode.

Based on my understanding of Schema Evolution rules, the correct approach here is to introduce it as an optional field using a union type, like: {"name": "f_v2", "type": ["null", "string"], "default": null}.

You raise an excellent point about why the Schema Registry might accept a definition like {"name": "f_v2", "type": "string", "default": null}—it seems to be treating it as an optional field due to the presence of the default value, which is a key mechanism for compatibility. This is something I should look into further to confirm the exact Avro specification behavior in this edge case.

Given your current use case and the compatibility constraint, I believe the proper way forward is indeed to treat both f_v2 and any future fields like f_v3 as optional fields.
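
As a sketch of why the optional-field approach keeps BACKWARD compatibility: a reader holding the newer schema can project old records by backfilling each missing field's default. This is a simplified stand-in for Avro schema resolution, not the real library; the `resolve` helper and the trimmed field list are illustrative, with field names taken from the schemas above:

```python
import json

# Reader-side view of the newer schema (subset of OrderV2's fields).
reader_fields = json.loads("""
[
  {"name": "order_id", "type": "long"},
  {"name": "product_name", "type": "string"},
  {"name": "f_v2", "type": ["null", "string"], "default": null}
]
""")

def resolve(old_record, fields):
    """Project a record written with an older schema onto `fields`."""
    out = {}
    for f in fields:
        if f["name"] in old_record:
            out[f["name"]] = old_record[f["name"]]
        elif "default" in f:
            out[f["name"]] = f["default"]        # backfill, e.g. null
        else:
            # A new field with neither a value nor a default cannot be
            # resolved: this is why a required f_v2 breaks BACKWARD mode.
            raise ValueError("no value or default for " + f["name"])
    return out

old = {"order_id": 1, "product_name": "str_1_xaji0y"}
print(resolve(old, reader_fields))
# f_v2 backfills to None, matching the NULL cells in the query output
```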

@Gezi-lzq (Collaborator, Author)

Okay, I found it. The issue seems to be an early bug/oversight in the check, documented here: confluentinc/schema-registry#1687 and confluentinc/schema-registry#2213. Additionally, the relevant configuration option is buried very deep (to the extent that I couldn't find it in the official documentation).

I was merely trying to argue that adding a field defined as {"name": "f_v2", "type": "string", "default": null} should not be allowed. (Perhaps I need to include that configuration setting (schema.providers.avro.validate.defaults) for this scenario.)
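
If default validation needs to be enforced in this playground, the setting could presumably be passed to the registry container; this is a hypothetical config sketch, and the exact property name and its availability depend on the Schema Registry version:

```properties
# Hypothetical: reject schemas whose defaults do not match their declared
# Avro type. Property name as cited above; verify it against your
# Schema Registry version before relying on it.
schema.providers.avro.validate.defaults=true
```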

@dumbNickname
Thanks for tracking it down. I will also read more on that default setting in Avro schemas, just for safety :P
