Skip to content

Commit

Permalink
Merge pull request microsoft#737 from microsoft/feature/add-scheduled…
Browse files Browse the repository at this point in the history
…enqueuemessage-in-servicebus-binder

enable scheduled enqueue message in servicebus binder
  • Loading branch information
yiliuTo authored Jul 16, 2020
2 parents 37221c4 + 4a4eab8 commit a8e9b5f
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class AzureHeaders {

public static final String NAME = PREFIX + "name";

public static final String SCHEDULED_ENQUEUE_MESSAGE = "x-delay";

/**
* The {@value CHECKPOINTER} header for checkpoint the specific message.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.microsoft.azure.spring.integration.servicebus.converter;

import com.microsoft.azure.management.Azure;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageBody;
Expand All @@ -18,6 +19,8 @@
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -82,6 +85,11 @@ protected void setCustomHeaders(MessageHeaders headers, IMessage serviceBusMessa
serviceBusMessage.setReplyTo(headers.get(MessageHeaders.REPLY_CHANNEL, String.class));
}

if (headers.containsKey(AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE)) {
serviceBusMessage.setScheduledEnqueueTimeUtc(Instant.now().plus(Duration.ofMillis(
headers.get(AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE, Integer.class))));
}

headers.forEach((key, value) -> serviceBusMessage.getProperties().put(key, value.toString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import com.microsoft.azure.spring.integration.test.support.AzureMessageConverterTest;
import org.junit.Test;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.integration.support.MessageBuilder;

import java.util.HashMap;
import java.util.List;

import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -98,4 +101,14 @@ public void shouldConvertIMessageSequenceIntoAMessage() throws JsonProcessingExc
convertedPayload.getPayload(),
new ObjectMapper().writeValueAsBytes(internalSequence));
}

@Test
public void shouldConvertSpringMessageHeaderIntoIMessage() {
org.springframework.messaging.Message<String> springMessage =
MessageBuilder.withPayload(payload).setHeader("x-delay", 5000).build();
IMessage servicebusMessage = getConverter().fromMessage(springMessage, IMessage.class);
assertNotNull(servicebusMessage);
assertNotNull(servicebusMessage.getScheduledEnqueueTimeUtc());

}
}

0 comments on commit a8e9b5f

Please sign in to comment.