125 changes: 125 additions & 0 deletions batch_topology_diff.py
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

import os
import sys
import json
from typing import List, Dict
from datetime import datetime
from topology_diff import compare_topologies

def process_batch(test_cases: List[str], start_idx: int, batch_num: int, base_dir: str, output_dir: str, changes_list: List[Dict]) -> None:
"""Process a batch of test cases and save results to a single file."""
batch_results = []

print(f"\nProcessing batch {batch_num} ({len(test_cases)} test cases)")
print("=" * 80)

for idx, test_case in enumerate(test_cases, start=start_idx):
test_case_dir = os.path.join(base_dir, test_case)
if not os.path.exists(test_case_dir):
print(f"#{idx}: Skipping - directory not found: {test_case}")
continue

results = compare_topologies(test_case_dir)
batch_results.append(results)

# Print a brief status
status = results.get("status", "completed")
if status == "skipped":
print(f"#{idx}: {test_case}: {results['reason']}")
elif "error" in results:
print(f"#{idx}: {test_case}: Error - {results['error']}")
else:
has_changes = results['comparison'].get('status') == 'changes_detected'
changes = "changes detected" if has_changes else "no changes"
print(f"#{idx}: {test_case}: {results['versions']['combination']} - {changes}")

# If changes detected, add to changes list
if has_changes:
changes_list.append({
"test_case": test_case,
"test_number": idx,
"batch_number": batch_num,
"versions": results['versions'],
"changes": results['comparison']['sub_topologies']
})

# Save batch results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f"batch_{batch_num:03d}_{timestamp}.json")

with open(output_file, 'w') as f:
json.dump({
"batch_number": batch_num,
"start_index": start_idx,
"timestamp": datetime.now().isoformat(),
"test_cases": batch_results
}, f, indent=2)

print(f"\nBatch {batch_num} results saved to: {output_file}")

def main():
# Create output directories if they don't exist
base_output_dir = "/Users/pragatigupta/Desktop/Projects/ksql/topology_analysis"
batch_dir = os.path.join(base_output_dir, "batches")
os.makedirs(batch_dir, exist_ok=True)

# Get all test cases
base_dir = "/Users/pragatigupta/Desktop/Projects/ksql/ksqldb-functional-tests/src/test/resources/historical_plans"
all_test_cases = sorted(os.listdir(base_dir))

# Start from test case #401 (0-based index 400)
start_idx = 400

# Process in batches of 100
batch_size = 100
remaining_cases = all_test_cases[start_idx:]
num_batches = (len(remaining_cases) + batch_size - 1) // batch_size

print(f"Found {len(remaining_cases)} remaining test cases to process in {num_batches} batches")
print(f"Starting from test case #{start_idx + 1}")

# List to store all test cases with changes
changes_list = []

# Process all batches
for i in range(num_batches):
batch_start = i * batch_size
batch_end = min(batch_start + batch_size, len(remaining_cases))
batch = remaining_cases[batch_start:batch_end]

process_batch(batch, start_idx + batch_start, i + 1, base_dir, batch_dir, changes_list)

# Save the list of changes
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
changes_file = os.path.join(base_output_dir, f"topology_changes_{timestamp}.json")

with open(changes_file, 'w') as f:
json.dump({
"timestamp": datetime.now().isoformat(),
"total_changes_found": len(changes_list),
"changes": changes_list
}, f, indent=2)

# Also save a simple list of changed test cases
changes_list_file = os.path.join(base_output_dir, f"changed_test_cases_{timestamp}.txt")

with open(changes_list_file, 'w') as f:
f.write(f"Found {len(changes_list)} test cases with changes:\n\n")
for change in sorted(changes_list, key=lambda x: x['test_number']):
f.write(f"#{change['test_number']}: {change['test_case']}\n")
f.write(f" {change['versions']['combination']}\n")
for subtop, details in change['changes'].items():
if details.get('status') == 'changes_detected':
added = len(details['changes'].get('added_in_v8', []))
removed = len(details['changes'].get('removed_in_v8', []))
f.write(f" Sub-topology {subtop}: {removed} components removed, {added} components added\n")
f.write("\n")

print(f"\nProcessing complete!")
print(f"Found {len(changes_list)} test cases with changes")
print(f"Detailed changes saved to: {changes_file}")
print(f"Summary of changes saved to: {changes_list_file}")

if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion ksqldb-api-client/pom.xml
@@ -154,7 +154,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<version>8.0.1-0-ce</version>
</dependency>

<!-- Only required to provide LoginModule implementation in tests -->
@@ -0,0 +1,213 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, BOOL_ARRAY ARRAY<BOOLEAN>, INT_ARRAY ARRAY<INTEGER>, BIGINT_ARRAY ARRAY<BIGINT>, DOUBLE_ARRAY ARRAY<DOUBLE>, STRING_ARRAY ARRAY<STRING>, DECIMAL_ARRAY ARRAY<DECIMAL(2, 1)>) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY<BOOLEAN>, `INT_ARRAY` ARRAY<INTEGER>, `BIGINT_ARRAY` ARRAY<BIGINT>, `DOUBLE_ARRAY` ARRAY<DOUBLE>, `STRING_ARRAY` ARRAY<STRING>, `DECIMAL_ARRAY` ARRAY<DECIMAL(2, 1)>",
"timestampColumn" : null,
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"windowInfo" : null,
"orReplace" : false,
"isSource" : false
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_MAX(INPUT.BOOL_ARRAY) BOOL_MAX,\n ARRAY_MAX(INPUT.INT_ARRAY) INT_MAX,\n ARRAY_MAX(INPUT.BIGINT_ARRAY) BIGINT_MAX,\n ARRAY_MAX(INPUT.DOUBLE_ARRAY) DOUBLE_MAX,\n ARRAY_MAX(INPUT.STRING_ARRAY) STRING_MAX,\n ARRAY_MAX(INPUT.DECIMAL_ARRAY) DECIMAL_MAX\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `BOOL_MAX` BOOLEAN, `INT_MAX` INTEGER, `BIGINT_MAX` BIGINT, `DOUBLE_MAX` DOUBLE, `STRING_MAX` STRING, `DECIMAL_MAX` DECIMAL(2, 1)",
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"windowInfo" : null,
"orReplace" : false,
"isSource" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"timestampColumn" : null,
"sourceSchema" : "`ID` STRING KEY, `BOOL_ARRAY` ARRAY<BOOLEAN>, `INT_ARRAY` ARRAY<INTEGER>, `BIGINT_ARRAY` ARRAY<BIGINT>, `DOUBLE_ARRAY` ARRAY<DOUBLE>, `STRING_ARRAY` ARRAY<STRING>, `DECIMAL_ARRAY` ARRAY<DECIMAL(2, 1)>",
"pseudoColumnVersion" : 1
},
"keyColumnNames" : [ "ID" ],
"selectedKeys" : null,
"selectExpressions" : [ "ARRAY_MAX(BOOL_ARRAY) AS BOOL_MAX", "ARRAY_MAX(INT_ARRAY) AS INT_MAX", "ARRAY_MAX(BIGINT_ARRAY) AS BIGINT_MAX", "ARRAY_MAX(DOUBLE_ARRAY) AS DOUBLE_MAX", "ARRAY_MAX(STRING_ARRAY) AS STRING_MAX", "ARRAY_MAX(DECIMAL_ARRAY) AS DECIMAL_MAX" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
}
},
"topicName" : "OUTPUT",
"timestampColumn" : null
},
"queryId" : "CSAS_OUTPUT_0",
"runtimeId" : null
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"metric.reporters" : "",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.connect.basic.auth.credentials.reload" : "false",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.stream.enabled" : "true",
"ksql.query.push.v2.interpreter.enabled" : "true",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.variable.substitution.enable" : "true",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.query.push.v2.alos.enabled" : "true",
"ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.query.pull.range.scan.enabled" : "true",
"ksql.transient.query.cleanup.service.initial.delay.seconds" : "600",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.lambdas.enabled" : "true",
"ksql.source.table.materialization.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.client.ip_port.configuration.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.deployment.type" : "selfManaged",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.websocket.connection.max.timeout.ms" : "3600000",
"ksql.persistence.wrap.single.values" : null,
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.connect.basic.auth.credentials.source" : "NONE",
"ksql.schema.registry.url" : "schema_registry.url:0",
"ksql.properties.overrides.denylist" : "",
"ksql.service.id" : "some.ksql.service.id",
"ksql.query.push.v2.max.catchup.consumers" : "5",
"ksql.assert.topic.default.timeout.ms" : "1000",
"ksql.query.pull.forwarding.timeout.ms" : "20000",
"ksql.query.push.v2.enabled" : "false",
"ksql.transient.query.cleanup.service.enable" : "true",
"ksql.query.push.v2.metrics.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.endpoint.migrate.query" : "true",
"ksql.query.push.v2.registry.installed" : "false",
"ksql.streams.num.stream.threads" : "4",
"ksql.metrics.tags.custom" : "",
"ksql.query.push.v2.catchup.consumer.msg.window" : "50",
"ksql.runtime.feature.shared.enabled" : "false",
"ksql.udf.collect.metrics" : "false",
"ksql.new.query.planner.enabled" : "false",
"ksql.connect.request.headers.plugin" : null,
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.headers.columns.enabled" : "true",
"enable.fips" : "false",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.connect.request.timeout.ms" : "5000",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.query.cleanup.shutdown.timeout.ms" : "30000",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.create.or.replace.enabled" : "true",
"ksql.shared.runtimes.count" : "2",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.resource.extension.class" : null,
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.transient.query.cleanup.service.period.seconds" : "600",
"ksql.suppress.enabled" : "true",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.proxy.protocol.local.mode.enabled" : "false",
"ksql.connect.basic.auth.credentials.file" : "",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.fetch.remote.hosts.max.timeout.seconds" : "10",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.query.push.v2.new.latest.delay.ms" : "5000",
"ksql.query.push.v2.latest.reset.age.ms" : "30000",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.json_sr.converter.deserializer.enabled" : "true",
"ksql.assert.schema.default.timeout.ms" : "1000",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.continuation.tokens.enabled" : "false",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "false",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.query.pull.thread.pool.size" : "50",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}