Coroutine/coroutineFlow

stateFlow sharedFlow

hik14 2021. 5. 27. 17:34

StateFlow와 SharedFlow는 Flow 에서 최적으로 데이터 state 업데이트를 내보내고 여러 소비자에게 값을 내보낼 수 있는 Flow API입니다.

 

statsflow

- Observable flow

- state를 지니고 있음.

- 최신 및 새로운 상태를 여러  collectors들에게 업데이트 시켜줌.

- value property를 통해 값을 받을 수 있다.

- state를 업데이트하고 flow로 보내려면 MutableStateFlow 클래스의 value 속성에 새 값을 할당한다.

- flow를 StateFlow로 변환하려면 stateIn 중간 연산자를 사용

  private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
  val uiState: StateFlow<LatestNewsUiState> = _uiState
  
  _uiState.value =  LatestNewsUiState.Success(favoriteNews)

- 관찰이 필요한 가변상태를 유지해야하는 클래스에 적합하다. 

 

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // 최신뉴스를 통해 view 업데이트
                // value에 값을 할당 하면 flow에 새로운 요소가 추가되고 
                // 이 flow를 수집하는 모든곳이 업데이트 된다.  
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// 최신뉴스 화면에 서로 다른 상태 값
sealed class LatestNewsUiState {
    data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(exception: Throwable): LatestNewsUiState()
}

생산자가 MutableStateFlow를  업데이트 하면,  소비자는 stateFlow를 수집하는 모든 클래스이다.

기존의 ColdFlow( flow builder를 사용하는) 와 다르게, stateFlow는 HotFlow이다. 

(hotFlow : flow를 수집하는 쪽이 생산자 코드를 실행하지 않는다.)

stateFlow는 항상 활성화 상태이면 메모리에 있다. 가비지 루트에서 부터 참조하는 것이 없을 경우만 가비지 콜렉터의 대상이 된다.

 

새로운 소비자가 stateflow에서 수집을 시작하면 스트림의 마지막 상태와 후속 상태가 수신됩니다.

 

View는 다른 flow와 마찬가지로 stateFlow를 listening 하고 있음.

 

class LatestNewsActivity : AppCompatActivity() {

    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
   		// 이 코루틴은 lifecycle이 Started 일때 시작하고 Stoped 상태에서 중지된다 
        lifecycleScope.launchWhenStarted {
            // flow를 시작시키고 값을 listening 시작함
            latestNewsViewModel.uiState.collect { uiState ->
				새로운 값을 처리 한다. 
                when (uiState) {
                    is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                    is LatestNewsUiState.Error -> showError(uiState.exception)
                }
            }
        }
    }
}

 

  *  Lifecycle Kotlin 확장 프로그램의 launchWhen() 함수를 사용하여 UI 계층에서 flow을 collecting하는 것은 항상 안전한 것은 아니다. 뷰가 백그라운드로 이동하면 코 루틴이 일시 중지되어 생산자는 활성화되고 뷰가 소비하지 않는 값을 잠재적으로 방출합니다.

이 동작은 CPU 및 메모리 리소스를 낭비 할 수 있습니다.

 

StateFlow는 ViewModel로 범위가 지정되어 있으므로 launchWhen () 함수를 사용하여 안전하게 collecting 할 수 있으며, View가 백그라운드로 이동할 때 메모리에 남아있게하고, UI 상태에 대해 View에 알리기 만하면 간단한 작업을 수행합니다. 

그러나 문제는 더 집중적 이고 무거운 일을하는 다른 생산자 때문에 생길 수 있다.

 

StateFlow, Flow, and LiveData

StateFlow와 LiveData에는 유사점이 있습니다. 둘 다 Oservable data holder 클래스이며 앱 아키텍처에서 사용할 때 유사한 패턴을 따릅니다.

 

StateFlow와 LiveData는 다른점.

 

- StateFlow를 생성할 때 생성자에 전달되는 초기 state가 필요하지만 LiveData는 그렇지 않습니다

- LiveData.observe ()는 View가 stopped 상태가 될 때 자동으로 소비자를 등록 해제하지만

  StateFlow 또는 flow는 collecting하는것은 그렇지 않다. (UI 계층에서 collection을 사용하면 안된다. )

 - hotFlow 구현에서는 UI가 화면에 표시되지 않을 때(백그라운드에 있을때) collecting 할 경우 주의해야합니다. 이렇게하면 리소스가 낭비 될 수 있습니다.  아래 예와 같이 흐름 수집을 수동으로 중지 할 수 있습니다.

 

class LatestNewsActivity : AppCompatActivity() {
    ...
    // UI states를 위한 참조
    private var uiStateJob: Job? = null

    override fun onStart() {
        super.onStart()
        // view 가 가시상태면 수집 시작한다. 
        uiStateJob = lifecycleScope.launch {
            latestNewsViewModel.uiState.collect { uiState -> ... }
        }
    }

    override fun onStop() {
        // view 가 보이지 않으면 collecting을 멈춘다.
        uiStateJob?.cancel()
        super.onStop()
    }
}

view가 보이지 않을 때  uiState 변경 수신을 중지하는 또 다른 방법은 lifecycle-livedata-ktx 라이브러리의 asLiveData() 함수를 사용하여 흐름을 LiveData로 변환하여 이용하는것 이다. 

class LatestNewsActivity : AppCompatActivity() {
    ...
    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        latestNewsViewModel.uiState.asLiveData().observe(owner = this) { state ->
            // Handle UI state
        }
    }
}

ShareIn 을 사용하여 coldFlow를 hotFlow로 변경하기

StateFlow는 hotFlow이다.

flow가 collecting 되거나 가비지 콜렉션 루트에서 이에 대한 다른 참조가 존재하는 동안 메모리에 남아 있다.

shareIn 연산자를 사용하여 coldFlow를 hotFlow로 변경한다.

 

Kotlin flow에서 만든 callbackFlow를 사용하면,

각 collector가 새로운 flow 을 만드는 대신, shareIn을 사용하여 collector들이  Firestore에서 검색 한 데이터를 공유 할 수 있습니다.

 

1. externalScope는 flow 공유를 위해 사용됩니다.   

(Scope는 shared flow을 필요한 만큼 오래 유지하기 위해 모든 소비자보다 오래 유지되어야합니다.)

 

2. 각 새로운 collector에게 답변할 item의 수.

 

3. 시작할 정책. 

 

class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
}

latestNews flow는 마지막으로 내 보낸 항목을 새 수집기에 replay(다시 보낸다)합니다. externalScope가 활성 상태이고 활성 collector 가있는 한 latestNews flow 도 활성 상태로 유지됩니다.

 

SharingStarted.WhileSubscribed () 시작 정책은 활성 구독자가있는 동안 업스트림 생산자를 활성 상태로 유지합니다.

SharingStarted.Eagerly는 생산자를 즉시 ​​시작한다.

SharingStarted.Lazily는 첫 번째 구독자가 나타난 후 공유를 시작한다

 

flow을 영원히 활성 상태로 유지하는 것과 같은 다른 시작 정책을 사용할 수 있습니다.

SharedFlow

shareIn 함수는 collecting 하는 모든 소비자에게 값을 내보내는 hotflow인 SharedFlow를 반환합니다.

SharedFlow는 매우 잘  구성된 StateFlow의 일반화 버젼의 flow이다. 

 

shareIn을 사용하지 않고 SharedFlow를 만들 수 있습니다. 예를 들어 SharedFlow를 사용하여 앱의 나머지 부분에 ticks 전송하여 모든 콘텐츠가 동시에 주기적으로 새로 고쳐 지도록 할 수 있습니다. 최신 뉴스를 가져 오는 것 외에도 즐겨 찾는 주제 컬렉션으로 사용자 정보 섹션을 새로 고칠 수도 있습니다.

 

TickHandler -  SharedFlow를 노출하여 다른 클래스가 콘텐츠를 새로 고칠시기를 알 수 있도록합니다.

 

_tickFolw.value = ....

 

tickHanlder.tickFlow.collect{ .. }

// 앱 콘텐츠를 새로 고쳐야 할 때 중앙 집중화하는 클래스
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Event<String>> = _tickFlow

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}


class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // tick updates 리스닝. 
            tickHandler.tickFlow.collect {
                refreshLatestNews() // 새로고침. 
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

 SharedFlow 동작을 사용자 커스텀.

 

replay - 새 구독자에 대해 이전에 내 보낸 여러 값을 다시 보낼 수 있다.

onBufferOverflow -  버퍼에 전송할 항목이 가득 차있을 때에 대한 정책을 지정

(SUSPEND - 일시 중지. DROP_LATEST 최근꺼 버리기 DROP_OLDEST 오래된거 버리기)

 

MutableSharedFlow에는 active collector의 수를 알려주는 subscriptionCount 속성도 있으므로 그에 따라 비즈니스 논리를 최적화 할 수 있습니다.

 

MutableSharedFlow에는 flow에 전송 된 최신 정보를 재생하지 않으려는 경우 resetReplayCache 함수도 포함되어 있습니다.