Spring Batch๋ก ์ ํ๋ฆฌ์ผ์ด์ ์ ์์ฑํ๋ ๊ฒฝ์ฐ ๋ด๋ถ ๋ฐ์ดํฐ๊ฐ ์๋๋ผ ์ธ๋ถ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ๊ฐ๊ณตํด์ผ ํ๋ ๊ฒฝ์ฐ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ด๋ป๊ฒ ๊ตฌ์ถํด์ผ ํ ์ง ๊ฒฐ์ ํด์ผ ํฉ๋๋ค. ์๋ฅผ ๋ค์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ง์ ์ฐ๊ฒฐํด์ ํ์ํ ๋ฐ์ดํฐ๋ฅผ ์ฟผ๋ฆฌ ํ์ฌ ๊ฐ์ ธ์ฌ ๊ฒ์ธ์ง, ์๋๋ฉด csv ๋ฑ ํ์ผ์ ์ฃผ๊ณ ๋ฐ๋ ๋ฐฉ์, HTTP API๋ฅผ ํตํด์ ๊ฐ์ ธ์ค๋ ๋ฐฉ์ ๋ฑ๋ฑ ์ฌ๋ฌ ๊ฐ์ง ๋ฐฉ์์ด ์์ต๋๋ค. ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ๋ฐฉ์, ํ์ผ๋ฐฉ์์ ์คํ๋ง ๋ฐฐ์น์์ ๊ธฐ๋ณธ ์ ๊ณตํด ์ฃผ๊ณ ์์ง๋ง HTTP Paging ๊ธฐ๋ฐ Reader๋ ์ ๊ณตํด ์ฃผ๊ณ ์์ง ์์ ํด๋น ๊ธฐ๋ฅ์ Item Reader๋ฅผ ์ง์ ๊ตฌํํด ๋ณด๊ฒ ์ต๋๋ค. ๋ถ์กฑํ ๋ถ๋ถ์ ํ๋ก์ ํธ์ ๋ง๊ฒ ์ถ๊ฐํ์ ๋ ์ข์ ๊ฑฐ ๊ฐ์ต๋๋ค.
- ์ฝ๊ธฐ(read) : ๋ฐ์ดํฐ ์ ์ฅ์(์ผ๋ฐ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค)์์ ํน์ ๋ฐ์ดํฐ ๋ ์ฝ๋๋ฅผ ์ฝ์ต๋๋ค.
- ์ฒ๋ฆฌ(processing) : ์ํ๋ ๋ฐฉ์์ผ๋ก ๋ฐ์ดํฐ ๊ฐ๊ณต/์ฒ๋ฆฌํฉ๋๋ค.
- ์ฐ๊ธฐ(write) : ์์ ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ ์ฅ์์ ์ ์ฅํฉ๋๋ค.
- Reader์์ ๋ฐ์ดํฐ๋ฅผ ํ๋ ์ฝ์ด ์ต๋๋ค.
- ์ฝ์ด์จ ๋ฐ์ดํฐ๋ฅผ Processor์์ ๊ฐ๊ณตํฉ๋๋ค.
- ๊ฐ๊ณต๋ ๋ฐ์ดํฐ๋ค์ ๋ณ๋์ ๊ณต๊ฐ์ ๋ชจ์๋ค, Chunk ๋จ์๋งํผ ์์ด๊ฒ ๋๋ฉด Writer์ ์ ๋ฌํ๊ณ Writer๋ ์ผ๊ด ์ ์ฅํฉ๋๋ค.
Reader์ Processor์์๋ 1๊ฑด์ฉ ๋ค๋ค์ง๊ณ , Writer์์ Chunk ๋จ์๋ก ์ฒ๋ฆฌ๋๋ค๋ ๊ฒ์ด ์ค์ํฉ๋๋ค.
Chunk ์งํฅ ์ฒ๋ฆฌ๋ฅผ Java ์ฝ๋๋ก ํํํ๋ฉด ์๋์ฒ๋ผ ๋ ๊ฒ ๊ฐ์ต๋๋ค.
fun Chunk_์ฒ๋ฆฌ_๋ฐฉ๋ฒ(chunkSize: Int, totalSize: Int) {
var i = 0
while (i < totalSize) {
val items: MutableList<*> = ArrayList<Any?>()
for (j in 0 until chunkSize) {
val item: Any = itemReader.read()
val processedItem: Any = itemProcessor.process(item)
items.add(processedItem)
}
itemWriter.write(items)
i = i + chunkSize
}
}
์ฆ chunkSize ๋ณ๋ก ๋ฌถ๋ ๋ค๋ ๊ฒ์ total_size์์ chunk_size ๋งํผ ์ฝ์ด ์์ฅํ๋ค๋ ์๋ฏธ์ ๋๋ค.
{
"content": [
{
"bookId": 1,
"bookStatus": "OPEN",
"userId": 1,
"id": 1
}
...
],
...
"number": 0,
"size": 2,
"numberOfElements": 2,
"empty": false
}
HTTP Response๋ ์์ ๊ฐ์ด ๊ตฌ์ฑ๋์ด ์๋ค๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
Page | size | URL | Content |
---|---|---|---|
0 | 10 | http://localhost:8080/api/members?page=0&size=10 | 10 |
1 | 10 | http://localhost:8080/api/members?page=1&size=10 | 10 |
2 | 10 | http://localhost:8080/api/members?page=2&size=10 | 3 |
3 | 10 | http://localhost:8080/api/members?page=2&size=10 | 0 |
์ค์ ๋ฐ์ดํฐ๋ rows 23๊ฐ๊ฐ ์ ์ฅ๋์ด ์๋ค๋ฉด size๋ฅผ 10์ ๊ธฐ์ค์ผ๋ก 2ํ์ด์ง ๊น์ง ์ฝ์ผ๋ฉด ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค ์ฝ๊ฒ ๋ฉ๋๋ค. 2ํ์ด์ง์์๋ ๋จ์ ๋ฐ์ดํฐ rows 2๊ฐ๊ฐ ์๋ต๋๋ฉฐ 3ํ์ด์ง๋ฅผ ์กฐํํ๋ฉด ๋น ์๋ต ํ์ด์ง๊ฐ ๋์ด์ค๊ฒ ๋ฉ๋๋ค. ์ฆ HttpPageItemReader๋ content
๊ฐ ๋น ๋ฐฐ์ด์ด ๋์ฌ ๋๊น์ง page๋ฅผ 1์ ์ฆ๊ฐ ์ํค๋ฉฐ ๋ค์ ํ์ด์ง๋ฅผ ๊ณ์ ์ฝ์ด ๋๊ฐ๋ ํํ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.
open class HttpPageItemReader<T : Any>(
name: String = "HttpPageItemReader",
private val url: String,
private val size: Int,
private var page: Int,
private val parameters: List<Pair<String, Any>>? = null,
private val contentClass: KClass<T>,
private val objectMapper: ObjectMapper = ObjectMapper().registerKotlinModule()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.apply { this.propertyNamingStrategy = PropertyNamingStrategies.SNAKE_CASE }
) : AbstractItemCountingItemStreamItemReader<T>() {
private val log by logger()
private var contents = mutableListOf<T>()
init {
super.setName(name)
}
// (2)
override fun doOpen() {
log.info("HttpPageItemReader doOpen page: $page, size: $size")
}
// (2)
override fun doClose() {
log.info("HttpPageItemReader doClose page: $page, size: $size")
}
// (3)
override fun doRead(): T? {
if (this.contents.isEmpty()) {
this.contents = readRow()
}
return when {
this.contents.isEmpty() -> null
else -> this.contents.removeAt(this.contents.size - 1)
}
}
// (4)
private fun readRow() =
url
.httpGet(generateQueryParameter())
.responseString()
.run {
when {
second.isSuccessful -> {
page++
serializeResponseBody(responseBody = third.get())
}
else -> throw IllegalArgumentException("...") // 2xx ์๋ต์ ๋ฐ์ง ๋ชปํ ๊ฒฝ์ฐ๋ ๊ฐ ์ํฉ์ ๋ง๊ฒ ๊ตฌํ
}
}
// (5)
private fun generateQueryParameter() = when (parameters) {
null -> {
listOf(
"page" to page,
"size" to size,
)
}
else -> {
mutableListOf(
"page" to page,
"size" to size,
)
.plus(parameters)
}
}
// (6)
private fun serializeResponseBody(responseBody: String): MutableList<T> {
val rootNode = objectMapper.readTree(responseBody)
val contentNode = rootNode.path("content")
require(rootNode.isEmpty.not()) { "Response Body ๊ฐ์ด ๋น์ด ์์ ์ ์์ต๋๋ค." }
require(contentNode.isArray) { "Response content ํ๋๋ Array ํ์
์ด์ด์ผ ํฉ๋๋ค." }
// (7)
return objectMapper.convertValue<List<T>>(
contentNode,
objectMapper.typeFactory.constructCollectionType(List::class.java, contentClass.java)
).toMutableList()
}
}
- (1), (2) ์ปค๋ฅ์ ์ ๋งบ๊ฑฐ๋ ์์์ ํ ๋นํ๊ณ ํด์ ํ๋ ์ผ์ด์ค๊ฐ ์๊ธฐ ๋๋ฌธ์ ๋จ์ํ ๋ก๊ทธ๋ง ์์ฑ
- (3) 1๊ฑด์ ์ฝ์ด์ ์ฒ๋ฆฌํฉ๋๋ค. ํด๋น 1๊ฑด์ ์ฝ์ด ์ฒ๋ฆฌํ ๋ถ๋ถ์ item processor๋ก ๋์ด๊ฐ ์ ์๊ฒ ํฉ๋๋ค.
- (4) ์ค์ ๋ก HTTP ํต์ ์ ์งํํฉ๋๋ค.
- (5) page, size ์ธ ์ฟผ๋ฆฌ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
- (6) HTTP Response Body๋ฅผ ๊ฐ์ฒด๋ก ์๋ฆฌ์ผ๋ผ์ด์ฆ ์์ ์ ์งํํฉ๋๋ค. ์ด๋ ์์ธ ์ฒ๋ฆฌ๋ฅผ ์งํํ๊ฒ ํฉ๋๋ค.
- (7) content ๋ฆฌ์คํธ ์๋ฆฌ์ผ๋ผ์ด์ฆ ์งํ
{
"content": [
{
"bookId": 1,
"bookStatus": "OPEN",
"userId": 1,
"id": 1
}
],
...
"number": 0,
"size": 2,
"numberOfElements": 2,
"empty": false
}
content์ ํด๋นํ๋ ๋ด์ฉ๋ค๋ง ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ content ๋ ธ๋๋ฅผ ์ฐพ์ ํด๋น ์ ๋ณด๋ง ์๋ฆฌ์ผ ๋ผ์ด์ฆ๋ฅผ ์งํํฉ๋๋ค. HTTP Paging API์ ๋ํ ์๋ต ํํ๋ฅผ ํต์ผํํ์ฌ ํน์ ์๋ต์ ๋ํด์๋ง ์ง์ ๊ฐ๋ฅํ๊ฒ ์ ํจ์ฑ ๊ฒ์ฌ ์ฝ๋๊ฐ ์์ต๋๋ค. ์ ์ฐํ๊ฒ ์ฌ์ฉ ํ๊ธฐ๋ฅผ ์ํ์๋ฉด ํด๋น ๋ถ๋ถ์ ์ธ๋ถ์๋ ๋ณ๊ฒฝ์ด ๊ฐ๋ฅํ๊ฒ ํ๋ผ๋ฏธํฐ๋ก ๋ฐ๋ ๋ฐฉ์์ผ๋ก ์งํํด๋ ๋ฌด๋ฐฉํฉ๋๋ค. ๋ค๋ง ํต์ผ๋ ์๋ต ํฌ๋งท์ ๊ฐ๋ ๊ฒ์ด ๋ ๋ฐ๋์งํ๋ค๊ณ ์๊ฐํฉ๋๋ค.
@Configuration
class HttpPageReaderJobConfiguration(
private val jobBuilderFactory: JobBuilderFactory,
private val stepBuilderFactory: StepBuilderFactory
) {
...
@Bean
@StepScope
fun httpPageReaderReader(
entityManagerFactory: EntityManagerFactory
) = HttpPageItemReader(
url = "http://localhost:8080/api/members", // API ์ฃผ์
size = 10, // ์๋ต๋ฐ์ content size๋ก ๋๋ถ๋ถ chunk size์ ๋์ผํ๊ฒ ๊ตฌ์ฑ
page = 0, // page start ๊ฐ์ผ๋ก ๋๋ถ๋ถ 0 ๋ถํฐ ์์
parameters = listOf(
"age" to 10,
"email" to "1232@asd.com"
), // ์ฟผ๋ฆฌ ํ๋ผ๋ฏธํฐ, page, size ์ธ์ ๊ฐ์ ์ฌ์ฉ
contentClass = BookReservation::class // ์๋ฆฌ์ผ๋ผ์ด์ฆ ๋์ ํด๋์ค
)
@Bean
@StepScope
fun httpPageReaderWriter(
): ItemWriter<BookReservation> = ItemWriter { contents ->
for (content in contents) {
println(content)
}
}
}
data class BookReservation(
val bookId: Long,
val bookStatus:String,
val userId: Long
)
Local API๋ฅผ ํธ์ถํ์ฌ ๋ก๊ทธ๋ฅผ ์ฐ๋ ๊ฐ๋จํ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋๋ค.
1~20 ๊ฐ์ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์กฐํํ๊ณ ๋ก๊ทธ๋ฅผ ์ฐ๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.