오늘도 더 나은 코드를 작성하였습니까?

flow 중간 연산자(Take, Buffer, conflate) 본문

Coroutine/coroutineFlow

flow 중간 연산자(Take, Buffer, conflate)

hik14 2021. 6. 4. 17:46

Take

- flow에서 특정 개수만 받을경우 사용한다.

suspend fun main() {
    val alphabetFlow = ('a'..'z').asFlow().take(4)

    alphabetFlow.collect { alphabet ->
        println(alphabet)
    }
}

 

TakeWhile

 

- flow의 첫번째 보내는 값이 조건을 만족하면 내보낸다. 

즉, 조건에 맞는 첫번쨰 값을 내보내는것이 아니라,  첫번째 값이고 조건에 맞아야 한다. 

suspend fun main() {

    val intFlow = (1..10).asFlow().takeWhile{ num ->
        num.equals(1)
    }

    intFlow.collect { int ->
        println(int)
    }
}

 

Buffer( 생산자 소비자  한쪽/양쪽이 처리가 늦을 때)

flow는 코루틴에서 실행될때, 순차적으로 값을 내보내고 수집조차 순차적으로 실행된다.

하지만,  특정 상황에 따라 순차적 실행이 비효율적인 경우가 있다. 

 

이럴때는 중간에 버퍼를 생성하여 이전 플로우의 내보내는 값을 담은 후 최종연산을 실행할수도 있다. 

 

즉, 아래 코드와 같은 경우

 

첫번째 버퍼를 사용하지 않은 경우, 글자 1개를 내보내고 수집하는데 300+200 = 500을 6글자 반복하면 대략 3초가 걸린다.

 

두번째 버퍼를 사용하는 경우, 글자 1개를 내보는데 300이고 수집하면 500이지만 출력 200을 기다리는동안 생산자가 생산을 진행한다.(버퍼가 있기 때문이다) 글자 2번째를 내보낼때는 출력 200을 기다리는 동한 생산자가 생산을 진행해서 100만에 내보내고 출력 200해서 총 300소요된다. 그 다음도 동일하다.

 

그럼 총  500+(300*5)  2초가 걸린다. 

suspend fun main() {

    val nameFlow = flow {
        "kotlin".forEach {
            delay(300)
            emit(it)
        }
    }

    val result1 = measureTimeMillis {
        nameFlow.collect { char ->
            delay(200)
            println(char)
        }
    }

    println("not Buffer: $result1")

    val result2 = measureTimeMillis {
        nameFlow.buffer().collect { char ->
            delay(200)
            println(char)
        }
    }

    println("Buffer: $result2")
}

Conflate(생산자의 속도가 빠르고 소비자의 소비가 느릴때,  그리고  최신값이 중요할때)

buffer는 내보내는 값을 전부 기다려서 오래걸리는 작업을 1번에 처리함으로 이점을 가지고 있다.

하지만 모든 값이 필요 없거나  최신 값/마지막 값이 필요하다면,  보내는 데이터중 중간값에 대한 처리를 하지 않아도 된다.

 

그렇다면, 일부 중간값에 대한 처리를 생략하고 내보내진 값중 최신값만 처리한다. 

생산자 중간연산  소비자
value1  value1 value1 을받아서 처리시작 (오래걸림)
value2 value2 value1 처리중
value3 value2, value3, value1 처리중
value4 value2, value3, value4 value4 을받아서 처리시작 (오래걸림)

 

suspend fun main() {

    val intFlow = flow {
        (1..10).forEach {
            delay(100)
            emit(it)
        }
    }

    val result1 = measureTimeMillis {
        intFlow.collect { char ->
            delay(300)
            println(char)
        }
    }

    println("normal: $result1")

    val result2 = measureTimeMillis {
        intFlow.buffer().collect { int ->
            delay(300)
            println(int)
        }
    }

    println("buffer: $result2")

    val result3 = measureTimeMillis {
        intFlow.conflate().collect { int ->
            delay(300)
            println(int)
        }
    }

    println("conflate: $result3")
}

collectLatest(생산자의 속도가 빠르고 소비자의 소비가 느릴때,  그리고  마지막 중요할때)

- 느린 collector가 데이터를 처리하기 전에 새로운 값을 전달 받으면, 이전 collector의 처리를  취소하고 새로 전달받은 값을 처리한다.
- 즉 처리중에 새로운 값이 emit이 되는것보다, 데이터 1개의 처리속도가 느리다면 결국 마지막값만 정상 처리가 된다.

suspend fun main() {

    val intFlow = flow {
        (1..10).forEach {
            delay(100)
            emit(it)
        }
    }

    val result1 = measureTimeMillis {
        intFlow.collect { char ->
            delay(300)
            println(char)
        }
    }

    println("normal: $result1")

    val result2 = measureTimeMillis {
        intFlow.buffer().collect { int ->
            delay(300)
            println(int)
        }
    }

    println("buffer: $result2")

    val result3 = measureTimeMillis {
        intFlow.conflate().collect { int ->
            delay(300)
            println(int)
        }
    }

    println("conflate: $result3")

    val result4 = measureTimeMillis {
        intFlow.collectLatest { int ->
            delay(300)
            println(int)
        }
    }

    println("collectLatest: $result4")
}