[关闭]
@TryLoveCatch 2025-07-04T08:32:18.000000Z 字数 3653 阅读 336

Kotlin知识体系之Flow

Kotlin知识体系


热流和冷流

StateFlow是如何比较是否需要更新呢?

  1. private fun updateState(expectedState: Any?, newState: Any): Boolean {
  2. var curSequence = 0
  3. var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
  4. synchronized(this) {
  5. val oldState = _state.value
  6. if (expectedState != null && oldState != expectedState) return false // CAS support
  7. if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
  8. _state.value = newState
  9. curSequence = sequence
  10. if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
  11. curSequence++ // make it odd
  12. sequence = curSequence
  13. } else {
  14. // update is already in process, notify it, and return
  15. sequence = curSequence + 2 // change sequence to notify, keep it odd
  16. return true // updated
  17. }
  18. curSlots = slots // read current reference to collectors under lock
  19. }
  20. /*
  21. Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
  22. Loop until we're done firing all the changes. This is a sort of simple flat combining that
  23. ensures sequential firing of concurrent updates and avoids the storm of collector resumes
  24. when updates happen concurrently from many threads.
  25. */
  26. while (true) {
  27. // Benign race on element read from array
  28. curSlots?.forEach {
  29. it?.makePending()
  30. }
  31. // check if the value was updated again while we were updating the old one
  32. synchronized(this) {
  33. if (sequence == curSequence) { // nothing changed, we are done
  34. sequence = curSequence + 1 // make sequence even again
  35. return true // done, updated
  36. }
  37. // reread everything for the next loop under the lock
  38. curSequence = sequence
  39. curSlots = slots
  40. }
  41. }
  42. }

重点是这句if (oldState == newState) return true,所以是通过==来进行比较的

== 操作符

  • 结构相等(内容相等)的比较,等同于调用 equals() 方法。
  • 它用于比较两个对象的内容是否相等。
  • 即使两个对象的内存地址不同,只要它们的内容相等,== 仍然会返回 true

=== 操作符

  • 引用相等的比较,用来检查两个对象是否是同一个引用。
  • 它比较的是两个对象在内存中的地址是否相同。
  • 只有当两个变量引用的是同一个对象实例时,=== 才会返回 true。

所以我们可以理解为,当你在 Kotlin 中使用 == 时,编译器会将它转换为对 equals() 方法的调用。
一般来说,StateFlow我们会使用data class,在 Kotlin 中,数据类(data class)会自动为你生成 equals() 方法,它会根据所有的属性值来比较对象是否相同,默认的 equals()大致是如下这样实现的:

  1. class Person(val name: String, val age: Int) {
  2. override fun equals(other: Any?): Boolean {
  3. if (this === other) return true // 引用相等
  4. if (other !is Person) return false // 类型检查
  5. // 比较内容相等
  6. return name == other.name && age == other.age
  7. }
  8. override fun hashCode(): Int {
  9. return name.hashCode() * 31 + age
  10. }
  11. }

所以我们现在可以总结出来StateFlow的默认比较策略:

所以针对这个策略,我们想要保证StateFlow更新,就需要如下对策:

flowOn

  • flowOn 只影响它前面的操作符
  • flowOn 后面的操作符使用收集协程的上下文
  • 每个 flowOn 都会创建一个新的协程上下文作用域
  1. // dataSource
  2. private val _appListFlow = MutableStateFlow<List<AppInfo>>(emptyList())
  3. private val appListFlow: StateFlow<List<AppInfo>> = _appListFlow.asStateFlow()
  4. override fun getAppList(): Flow<List<AppInfo>> {
  5. return appListFlow
  6. }
  7. // repository
  8. fun getAppList(): Flow<List<AppInfo>> {
  9. return getAppList().map { appList ->
  10. val finalList = processAppList(appList)
  11. BLogger.i(TAG, "getAppList -> appList.size: ${finalList.size}, ${Thread.currentThread().name}")
  12. finalList
  13. }.flowOn(Dispatchers.IO)
  14. }
  15. // viewModel
  16. mainScope.launch {
  17. appRepository.getAppList().collect {
  18. // 遍历打印
  19. var index = 1
  20. for (appInfo in it) {
  21. BLogger.i("${index++} -> $appInfo")
  22. BLogger.i("----------------------------------")
  23. }
  24. }
  25. }

咱们简化一下:

  1. // ✅ 正确用法:flowOn 影响前面的操作符
  2. mainScope.launch {
  3. getAppList() // 看更新/发射值的协程调度器
  4. .map { ... } // 在子线程处理
  5. .flowOn(Dispatchers.IO)
  6. .collect { ... } // 在收集流的协程中指定调度器,所以在主线程收集
  7. }
  1. // 在主线程调用
  2. fun updateOnMainThread() {
  3. _appListFlow.tryEmit(newList) // 在主线程执行
  4. }
  5. // 在子线程调用
  6. fun updateOnBackground() {
  7. thread {
  8. _appListFlow.tryEmit(newList) // 在子线程执行
  9. }
  10. }
  11. // 在协程中调用
  12. viewModelScope.launch(Dispatchers.IO) {
  13. _appListFlow.tryEmit(newList) // 在IO调度器线程执行
  14. }

更复杂的一个例子:

  1. mainScope.launch {
  2. getAppList()
  3. .map { ... } // 在 IO 线程
  4. .filter { ... } // 在 IO 线程
  5. .flowOn(Dispatchers.IO)
  6. .map { ... } // 在 Default 线程
  7. .flowOn(Dispatchers.Default) // 添加第二个 flowOn
  8. .map { ... } // 在主线程(收集线程)
  9. .collect { ... } // 在主线程(收集线程)
  10. }

flowOn只影响它前面的操作符

参考

https://juejin.cn/post/7169843775240405022

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注