Skip to content

Commit

Permalink
add springboot - kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
527515025 committed Apr 11, 2019
1 parent 79ce2c7 commit d8a5203
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 0 deletions.
63 changes: 63 additions & 0 deletions springboot-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the springboot-kafka sample module.
  Dependency versions for starters and spring-kafka are managed by the
  Spring Boot 2.0.4 parent; only extra libraries pin explicit versions.
  NOTE(review): no spring-boot-maven-plugin is declared, so `mvn package`
  will not produce an executable (repackaged) jar - confirm how this
  module is launched.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>


<groupId>cn.abel</groupId>
<artifactId>springboot-kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- Inherit dependency management and plugin defaults from Spring Boot. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka listener/container support; version managed by the parent. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- NOTE(review): jetty-util 6.1.26 is from the long-unmaintained
     Jetty 6 line (org.mortbay.jetty); verify it is actually needed. -->
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
</dependency>
<!-- NOTE(review): fastjson 1.2.40 has known deserialization
     vulnerabilities (multiple CVEs); upgrading to >= 1.2.83 is
     strongly recommended. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>


<!-- Compile-time only (annotation processing); not packaged. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>


<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
</dependencies>
</project>
15 changes: 15 additions & 0 deletions springboot-kafka/src/main/java/cn/abel/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cn.abel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

/**
 * Application entry point.
 *
 * <p>Kafka auto-configuration is excluded because the consumer is wired up
 * manually (see {@code cn.abel.config.KafkaConfig}).
 *
 * Created by yyb on 2018/12/12.
 */
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        // Equivalent to SpringApplication.run(Application.class, args);
        // the static helper is just a shortcut for this two-step form.
        SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}
88 changes: 88 additions & 0 deletions springboot-kafka/src/main/java/cn/abel/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cn.abel.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Manual Kafka consumer configuration (auto-configuration is excluded in
 * {@code Application}). Builds a batch-mode listener container factory from
 * {@code spring.kafka.*} properties.
 *
 * Created by yyb on 2018/12/12.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Kafka broker address list (host:port, comma-separated). */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Default consumer group id. */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /** Offset-reset policy when no committed offset exists (earliest/latest). */
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /** Maximum number of records returned by one poll (batch size). */
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    /** Maximum delay between polls before the consumer is considered failed. */
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private int maxPollIntervalMs;

    /** Number of concurrent listener containers (consumer threads). */
    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    /** Auto-commit interval in ms; only meaningful if auto-commit is enabled. */
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer commitInterval;

    /**
     * Batch-mode listener container factory.
     *
     * @return factory used by {@code @KafkaListener(containerFactory = "kafkaListenerContainerFactory")}
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        // Batch mode: each listener invocation receives up to
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG records.
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * Consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return String/String consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties.
     *
     * @return mutable map of {@link ConsumerConfig} settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Offsets are committed by the listener container, not by the client.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Only takes effect if auto-commit is ever re-enabled.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Batch size per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // FIX: this value was injected from configuration but never applied.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cn.abel.service.prehandle;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Kafka batch consumer: receives raw log messages and hands each one to
 * {@link SplitService} for parsing.
 *
 * Created by yyb on 2018/12/10.
 */
@Component
public class KafkaConsumerService {

    private final SplitService splitService;

    /** Constructor injection (preferred over field injection) for testability. */
    @Autowired
    public KafkaConsumerService(SplitService splitService) {
        this.splitService = splitService;
    }

    /**
     * Processes one polled batch. Values are Strings because the consumer is
     * configured with StringDeserializer (see KafkaConfig).
     *
     * @param records batch of records, up to max-poll-records per invocation
     */
    @KafkaListener(topics = "${log.statistical.kafka.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            String message = (String) record.value();
            // FIX: skip null values (tombstone records) instead of passing
            // null into the parser, which would throw and abort the batch.
            if (message != null) {
                splitService.saveAndSplitLog(message);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cn.abel.service.prehandle;


import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Parses raw Kafka payloads and extracts the embedded log line.
 *
 * Created by yyb on 2018/12/10.
 */
@Service
public class SplitService {

    // FIX: the class previously declared both @Slf4j and a manual logger;
    // keep the explicit one so logging works regardless of lombok processing.
    private static final Logger logger = LoggerFactory.getLogger(SplitService.class);

    /** JSON field that carries the raw log content. */
    private static final String MESSAGE_FIELD = "message";

    /**
     * Parses one JSON payload received from Kafka and extracts its
     * {@code message} field. Malformed or empty payloads are logged and
     * skipped so a single bad record cannot abort the whole batch.
     *
     * @param message raw JSON string from Kafka; may be null or malformed
     */
    public void saveAndSplitLog(String message) {
        if (message == null || message.isEmpty()) {
            return;
        }
        try {
            JSONObject jsonObject = JSONObject.parseObject(message);
            if (jsonObject == null) {
                return;
            }
            // Extracted log content from the Kafka payload.
            String content = jsonObject.getString(MESSAGE_FIELD);
            // TODO(review): content is extracted but not yet persisted/split;
            // original code also discarded it here.
            logger.debug("extracted log content: {}", content);
        } catch (JSONException e) {
            logger.warn("skipping malformed kafka message: {}", message, e);
        }
    }

}
18 changes: 18 additions & 0 deletions springboot-kafka/src/main/resources/local/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# HTTP port for the embedded web server.
server.port=8090
# Kafka broker address (host:port).
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Consumer group id.
spring.kafka.consumer.group-id=springboot-kafka
# Start from the latest offset when no committed offset exists.
spring.kafka.consumer.auto-offset-reset=latest
# NOTE(review): KafkaConfig hard-codes ENABLE_AUTO_COMMIT_CONFIG=false,
# so this value is effectively ignored - confirm which setting is intended.
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval in ms (only meaningful when auto-commit is enabled).
spring.kafka.consumer.auto-commit-interval=2000
# Number of concurrent listener containers.
spring.kafka.listener.concurrency= 1
# Max records returned per poll (batch size for the batch listener).
spring.kafka.consumer.max-poll-records=50
# Max delay between polls before the consumer is considered failed.
spring.kafka.consumer.max-poll-interval-ms=4000
# topic
log.statistical.kafka.topic=nginx_log

# Log file path; exposed to logback as ${LOG_FILE}.
logging.file=./logs/springboot-kafka.log




5 changes: 5 additions & 0 deletions springboot-kafka/src/main/resources/local/banner.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
########################################################
# #
# local #
# #
########################################################
35 changes: 35 additions & 0 deletions springboot-kafka/src/main/resources/local/logback-spring.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <contextName>middle_nginxlog_storage</contextName>

    <!-- Console output. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Rolling file output; ${LOG_FILE} comes from logging.file in
         application.properties (Spring Boot exposes it to logback). -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Current day's log file. -->
        <file>${LOG_FILE}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Name pattern for rotated (archived) log files. -->
            <FileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}</FileNamePattern>
            <!-- Days of rotated history to keep. -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <!-- FIX: use <encoder> instead of the deprecated <layout> element;
             pattern is unchanged: date, thread, level, logger, message. -->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Reduce noisy framework startup logging. -->
    <logger name="org.apache" level="INFO"/>
    <root level="INFO">
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>

0 comments on commit d8a5203

Please sign in to comment.