오늘도 더 나은 코드를 작성하였습니까?

Kotlin flow 기초 본문

Coroutine/coroutineFlow

Kotlin flow 기초

hik14 2021. 5. 13. 14:43

개요

coroutine에서 flow은 단일 값만 반환하는 suspend 함수와 달리 여러 값을 순차적으로 내보낼 수 있는 유형이다.

coroutine flow는 순차적으로 값을 내보내고 정상적으로 또는 예외(Error)로 완료되는 비동기적인 데이터 스트림입니다.

내보낸 값은 동일한 유형이어야 합니다. 예를 들어 Flow<Int>는 정수 값을 내보내는 flow입니다.

 

flow는 코 루틴을 기반으로 빌드되며 여러 값을 제공할 수 있다. flow는 비동기식으로 계산할 수 있는 데이터 스트림의 개념

flow는 value sequence를 생성하는 Iterator와 매우 비슷하지만 suspend 함수를 사용하여 값을 비동기적으로 생성하고 사용

 

사용예시

- flow을 사용하여 데이터베이스에서 실시간 업데이트를 수신할 수 있습니다

- flow은 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들 수 있습니다.

 

data Stream 항목

 

Producer(생산자)

- stream에 담기는 데이터를 생산한다.  coroutin을 이용하여 비동기적으로 데이터를 생산 가능하다. 

- dataSource( Network API(retrofit), App 내장 DB(Room))

 

Intermediary(중개자 선택) - stream에 추가되는 데이터에 연산을 할 수 있고 stream자체를 수정 가능하다.

 

Consummer(소비자)

- stream의 값을 사용한다.

- View

 

하지만, View에서 사용자 입력을 받아서 데이터의 생산자가 될 수 있으며, 다른 계층구조의 레이어가 사용할 수 도있다. 

 

 

Flow 생성하기

flow을 만들려면 flowBuilder API를 사용합니다. 

flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있는 새로운 flow을 만듭니다

 

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // flow의 요청결과를 내보낸다. 
            delay(refreshIntervalMs) // 일정시간동안 코루틴을 정지시킨다. 
        }
    }
}

// 네트워크 요청을 함수(방법)을 정의한 인터페이스
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

 

flow Builder 는  coroutin 내에서 실행됩니다. 따라서 동일한 비동기 API의 이점을 활용할 수 있지만 몇 가지 제한사항이 적용됩니다.

 

flow은 순차적입니다. 생산자가 코루틴에 있으므로, 

suspend 함수를 호출하면 생산자는 suspend 함수가 반환될 때까지 정지 상태로 유지됩니다.

위에선 newsApi.fetchLateNews()에서 데이터를 가져올때 까지 아래 emit. delay 는 호출되지 않는다 

 

제한사항

생산자(NewsRemoteDataSource)는 fetchLatestNews 네트워크 요청이 완료될 때까지 정지됩니다.

그런 다음에만 결과를 Stream으로 내보낸다.

 

flow Builder 에서는 생산자(NewsRemoteDataSource)가 데이터 값을 다른 CoroutineContext의 값으로 내보낼 수 없다. 그러므로 새 코루틴을 만들거나 코드의 withContext 블록을 사용하여 다른 CoroutineContext에서 emit를 호출하지 마라. 이런 경우 callbackFlow 같은  flow Builder를 사용할 수 있습니다.

Stream 변경하기(중간 연산자 )

데이터 소비하지 않고 stream에 데이터를 변경하는 중재 연산자를 사용할 수 있다. 

연산자들은 함수로써,  연산자가 stream 데이터에 적용될때,  나중에 값이 소비 될 때까지 실행되지 않는 연산체인을 생성.

 

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * flow 에서 변환이 적용된다. 
     * 이런 연산은 지연되어 flow 내부에서 작동하지 않고 
     * 현시점에서 flow에 의해 내보내질 값을 변경한다. 
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // filter 연산자는 최신뉴스에서 유저의 선호하는 데이터를 선별을 적용한다. 작동하진않음. 
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // save는 최신뉴스를 캐쉬에 저장한다 
            .onEach { news -> saveInCache(news) }
}

중간 연산자는 차례적으로 연속하여 적용할수 있으며, flow에서 데이터를 내보낼때, 지연연산될 연산체인을 만든다. 

중간 연산자를 stream에 적용하는 것만으로는 flow collecting이 시작되지 않습니다.

 

Collecting from a flow

터미널 연산자를 사용하여 flow을 트리거하여 값 수신 시작한다. 내보내지는 모든 값을 얻기 위해, collect를 사용한다. 

collect는 suspend 함수이기에, 코루틴 내부에서 실행되어야 하며, 모든 데이터에 호출되는 람다함수를 매개변수로 받는다. 

suspend 함수이므로 collect를 호출하는 코루틴은 flow가 닫힐 때까지 일시 중단 될 수 있습니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // collect를 사용하여 flow를 트리거 시키고 소비한다. 
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // 최신 뉴스를 이용하여 view 업데이트
            }
        }
    }
}

 

flow를 collecting하게되면,  최신 뉴스를 새로 고치고 일정 간격으로 네트워크 요청 결과를 내보내는 생산자(NewsRemoteDataSource)가 트리거 된다.

 

생산자가 while (true) 루프로 항상 활성 상태를 유지하므로 ViewModel이 지워지고 viewModelScope가 취소되면 데이터 스트림이 종료가된다..

 

Collecting from a flow 이 중지 될수 있는 이유.

 

- collecting하는 코루틴은 viewModelScope 같이 취소되고  결국 생산자를 중지합니다.

- 생산자가 데이터 내보내기 완료후,

이 경우 데이터 스트림이 닫히고 collect를 호출 한 코루틴(viewModel.launch{   })이 실행을 재개합니다.

 

flow가 다른 중간 연산자를 지정하지 않는 한 flow늦 cold and lazy됩니다.

즉, flow에서 터미널 연산자(collect) 호출 된다면,  생산자 코드가 실행됩니다.

 

만약 여러 flow collector가 있다면,  NewsRemoteDataSource가 서로 다른 고정 간격으로 최신 뉴스를 여러 번 가져옵니다. 여러 소비자가 동시에 수집 할 때 흐름을 최적화하고 공유하려면 shareIn 연산자를 사용합니다.