
2022-04-17-batch-http-page-item-reader.md

Http Page Item Reader

When you write a Spring Batch application that has to fetch and process external data rather than internal data, you first have to decide how to build the data pipeline. For example, you can connect directly to the database and query the data you need, exchange files such as CSV, or fetch the data through an HTTP API. Spring Batch ships readers for database and file access out of the box, but it does not provide an HTTP paging-based reader, so in this post we implement one as a custom ItemReader. Feel free to extend it with whatever your project needs.

Spring Batch Basics

  • Read: reads specific data records from a data store (usually a database).
  • Processing: transforms or processes the data as needed.
  • Write: saves the modified data back to a data store.

  • The Reader reads one item.
  • The Processor processes the item that was read.
  • Processed items are collected in a separate buffer; once a chunk's worth has accumulated, they are passed to the Writer, which saves them in one batch.

The key point is that the Reader and Processor handle one item at a time, while the Writer works on a whole chunk.

Expressed in code (Kotlin, like the rest of this post), chunk-oriented processing looks roughly like this:

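import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter

// Chunk_처리_방법 ("how chunk processing works"): a simplified view of a chunk-oriented step,
// written against the Spring Batch 4.x ItemWriter.write(List) signature
fun <I : Any, O : Any> Chunk_처리_방법(
    chunkSize: Int,
    totalSize: Int,
    itemReader: ItemReader<I>,
    itemProcessor: ItemProcessor<I, O>,
    itemWriter: ItemWriter<O>
) {
    var i = 0
    while (i < totalSize) {
        val items = mutableListOf<O>()                                  // buffer for one chunk
        for (j in 0 until chunkSize) {
            val item = itemReader.read() ?: break                       // Reader: one item; null = no more input
            val processedItem = itemProcessor.process(item) ?: continue // Processor: one item; null = filtered out
            items.add(processedItem)
        }
        itemWriter.write(items)                                         // Writer: the whole chunk at once
        i += chunkSize
    }
}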

In other words, grouping by chunkSize means reading chunkSize items at a time out of totalSize items and writing them together.
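To see where the chunk boundaries fall, the loop above can be simulated without Spring Batch. The following is a minimal, dependency-free sketch; the function `simulateChunks` and its `"item-N"` processing step are hypothetical stand-ins, not Spring Batch APIs. It reads and processes items one at a time and records each list that would be handed to the writer.

```kotlin
// Hypothetical simulation of chunk-oriented processing (not a Spring Batch API).
// Items 1..totalSize are read one by one, "processed" one by one, and written per chunk.
fun simulateChunks(totalSize: Int, chunkSize: Int): List<List<String>> {
    require(chunkSize > 0) { "chunkSize must be positive" }
    val reader = (1..totalSize).iterator()          // stands in for ItemReader.read()
    val process = { item: Int -> "item-$item" }     // stands in for ItemProcessor.process()
    val written = mutableListOf<List<String>>()     // what ItemWriter.write() would receive
    while (reader.hasNext()) {
        val chunk = mutableListOf<String>()
        // Fill the chunk one item at a time until it is full or the input is exhausted
        while (chunk.size < chunkSize && reader.hasNext()) {
            chunk.add(process(reader.next()))
        }
        written.add(chunk)                          // one write call per chunk
    }
    return written
}
```

For example, `simulateChunks(23, 10).map { it.size }` yields `[10, 10, 3]`: two full chunks and a final partial chunk of 3 items, which Spring Batch also writes once the reader is exhausted.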

HttpPageItemReader

HTTP Response

{
  "content": [
    {
      "bookId": 1,
      "bookStatus": "OPEN",
      "userId": 1,
      "id": 1
    }
    ...
  ],
  ...
  "number": 0,
  "size": 2,
  "numberOfElements": 2,
  "empty": false
}

Assume the HTTP response is structured as shown above.

Page  size  URL                                                Content (rows)
0     10    http://localhost:8080/api/members?page=0&size=10   10
1     10    http://localhost:8080/api/members?page=1&size=10   10
2     10    http://localhost:8080/api/members?page=2&size=10   3
3     10    http://localhost:8080/api/members?page=3&size=10   0

If 23 rows are stored, reading pages of size 10 up to page 2 returns all of the data. Page 2 returns the remaining 3 rows, and requesting page 3 returns an empty page. In other words, HttpPageItemReader keeps reading the next page, incrementing page by 1 each time, until content comes back as an empty array.
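The termination rule can be checked against the table with an in-memory stand-in for the API. This sketch assumes a hypothetical `fakeMembersPage` function that slices a list the way `?page=N&size=M` is assumed to; it is not the real endpoint.

```kotlin
// Hypothetical in-memory stand-in for GET /api/members?page={page}&size={size}:
// returns the rows of the requested page, or an empty list past the last page.
fun fakeMembersPage(allRows: List<Int>, page: Int, size: Int): List<Int> =
    allRows.drop(page * size).take(size)

// Requests page 0, 1, 2, ... until a page comes back empty.
// Returns the number of rows in each response, including the final empty one.
fun pageContentSizes(totalRows: Int, size: Int, startPage: Int = 0): List<Int> {
    require(size > 0) { "size must be positive" }
    val rows = (1..totalRows).toList()
    val sizes = mutableListOf<Int>()
    var page = startPage
    while (true) {
        val content = fakeMembersPage(rows, page, size)
        sizes.add(content.size)
        if (content.isEmpty()) break   // empty content array: no more data
        page++                         // move on to the next page
    }
    return sizes
}
```

`pageContentSizes(23, 10)` returns `[10, 10, 3, 0]`, matching the table: four requests, the last one empty. `pageContentSizes(20, 10)` returns `[10, 10, 0]`, so stopping on an empty page always costs one extra request. Stopping as soon as a page holds fewer than `size` rows would avoid that request whenever the total is not a multiple of `size`, but it relies on the API never returning a short page before the end.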

Code

HttpPageItemReader

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import com.github.kittinunf.fuel.httpGet
import org.slf4j.LoggerFactory
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader
import kotlin.reflect.KClass

open class HttpPageItemReader<T : Any>(
    name: String = "HttpPageItemReader",
    private val url: String,
    private val size: Int,
    private var page: Int,
    private val parameters: List<Pair<String, Any>>? = null,
    private val contentClass: KClass<T>,
    // Default mapper reads camelCase keys as in the sample response; for snake_case APIs
    // pass a mapper configured with .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
    private val objectMapper: ObjectMapper = ObjectMapper().registerKotlinModule()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
) : AbstractItemCountingItemStreamItemReader<T>() {
    // The original used a project-specific `logger()` delegate; plain SLF4J is used here
    private val log = LoggerFactory.getLogger(javaClass)
    // Items of the current page that have not been handed out yet
    private var contents = mutableListOf<T>()

    init {
        super.setName(name)
    }

    // (1)
    override fun doOpen() {
        log.info("HttpPageItemReader doOpen page: $page, size: $size")
    }

    // (2)
    override fun doClose() {
        log.info("HttpPageItemReader doClose page: $page, size: $size")
    }

    // (3)
    override fun doRead(): T? {
        // Fetch the next page only when the current one has been fully consumed
        if (this.contents.isEmpty()) {
            this.contents = readRow()
        }

        return when {
            this.contents.isEmpty() -> null   // empty page: no more data, the step ends
            else -> this.contents.removeAt(0) // take from the front to keep the API's order
        }
    }

    // (4)
    private fun readRow(): MutableList<T> =
        url
            .httpGet(generateQueryParameter())
            .responseString() // Triple(request, response, result)
            .run {
                when {
                    second.statusCode in 200..299 -> {
                        page++ // advance only after a successful response
                        serializeResponseBody(responseBody = third.get())
                    }
                    else -> throw IllegalArgumentException("...") // handle non-2xx responses as each situation requires
                }
            }

    // (5)
    private fun generateQueryParameter(): List<Pair<String, Any>> =
        listOf<Pair<String, Any>>("page" to page, "size" to size) + parameters.orEmpty()

    // (6)
    private fun serializeResponseBody(responseBody: String): MutableList<T> {
        val rootNode = objectMapper.readTree(responseBody)
        val contentNode = rootNode.path("content")

        require(rootNode.isEmpty.not()) { "Response body must not be empty." }
        require(contentNode.isArray) { "The response content field must be an array." }

        // (7)
        return objectMapper.convertValue<List<T>>(
            contentNode,
            objectMapper.typeFactory.constructCollectionType(List::class.java, contentClass.java)
        ).toMutableList()
    }
}

  • (1), (2) There is no connection to open and no resource to allocate or release, so these methods only write a log.
  • (3) Reads and returns one item at a time; each item returned here is passed on to the ItemProcessor.
  • (4) Performs the actual HTTP call.
  • (5) Adds any query parameters other than page and size.
  • (6) Deserializes the HTTP response body into objects, validating the response shape along the way.
  • (7) Deserializes the content list.
{
  "content": [
    {
      "bookId": 1,
      "bookStatus": "OPEN",
      "userId": 1,
      "id": 1
    }
  ],
  ...
  "number": 0,
  "size": 2,
  "numberOfElements": 2,
  "empty": false
}

Since only the items under content are used, the reader locates the content node and deserializes only that part. The validation code assumes a unified response format for HTTP paging APIs, so only responses of that shape are supported. If you want more flexibility, it is fine to accept that part as a parameter so callers can change it. That said, I think having a unified response format across APIs is preferable.
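Taking the parameterization idea a step further, the HTTP call and JSON parsing can be passed in as a function, which also makes the paging logic of (3) to (5) easy to unit-test. The class below is a hypothetical, dependency-free sketch; `PageBufferedReader` and `fetchPage` are illustrative names, not part of Spring Batch or Fuel. Like `doRead()`, it refills its buffer only when it is empty, advances the page after each fetch, and returns `null` once a page comes back empty; it hands items out first-in, first-out so they keep the API's order.

```kotlin
// Hypothetical stdlib-only model of HttpPageItemReader's read loop.
// fetchPage(page, size) stands in for the HTTP GET plus `content` extraction in readRow().
class PageBufferedReader<T : Any>(
    private val size: Int,
    private var page: Int = 0,
    private val fetchPage: (page: Int, size: Int) -> List<T>,
) {
    private val buffer = ArrayDeque<T>()

    // Number of pages requested so far (useful for assertions in tests)
    var requestCount = 0
        private set

    // Mirrors doRead(): returns one item per call, or null when there is no more data.
    fun read(): T? {
        if (buffer.isEmpty()) {
            val content = fetchPage(page, size)
            requestCount++
            page++                          // advance only after a fetch has returned
            buffer.addAll(content)
        }
        return buffer.removeFirstOrNull()   // FIFO keeps the API's ordering; null on an empty page
    }
}
```

In a test, `fetchPage` can be a lambda over an in-memory list, for example `PageBufferedReader<Int>(size = 10) { page, size -> rows.drop(page * size).take(size) }`; in production it would wrap the Fuel request and the `content` extraction shown above.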

Batch Code

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.item.ItemWriter
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class HttpPageReaderJobConfiguration(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {
    ...
    @Bean
    @StepScope
    fun httpPageReaderReader() = HttpPageItemReader(
        url = "http://localhost:8080/api/members", // API URL
        size = 10, // content size per response; usually the same as the chunk size
        page = 0, // start page; usually 0
        parameters = listOf(
            "age" to 10,
            "email" to "1232@asd.com"
        ), // extra query parameters besides page and size
        contentClass = BookReservation::class // class each content item is deserialized into
    )

    @Bean
    @StepScope
    fun httpPageReaderWriter(): ItemWriter<BookReservation> = ItemWriter { contents ->
        // Spring Batch 4.x: contents is the list of items in the current chunk
        for (content in contents) {
            println(content)
        }
    }
}

data class BookReservation(
    val bookId: Long,
    val bookStatus: String,
    val userId: Long
)

This is a simple application that calls a local API and logs the results.

You can confirm that all of the data, items 1 to 20, is retrieved and logged.

References