@@ -172,60 +172,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
172172 logger .info ("Stop auto" );
173173 }
174174
175- @ Test
176- public void testAutoCommitWithRebalanceListener () throws Exception {
177- logger .info ("Start auto" );
178- Map <String , Object > props = KafkaTestUtils .consumerProps ("test10" , "true" , embeddedKafka );
179- DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <Integer , String >(props );
180- ConcurrentMessageListenerContainer <Integer , String > container =
181- new ConcurrentMessageListenerContainer <>(cf , topic1 );
182- final CountDownLatch latch = new CountDownLatch (4 );
183- container .setMessageListener (new MessageListener <Integer , String >() {
184-
185- @ Override
186- public void onMessage (ConsumerRecord <Integer , String > message ) {
187- logger .info ("auto: " + message );
188- latch .countDown ();
189- }
190- });
191- final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch (2 );
192- final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch (2 );
193- container .setConsumerRebalanceListener (new ConsumerRebalanceListener () {
194-
195- @ Override
196- public void onPartitionsRevoked (Collection <TopicPartition > partitions ) {
197- logger .info ("In test, partitions revoked:" + partitions );
198- rebalancePartitionsRevokedLatch .countDown ();
199- }
200-
201- @ Override
202- public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
203- logger .info ("In test, partitions assigned:" + partitions );
204- rebalancePartitionsAssignedLatch .countDown ();
205- }
206-
207- });
208-
209- container .setConcurrency (2 );
210- container .setBeanName ("testAuto" );
211- container .start ();
212- ContainerTestUtils .waitForAssignment (container , embeddedKafka .getPartitionsPerTopic ());
213- Map <String , Object > senderProps = KafkaTestUtils .producerProps (embeddedKafka );
214- ProducerFactory <Integer , String > pf = new DefaultKafkaProducerFactory <Integer , String >(senderProps );
215- KafkaTemplate <Integer , String > template = new KafkaTemplate <>(pf );
216- template .setDefaultTopic (topic1 );
217- template .send (0 , "foo" );
218- template .send (2 , "bar" );
219- template .send (0 , "baz" );
220- template .send (2 , "qux" );
221- template .flush ();
222- assertThat (latch .await (60 , TimeUnit .SECONDS )).isTrue ();
223- assertThat (rebalancePartitionsAssignedLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
224- assertThat (rebalancePartitionsRevokedLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
225- container .stop ();
226- logger .info ("Stop auto" );
227- }
228-
229175 @ Test
230176 public void testAfterListenCommit () throws Exception {
231177 logger .info ("Start manual" );
0 commit comments