StateFlow의 재시도 처리 구현하기

hongbeom
hongbeomi dev
Published in
14 min readSep 3, 2024

흔히 사용하는 StateFlow의 재시도 처리를 구현해봅니다.

Photo by Anthony Gomez on Unsplash

우리는 컴포즈나 XML 환경에서 일반적으로 화면을 구현할 때 화면의 상태를 StateFlow로 표현하곤 합니다. UI에 대한 상태를 sealed Interface나 data class를 통해 구현한 뒤, 필요한 데이터를 끌어와서 StateFlow로 변환하고 ViewModel에 이를 보관하여 사용하는 경우가 많은데요. 아래와 같은 몇 가지 일반적인 패턴을 자주 보셨을 것 같습니다.

  • MutableStateFlowStateFlow를 사용하고 init 블록이나 LaunchedEffect 에서 초기화
class XXXViewModel(...): ViewModel {

private val _uiState = MutableStateFlow<XXXState>(XXXState.XXX)
val uiState: StateFlow<XXXState> = _uiState.asStateFlow()

// 방법 1. init 블록에서 초기화
init {
fetchData()
}

// 데이터 소스에서 데이터를 끌어와서 uiState로 매핑
fun fetchData() {
viewModelScope.launch {
val result = XXXRepository.fetch()
...
_uiState.value = state
}
}

}

// 방법 2. Composable의 LaunchedEffect를 통해 초기화

2. stateIn 함수를 사용하고 업스트림이 활성화되면 상태 초기화

class XXXViewModel(...): ViewModel {

val uiState: StateFlow<XXXState> = flow { emit(XXXRepository.fetch()) }
.map { ... }
.stateIn(
scope = viewModelScope,
started = SharingStarted.XXX,
initialValue = XXXState.XXX
)

}

Viewmodel 객체를 생성하는 동안 부작용이 발생할 수 있으며 LaunchedEffect는 화면의 각 초기 컴포지션과 함께 다시 트리거 될 위험이 존재한다.

Google의 Android Toolkit 팀의 Ian Lake는 첫 번째 패턴의 두가지 방식은 위와 같은 이유로 모두 안티 패턴이라고 언급하였고, Flow를 stateIn이나 sharedIn과 결합하고 collectAsStateWithLifecycle API를 통해 flow를 구독할 것을 권장하고 있습니다. (자세한 설명은 엄재웅님의 미디엄 게시물을 참고하세요.)

재시도 처리

우리는 화면에서 표현되는 데이터를 다시 끌어오고 상태를 업데이트하는 재시도 로직을 구현해야 하는 상황을 빈번하게 마주할 수 있습니다. 그렇다면 stateIn을 통해 상태를 구현했을 때는 어떻게 이를 구현할 수 있을까요?

사실 이에 대한 해법은 SharingStartedWhileSubscribed API에서 찾을 수 있습니다.

SharingStartedWhileSubscribed 함수로 SharingStarted를 지정했을 경우 첫 번째 구독자가 나타나면 업스트림의 활성화가 시작되고 활성화된 구독자가 없어지면 (화면이 백그라운드로 내려가는 경우) 중지되며 다시 구독자가 나타날 경우 업스트림의 활성화가 재개됩니다. 결과적으로 UI에 실제로 필요할 때만 데이터가 로드되도록 하는 것입니다. 어떻게 이런 동작이 가능한지 먼저 살펴보겠습니다.

public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> {
val config = configureSharing(1)
val state = MutableStateFlow(initialValue)
val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)
return ReadonlyStateFlow(state, job)
}

stateIn의 내부 구현을 살펴보면 job을 하나 만드는 것을 볼 수 있습니다.

private fun <T> CoroutineScope.launchSharing(
...
shared: MutableSharedFlow<T>,
upstream: Flow<T>,
...
): Job {
...
return launch(context, start = start) {
when {
...
else -> {
// other & custom strategies
started.command(shared.subscriptionCount)
.distinctUntilChanged()
.collectLatest {
when (it) {
SharingCommand.START -> upstream.collect(shared) // 재개
SharingCommand.STOP -> { }
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
if (initialValue === NO_VALUE) {
shared.resetReplayCache() // SharedFlow
} else {
shared.tryEmit(initialValue) // StateFlow
}
}
}
}
}
}
}
}

해당 job의 내부 구현 중 shared 파라미터를 WhileSubscribed() 함수로 지정했을 때 생성되는 StartedWhileSubscribed에 관련된 코드만 중점적으로 살펴 보겠습니다.

SharingStartedcommand 함수가 실행되고 리턴되는 SharingCommand 값에 따라 업스트림을 실행하거나 정지하거나, 리셋시키는 것을 볼 수 있습니다. STOP_AND_RESET_REPLAY_CACHE에서 initialValue가 존재한다면 (StateFlow의 경우) initialValue로 다시 리셋 시키는 것도 확인할 수 있네요.

WhileSubscribed 함수를 통해 생성되는 StartedWhileSubscribedcommand함수에선 어떤 방식으로 값을 방출하고 있을까요?

StartedWhileSubscribed

private class StartedWhileSubscribed(
// 구독자가 사라질 때 지연값 (기본값은 0)
private val stopTimeout: Long,
// 구독자가 사라진 뒤 캐시 재설정 시 지연값
// (기본값은 Duration.INFINITE이며 리플레이 캐시를 영원히 유지하고 버퍼를 재설정하지 않음)
private val replayExpiration: Long
) : SharingStarted {
...
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
subscriptionCount.transformLatest { count ->
if (count > 0) {
emit(SharingCommand.START)
} else {
delay(stopTimeout)
if (replayExpiration > 0) {
emit(SharingCommand.STOP)
delay(replayExpiration)
}
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
}
}
.dropWhile { it != SharingCommand.START }
.distinctUntilChanged()
...
}

파라미터로 전달되는 구독자 수를 나타내는 flow를 다음과 같은 방식으로 SharingCommand로 변환하여 내보내고 있습니다.

  1. 구독자 수가 1명 이상이면 START 값을 방출. 그렇지 않으면 (구독자가 없는 경우) delay를 걸고 STOP을 방출하거나 STOP_AND_RESET_REPLAY_CACHE을 방출 (기본적으로 replayExpiration을 지정하지 않을 경우 STOP을 방출하고 무한 대기)
  2. dropWhile을 통해 최초의 방출 값이 START가 아닐 경우 drop 시킴
  3. 중복 제거 처리

이제 StartedWhileSubscribed의 구현을 참고하여 재시도 처리를 시도할 수 있도록 SharingStarted를 구현해볼 수 있습니다. (원문 : https://medium.com/@der.x/restartable-stateflows-in-compose-46316ce670a9)

sealed interface StartedRestartableCommandProvider {

fun commands(): List<SharingCommand>

object StopAndReset : StartedRestartableCommandProvider {
override fun commands() = listOf(
SharingCommand.STOP_AND_RESET_REPLAY_CACHE,
SharingCommand.START
)
}

object Stop : StartedRestartableCommandProvider {
override fun commands() = listOf(
SharingCommand.STOP,
SharingCommand.START
)
}

}

첫 번째로 재시도 전략을 선택하여 처리할 수 있도록 StartedRestartableCommandProvider를 작성해주었습니다. STOP_AND_RESET_REPLAY_CACHE을 먼저 제공한다면 위에서 살펴본 launchSharing함수의 구현에 따라 재시도 시 우리가 지정한 initialValue가 표시되었다가 업스트림이 활성화될 것이고, STOP 을 먼저 제공한다면 initialValue가 표시되지 않고 업스트림이 다시 활성화될 것입니다.

interface StartedRestartable: SharingStarted {
fun restart()
}

// 구현체
private class StartedRestartableImpl(
private val sharingStarted: SharingStarted,
private val provider: StartedRestartableCommandProvider
) : StartedRestartable {

private val flow = MutableSharedFlow<SharingCommand>(extraBufferCapacity = 2)

override fun restart() {
provider.commands().forEach {
flow.tryEmit(it)
}
}

override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
return merge(flow, sharingStarted.command(subscriptionCount))
}
}

SharingStarted 인터페이스가 열려 있기 때문에 이를 상속받은 StartedRestartable 인터페이스를 작성하고 구현체를 구성합니다. 다른 SharingStarted와 함께 사용할 수 있도록 merge 함수를 통해 command 함수를 재구현합니다.

interface RestartableStateFlow<out T> : StateFlow<T> {
fun restart()
}

// 구현체
private class RestartableStateFlowImpl<T>(
flow: StateFlow<T>,
private val started: StartedRestartable,
) : RestartableStateFlow<T>, StateFlow<T> by flow {

override fun restart() {
started.restart()
}
}

다음으로 재시도 기능이 존재하는 StateFlow가 필요하기 때문에 restart 행위가 존재하는 Flow 와 구현체를 작성해줍니다.

fun <T> Flow<T>.restartableStateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T,
restartableCommandProvider: StartedRestartableCommandProvider = StopAndReset
): RestartableStateFlow<T> {
val sharingStarted: StartedRestartable = StartedRestartableImpl(
started,
restartableCommandProvider
)
val state = stateIn(scope, sharingStarted, initialValue)

return RestartableStateFlowImpl(state, sharingStarted)
}

마지막으로 확장 함수를 구현하여 쉽게 사용할 수 있도록 노출합니다. (기본 재시도 전략은 StopAndReset)

class XXXViewModel(...): ViewModel {

val uiState: StateFlow<XXXState> = flow { emit(XXXRepository.fetch()) }
.map { ... }
.restartableStateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = XXXState.XXX
)

}

// using
@Composable
fun XXX() {
...
onClickRetry = uiState::restart
...
}

이제 사용하는 곳에서 간단하게 다른 SharingStarted와 결합하여 사용할 수 있습니다. 😀

마치며

평소 stateIn으로 상태를 StateFlow로 표현하는 코드를 자주 작성하고 있었고, 어떻게 재시도 로직을 구현해야할지 생각해보며 여러 가지 방식으로 구현해보고 있었는데 위 방식이 하나의 해결책이 될 수 있을 것 같습니다.

참조

--

--