Skip to content

Commit

Permalink
Add non-Lambda DSL Cafe sample
Browse files Browse the repository at this point in the history
* Polishing for `stomp-chat` sample
* Some dependencies upgrade
  • Loading branch information
artembilan authored and garyrussell committed Jan 5, 2015
1 parent fb6d47f commit 5ccedd3
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
header-name="simpMessageType"
resolution-required="false"
default-output-channel="nullChannel">
<int:mapping value="#{T(org.springframework.messaging.simp.SimpMessageType).CONNECT.name()}"
<int:mapping value="#{T(org.springframework.messaging.simp.SimpMessageType).CONNECT_ACK.name()}"
channel="connectAck"/>
<int:mapping value="#{T(org.springframework.messaging.simp.SimpMessageType).SUBSCRIBE.name()}"
channel="subscribe"/>
Expand All @@ -61,7 +61,8 @@
</int:header-value-router>

<int:outbound-channel-adapter id="connectAck"
expression="@webSocketSessionStore.put(headers.simpSessionId, headers.nativeHeaders.login)"/>
expression="@webSocketSessionStore.put(headers.simpSessionId,
headers.simpConnectMessage.headers.nativeHeaders.login)"/>

<int:publish-subscribe-channel id="subscribe"/>

Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ subprojects { subproject ->
sourceCompatibility = 1.6

ext {
activeMqVersion = '5.9.0'
activeMqVersion = '5.10.0'
apacheSshdVersion = '0.10.1'
aspectjVersion = '1.8.0'
commonsDigesterVersion = '2.0'
Expand Down Expand Up @@ -195,7 +195,7 @@ subprojects { subproject ->
slf4jVersion = '1.7.6'
springIntegrationVersion = '4.1.1.RELEASE'
springIntegrationDslVersion = '1.0.1.RELEASE'
springVersion = '4.1.3.RELEASE'
springVersion = '4.1.4.RELEASE'
springSecurityVersion = '3.2.4.RELEASE'
springWebFlowVersion = '2.3.3.RELEASE'
tilesJspVersion = '2.2.1'
Expand Down Expand Up @@ -558,7 +558,7 @@ project('cafe-dsl') {
testCompile 'org.springframework.boot:spring-boot-starter-test'
}

mainClassName = 'org.springframework.integration.samples.dsl.cafe.Application'
mainClassName = 'org.springframework.integration.samples.dsl.cafe.lambda.Application'
}


Expand Down
9 changes: 5 additions & 4 deletions dsl/cafe-dsl/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Cafe Demo: Spring Integration Java DSL
#Cafe Demo: Spring Integration Java DSL

This sample demonstrates the classical Cafe Demo, but it is based on [Spring Integration Java DSL](https://github.com/spring-projects/spring-integration-extensions/wiki/Spring-Integration-Java-DSL-Reference)
and [Spring Boot](http://projects.spring.io/spring-boot).
Expand All @@ -8,10 +8,11 @@ See the `cafe` project **README.md** for more details about the domain and the C
## Run the Sample

* You need Java 8 to run this sample, because it is based on Lambdas.
* running the `org.springframework.integration.samples.dsl.cafe.Application` class from within STS (Right-click on
* running the `org.springframework.integration.samples.dsl.cafe.lambda.Application` class from within STS (Right-click on
Main class --> Run As --> Java Application)
* or from the command line:

$ gradlew :cafe-dsl:run

$ gradlew :cafe-dsl:run

There is the second similar sample - `org.springframework.integration.samples.dsl.cafe.nonlambda.Application`, which
demonstrates how Spring Integration Java DSL can be used from pre Java 8 environment.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
* limitations under the License.
*/

package org.springframework.integration.samples.dsl.cafe;
package org.springframework.integration.samples.dsl.cafe.lambda;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
Expand All @@ -45,12 +44,11 @@
* @author Artem Bilan
* @since 3.0
*/
@Configuration
@EnableAutoConfiguration
@SpringBootApplication
@IntegrationComponentScan
public class Application {

public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);

Cafe cafe = ctx.getBean(Cafe.class);
Expand All @@ -61,8 +59,8 @@ public static void main(String[] args) throws InterruptedException {
cafe.placeOrder(order);
}

Thread.sleep(60000);

System.out.println("Hit 'Enter' to terminate");
System.in.read();
ctx.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.dsl.cafe.nonlambda;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.AggregatorSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.RouterSpec;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.integration.samples.cafe.Delivery;
import org.springframework.integration.samples.cafe.Drink;
import org.springframework.integration.samples.cafe.DrinkType;
import org.springframework.integration.samples.cafe.Order;
import org.springframework.integration.samples.cafe.OrderItem;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.stereotype.Component;

import com.google.common.util.concurrent.Uninterruptibles;

/**
* @author Artem Bilan
* @since 3.0
*/
@SpringBootApplication
@IntegrationComponentScan
public class Application {

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args);

Cafe cafe = ctx.getBean(Cafe.class);
for (int i = 1; i <= 100; i++) {
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false);
order.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(order);
}

System.out.println("Hit 'Enter' to terminate");
System.in.read();
ctx.close();
}

@MessagingGateway
public interface Cafe {

@Gateway(requestChannel = "orders.input")
void placeOrder(Order order);

}

private final AtomicInteger hotDrinkCounter = new AtomicInteger();

private final AtomicInteger coldDrinkCounter = new AtomicInteger();

@Autowired
private CafeAggregator cafeAggregator;

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1000).get();
}

@Bean
@SuppressWarnings("unchecked")
public IntegrationFlow orders() {
return IntegrationFlows.from("orders.input")
.split("payload.items", (Consumer) null)
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.route("payload.iced",
new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() {

@Override
public void accept(RouterSpec<ExpressionEvaluatingRouter> spec) {
spec.channelMapping("true", "iced")
.channelMapping("false", "hot");
}

})
.get();
}

@Bean
public IntegrationFlow icedFlow() {
return IntegrationFlows.from(MessageChannels.queue("iced", 10))
.handle(new GenericHandler<OrderItem>() {

@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()
+ " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload;
}

})
.channel("output")
.get();
}

@Bean
public IntegrationFlow hotFlow() {
return IntegrationFlows.from(MessageChannels.queue("hot", 10))
.handle(new GenericHandler<OrderItem>() {

@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()
+ " prepared hot drink #" + hotDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload;
}

})
.channel("output")
.get();
}

@Bean
public IntegrationFlow resultFlow() {
return IntegrationFlows.from("output")
.transform(new GenericTransformer<OrderItem, Drink>() {

@Override
public Drink transform(OrderItem orderItem) {
return new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots());
}

})
.aggregate(new Consumer<AggregatorSpec>() {

@Override
public void accept(AggregatorSpec aggregatorSpec) {
aggregatorSpec.processor(cafeAggregator, null);
}

}, null)
.handle(CharacterStreamWritingMessageHandler.stdout())
.get();
}


@Component
public static class CafeAggregator {

@Aggregator
public Delivery output(List<Drink> drinks) {
return new Delivery(drinks);
}

@CorrelationStrategy
public Integer correlation(Drink drink) {
return drink.getOrderNumber();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.runner.RunWith;

import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.integration.samples.dsl.cafe.lambda.Application;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
Expand Down

0 comments on commit 5ccedd3

Please sign in to comment.