StateFlow의 재시도 처리 구현하기
흔히 사용하는 StateFlow의 재시도 처리를 구현해봅니다.
우리는 컴포즈나 XML 환경에서 일반적으로 화면을 구현할 때 화면의 상태를 StateFlow
로 표현하곤 합니다. UI에 대한 상태를 sealed Interface나 data class를 통해 구현한 뒤, 필요한 데이터를 끌어와서 StateFlow
로 변환하고 ViewModel에 이를 보관하여 사용하는 경우가 많은데요. 아래와 같은 몇 가지 일반적인 패턴을 자주 보셨을 것 같습니다.
MutableStateFlow
와StateFlow
를 사용하고init
블록이나LaunchedEffect
에서 초기화
class XXXViewModel(...): ViewModel {
private val _uiState = MutableStateFlow<XXXState>(XXXState.XXX)
val uiState: StateFlow<XXXState> = _uiState.asStateFlow()
// 방법 1. init 블록에서 초기화
init {
fetchData()
}
// 데이터 소스에서 데이터를 끌어와서 uiState로 매핑
fun fetchData() {
viewModelScope.launch {
val result = XXXRepository.fetch()
...
_uiState.value = state
}
}
}
// 방법 2. Composable의 LaunchedEffect를 통해 초기화
2. stateIn
함수를 사용하고 업스트림이 활성화되면 상태 초기화
class XXXViewModel(...): ViewModel {
val uiState: StateFlow<XXXState> = flow { emit(XXXRepository.fetch()) }
.map { ... }
.stateIn(
scope = viewModelScope,
started = SharingStarted.XXX,
initialValue = XXXState.XXX
)
}
Viewmodel 객체를 생성하는 동안 부작용이 발생할 수 있으며 LaunchedEffect는 화면의 각 초기 컴포지션과 함께 다시 트리거 될 위험이 존재한다.
Google의 Android Toolkit 팀의 Ian Lake는 첫 번째 패턴의 두가지 방식은 위와 같은 이유로 모두 안티 패턴이라고 언급하였고, Flow를 stateIn
이나 sharedIn
과 결합하고 collectAsStateWithLifecycle
API를 통해 flow를 구독할 것을 권장하고 있습니다. (자세한 설명은 엄재웅님의 미디엄 게시물을 참고하세요.)
재시도 처리
우리는 화면에서 표현되는 데이터를 다시 끌어오고 상태를 업데이트하는 재시도 로직을 구현해야 하는 상황을 빈번하게 마주할 수 있습니다. 그렇다면 stateIn
을 통해 상태를 구현했을 때는 어떻게 이를 구현할 수 있을까요?
사실 이에 대한 해법은 SharingStarted
의 WhileSubscribed
API에서 찾을 수 있습니다.
SharingStarted
의 WhileSubscribed
함수로 SharingStarted
를 지정했을 경우 첫 번째 구독자가 나타나면 업스트림의 활성화가 시작되고 활성화된 구독자가 없어지면 (화면이 백그라운드로 내려가는 경우) 중지되며 다시 구독자가 나타날 경우 업스트림의 활성화가 재개됩니다. 결과적으로 UI에 실제로 필요할 때만 데이터가 로드되도록 하는 것입니다. 어떻게 이런 동작이 가능한지 먼저 살펴보겠습니다.
public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> {
val config = configureSharing(1)
val state = MutableStateFlow(initialValue)
val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)
return ReadonlyStateFlow(state, job)
}
stateIn
의 내부 구현을 살펴보면 job
을 하나 만드는 것을 볼 수 있습니다.
private fun <T> CoroutineScope.launchSharing(
...
shared: MutableSharedFlow<T>,
upstream: Flow<T>,
...
): Job {
...
return launch(context, start = start) {
when {
...
else -> {
// other & custom strategies
started.command(shared.subscriptionCount)
.distinctUntilChanged()
.collectLatest {
when (it) {
SharingCommand.START -> upstream.collect(shared) // 재개
SharingCommand.STOP -> { }
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
if (initialValue === NO_VALUE) {
shared.resetReplayCache() // SharedFlow
} else {
shared.tryEmit(initialValue) // StateFlow
}
}
}
}
}
}
}
}
해당 job의 내부 구현 중 shared 파라미터를 WhileSubscribed()
함수로 지정했을 때 생성되는 StartedWhileSubscribed
에 관련된 코드만 중점적으로 살펴 보겠습니다.
SharingStarted
의 command
함수가 실행되고 리턴되는 SharingCommand
값에 따라 업스트림을 실행하거나 정지하거나, 리셋시키는 것을 볼 수 있습니다. STOP_AND_RESET_REPLAY_CACHE
에서 initialValue
가 존재한다면 (StateFlow의 경우) initialValue
로 다시 리셋 시키는 것도 확인할 수 있네요.
WhileSubscribed
함수를 통해 생성되는 StartedWhileSubscribed
의 command
함수에선 어떤 방식으로 값을 방출하고 있을까요?
StartedWhileSubscribed
private class StartedWhileSubscribed(
// 구독자가 사라질 때 지연값 (기본값은 0)
private val stopTimeout: Long,
// 구독자가 사라진 뒤 캐시 재설정 시 지연값
// (기본값은 Duration.INFINITE이며 리플레이 캐시를 영원히 유지하고 버퍼를 재설정하지 않음)
private val replayExpiration: Long
) : SharingStarted {
...
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
subscriptionCount.transformLatest { count ->
if (count > 0) {
emit(SharingCommand.START)
} else {
delay(stopTimeout)
if (replayExpiration > 0) {
emit(SharingCommand.STOP)
delay(replayExpiration)
}
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
}
}
.dropWhile { it != SharingCommand.START }
.distinctUntilChanged()
...
}
파라미터로 전달되는 구독자 수를 나타내는 flow를 다음과 같은 방식으로 SharingCommand
로 변환하여 내보내고 있습니다.
- 구독자 수가 1명 이상이면
START
값을 방출. 그렇지 않으면 (구독자가 없는 경우)delay
를 걸고STOP
을 방출하거나STOP_AND_RESET_REPLAY_CACHE
을 방출 (기본적으로replayExpiration
을 지정하지 않을 경우STOP
을 방출하고 무한 대기) dropWhile
을 통해 최초의 방출 값이START
가 아닐 경우 drop 시킴- 중복 제거 처리
이제 StartedWhileSubscribed
의 구현을 참고하여 재시도 처리를 시도할 수 있도록 SharingStarted
를 구현해볼 수 있습니다. (원문 : https://medium.com/@der.x/restartable-stateflows-in-compose-46316ce670a9)
sealed interface StartedRestartableCommandProvider {
fun commands(): List<SharingCommand>
object StopAndReset : StartedRestartableCommandProvider {
override fun commands() = listOf(
SharingCommand.STOP_AND_RESET_REPLAY_CACHE,
SharingCommand.START
)
}
object Stop : StartedRestartableCommandProvider {
override fun commands() = listOf(
SharingCommand.STOP,
SharingCommand.START
)
}
}
첫 번째로 재시도 전략을 선택하여 처리할 수 있도록 StartedRestartableCommandProvider
를 작성해주었습니다. STOP_AND_RESET_REPLAY_CACHE
을 먼저 제공한다면 위에서 살펴본 launchSharing
함수의 구현에 따라 재시도 시 우리가 지정한 initialValue
가 표시되었다가 업스트림이 활성화될 것이고, STOP
을 먼저 제공한다면 initialValue
가 표시되지 않고 업스트림이 다시 활성화될 것입니다.
interface StartedRestartable: SharingStarted {
fun restart()
}
// 구현체
private class StartedRestartableImpl(
private val sharingStarted: SharingStarted,
private val provider: StartedRestartableCommandProvider
) : StartedRestartable {
private val flow = MutableSharedFlow<SharingCommand>(extraBufferCapacity = 2)
override fun restart() {
provider.commands().forEach {
flow.tryEmit(it)
}
}
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
return merge(flow, sharingStarted.command(subscriptionCount))
}
}
SharingStarted
인터페이스가 열려 있기 때문에 이를 상속받은 StartedRestartable
인터페이스를 작성하고 구현체를 구성합니다. 다른 SharingStarted
와 함께 사용할 수 있도록 merge
함수를 통해 command
함수를 재구현합니다.
interface RestartableStateFlow<out T> : StateFlow<T> {
fun restart()
}
// 구현체
private class RestartableStateFlowImpl<T>(
flow: StateFlow<T>,
private val started: StartedRestartable,
) : RestartableStateFlow<T>, StateFlow<T> by flow {
override fun restart() {
started.restart()
}
}
다음으로 재시도 기능이 존재하는 StateFlow
가 필요하기 때문에 restart
행위가 존재하는 Flow
와 구현체를 작성해줍니다.
fun <T> Flow<T>.restartableStateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T,
restartableCommandProvider: StartedRestartableCommandProvider = StopAndReset
): RestartableStateFlow<T> {
val sharingStarted: StartedRestartable = StartedRestartableImpl(
started,
restartableCommandProvider
)
val state = stateIn(scope, sharingStarted, initialValue)
return RestartableStateFlowImpl(state, sharingStarted)
}
마지막으로 확장 함수를 구현하여 쉽게 사용할 수 있도록 노출합니다. (기본 재시도 전략은 StopAndReset
)
class XXXViewModel(...): ViewModel {
val uiState: StateFlow<XXXState> = flow { emit(XXXRepository.fetch()) }
.map { ... }
.restartableStateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = XXXState.XXX
)
}
// using
@Composable
fun XXX() {
...
onClickRetry = uiState::restart
...
}
이제 사용하는 곳에서 간단하게 다른 SharingStarted
와 결합하여 사용할 수 있습니다. 😀
마치며
평소 stateIn
으로 상태를 StateFlow
로 표현하는 코드를 자주 작성하고 있었고, 어떻게 재시도 로직을 구현해야할지 생각해보며 여러 가지 방식으로 구현해보고 있었는데 위 방식이 하나의 해결책이 될 수 있을 것 같습니다.
참조