@@ -157,8 +157,7 @@ def verify_producer():
157157 # Producer config
158158 conf = {'bootstrap.servers' : bootstrap_servers ,
159159 'error_cb' : error_cb ,
160- 'api.version.request' : api_version_request ,
161- 'default.topic.config' : {'produce.offset.report' : True }}
160+ 'api.version.request' : api_version_request }
162161
163162 # Create producer
164163 p = confluent_kafka .Producer (conf )
@@ -756,10 +755,7 @@ def verify_avro():
756755 'api.version.request' : api_version_request ,
757756 'api.version.fallback.ms' : 0 ,
758757 'broker.version.fallback' : '0.11.0.0' ,
759- 'schema.registry.url' : schema_registry_url ,
760- 'default.topic.config' : {
761- 'produce.offset.report' : True
762- }}
758+ 'schema.registry.url' : schema_registry_url }
763759
764760 consumer_conf = dict (base_conf , ** {
765761 'group.id' : 'test.py' ,
@@ -777,20 +773,16 @@ def verify_avro_https(mode_conf):
777773 if mode_conf is None :
778774 abort_on_missing_configuration ('avro-https' )
779775
780- base_conf = dict ({'bootstrap.servers' : bootstrap_servers ,
781- 'error_cb' : error_cb ,
782- 'api.version.request' : api_version_request },
783- ** mode_conf )
776+ base_conf = dict (mode_conf , ** {'bootstrap.servers' : bootstrap_servers ,
777+ 'error_cb' : error_cb ,
778+ 'api.version.request' : api_version_request })
784779
785- consumer_conf = dict ({
786- 'group.id' : generate_group_id (),
787- 'session.timeout.ms' : 6000 ,
788- 'enable.auto.commit' : False ,
789- 'api.version.request' : api_version_request ,
790- 'on_commit' : print_commit_result ,
791- 'default.topic.config' : {
792- 'auto.offset.reset' : 'earliest'
793- }}, ** base_conf )
780+ consumer_conf = dict (base_conf , ** {'group.id' : generate_group_id (),
781+ 'session.timeout.ms' : 6000 ,
782+ 'enable.auto.commit' : False ,
783+ 'api.version.request' : api_version_request ,
784+ 'on_commit' : print_commit_result ,
785+ 'auto.offset.reset' : 'earliest' })
794786
795787 run_avro_loop (base_conf , consumer_conf )
796788
@@ -877,7 +869,7 @@ def run_avro_loop(producer_conf, consumer_conf):
877869
878870 msgcount = 0
879871 while msgcount < len (combinations ):
880- msg = c .poll (0 )
872+ msg = c .poll (100 )
881873
882874 if msg is None or msg .error ():
883875 continue
@@ -1238,8 +1230,8 @@ def print_usage(exitcode, reason=None):
12381230 modes = test_modes
12391231
12401232 if bootstrap_servers is None or topic is None :
1241- print_usage (1 , "Properties bootstrap.servers and topic must be set. "
1242- "Use {} as a template when creating a new conf file ." .format (testconf_file ))
1233+ print_usage (1 , "Missing property bootstrap.servers. "
1234+ "Ensure {} includes a valid bootstrap.servers property ." .format (testconf_file ))
12431235
12441236 print ('Using confluent_kafka module version %s (0x%x)' % confluent_kafka .version ())
12451237 print ('Using librdkafka version %s (0x%x)' % confluent_kafka .libversion ())
0 commit comments