Coroutine/coroutineFlow

flow 중간 연산자 (Zip, Combine)

hik14 2021. 6. 4. 16:03

combine

- 2개 이상의 flow를  통합하여 새로운 flow<newType>를 생성할 수도 있다. 

- 모든 플로우의 emit이 완료될때까지 작동되어진다. 

- 작동 과정에서 최신기준의 데이터를 내보낸다.

 

결합되는  Flow 모두  최소 하나의 데이터를 방출한 경우에 첫 번째 결합 데이터가 방출되고,

이후로는 둘중 어느 하나  Flow 라도 데이터를 방출하면 모든 Flow의 최신 데이터를 결합하여 방출해준다.

suspend fun main() {
    
    val charFlow = ('a'..'c').asFlow()
    val intFlow = flow<Int> {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }
    
    val stringFlow = charFlow.combine(intFlow) { int, char ->
        val result = "$int,$char"
        delay(1000)
        result
    }

    stringFlow.collect { string ->
        println(string)
    }
}

- 아래 코드를 보면, charFlow는 a, b, c가 먼저 내보내 지고 IntFlow는 1초의 딜레이를 줘서 천천히 값을 내보면,  charFlow 최신 emit 값인 'c' 와 이후 1초 단위로 내보내는 숫자의 최신 값들이 출력된다. 

 

- flow는 결합하는데 있어서 결합되는 flow의 개수가 무제한이다.

suspend fun main() {

    val charFlow = ('a'..'c').asFlow()

    val intFlow = flow<Int> {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }

    val kotlinFlow = flow<String>{
        val str = "kotlin".forEach {
            delay(1500)
            emit(it.toString())
        }
    }

    val stringFlow = combine(charFlow, intFlow, kotlinFlow) { int, char, string ->
        val str = "int: $int, char: $char, string: $string"
        str
    }

    stringFlow.collect { string ->
        println(string)
    }
}

   val stringFlow = combine((1..3).asFlow(), (4..6).asFlow(), (7..9).asFlow(), (10..12).asFlow(), (13..15).asFlow())
    { i, i2, i3, i4, i5 ->  
		// 인자가 5개 이하인 경우 개별적인 값이 들어온다. 
    }

    val stringFlow = combine((1..3).asFlow(), (4..6).asFlow(), (7..9).asFlow(), (10..12).asFlow(), (13..15).asFlow(), (16..18).asFlow())
    { arr ->
        // 인자가 6개 이상인 경우 Array형태로 값이 들어온다. 		
    }

combineTransform

- 여러개의 flow를 합치고 생성된 플로우의 새로운 타입을 또다른 타입으로 변환하여 내보낸다. 

 

*combineTransForm 은 combine + transFrom 함수를 동시에 적용한것과 같다.

suspend fun main() {

    val nameFlow = flow {
        ('a'..'d').forEach {
            delay(500)
            emit(it)
        }
    }

    val numFlow = flow {
        (1..10).forEach {
            delay(1000)
            emit(it)
        }
    }

    val newFlow =
        combine(nameFlow, numFlow) { name, num ->
            val str = "$name$num"
            println(str)
            str
        }.transform<String, Person> { str ->
            val person = Person(str.first().toString(), str.last().toString())
            emit(person)
        }

    val result = newFlow.collect { person ->
        println(person.toString())
    }
}
data class Person(val name: String, val id: String)


suspend fun main() {

    val nameFlow = flow {
        ('a'..'d').forEach {
            delay(500)
            emit(it)
        }
    }

    val numFlow = flow {
        (1..10).forEach {
            delay(1000)
            emit(it)
        }
    }

    val newFlow = nameFlow.combineTransform<Char, Int, Person>(numFlow) { name, num ->
        println("$name$num")
        emit(Person(name.toString(), num.toString()))
    }

    newFlow.collect { person ->
        println(person.toString())
    }
}

combinelatest

- flow 결합후 최신값만 내보내는 것 이지만 deprecated

 

zip

- 결합된 모든 플로우의 emit이 완료될때까지 작동되어진다. 

- 모든 Flow가 하나씩 방출이 완료되어서 한쌍이 나오는 시점에서 작동한다.

- Flow 중에 하나라도 방출과 사용이 완료가 되면 나머지 Flow의 emit을 기다리지 않고 종료된다. 

( 모든 데이터를 방출하려면 모든 flow의 개수가 같아야 한다. )

suspend fun main() {

    val nameFlow = flow {
        ('a'..'d').forEach {
            delay(500)
            emit(it)
        }
    }

    val numFlow = flow {
        (1..10).forEach {
            delay(1000)
            emit(it)
        }
    }

    val newFlow = nameFlow.zip(numFlow) { name, num ->
        val str = "$name$num"
        str
    }

    newFlow.collect { str ->
        println(str)
    }
}

public fun <T1, T2, R> kotlinx.coroutines.flow.Flow<T1>.zip(other: kotlinx.coroutines.flow.Flow<T2>, transform: suspend (T1, T2) -> R): kotlinx.coroutines.flow.Flow<R> { /* compiled code */ }

ZIP 또한 마지막 람다 인자로 TransForm함수를 넣어줄수 있다. 

suspend fun main() {

    val nameFlow = flow {
        ('a'..'d').forEach {
            delay(500)
            emit(it)
        }
    }

    val numFlow = flow {
        (1..10).forEach {
            delay(1000)
            emit(it)
        }
    }

    val newFlow = nameFlow.zip<Char, Int, Person>(numFlow) { name, num ->
        val person = Person(name.toString(), num.toString())
        person
    }

    newFlow.collect { person ->
        println(person)
    }
}