@@ -28,18 +28,16 @@ public SendResult syncSend(Integer id) throws ExecutionException, InterruptedExc
28
28
return kafkaTemplate .send (Demo07Message .TOPIC , message ).get ();
29
29
}
30
30
31
- public SendResult < Object , Object > syncSendInTransaction (Integer id , Runnable runner ) throws ExecutionException , InterruptedException {
32
- return kafkaTemplate .executeInTransaction (new KafkaOperations .OperationsCallback <Object , Object , SendResult < Object , Object > >() {
31
+ public String syncSendInTransaction (Integer id , Runnable runner ) throws ExecutionException , InterruptedException {
32
+ return kafkaTemplate .executeInTransaction (new KafkaOperations .OperationsCallback <Object , Object , String >() {
33
33
34
34
@ Override
35
- public SendResult < Object , Object > doInOperations (KafkaOperations <Object , Object > kafkaOperations ) {
35
+ public String doInOperations (KafkaOperations <Object , Object > kafkaOperations ) {
36
36
// 创建 Demo07Message 消息
37
- SendResult <Object , Object > sendResult ;
38
37
Demo07Message message = new Demo07Message ();
39
38
message .setId (id );
40
39
try {
41
- // sendResult = kafkaOperations.send(Demo07Message.TOPIC, message).get();
42
- sendResult = kafkaOperations .send (Demo07Message .TOPIC , message ).get ();
40
+ SendResult <Object , Object > sendResult = kafkaOperations .send (Demo07Message .TOPIC , message ).get ();
43
41
logger .info ("[doInOperations][发送编号:[{}] 发送结果:[{}]]" , id , sendResult );
44
42
} catch (Exception e ) {
45
43
throw new RuntimeException (e );
@@ -48,8 +46,8 @@ public SendResult<Object, Object> doInOperations(KafkaOperations<Object, Object>
48
46
// 本地业务逻辑... biubiubiu
49
47
runner .run ();
50
48
51
- // 返回发送结果
52
- return sendResult ;
49
+ // 返回结果
50
+ return "success" ;
53
51
}
54
52
55
53
});
0 commit comments