如何表示多个值
挂起函数可以异步的返回单个值,但是该如何异步的返回多个计算好的值呢?
异步返回多个值的方案
集合 序列。挂起函数。Flow
Flow 异步流
56异步返回多个值 集合 序列 挂起函数 集合 , 序列 , 挂起函数 ,Flow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fun simpleList () : List<Int > = listOf(1 , 2 , 3 ) fun simpleSequence () : Sequence<Int > = sequence { for (i in 1. .3 ) { yield(i) } } suspend fun simpleList2 () : List<Int > { delay(1000 ) return listOf(1 , 2 , 3 ) } @Test fun `test multiple values`() { simpleSequence().forEach { value -> println(value) } } @Test fun `test multiple values2`() = runBlocking { simpleList2().forEach { values -> println(values) } }
57-通过Flow异步返回多个值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 suspend fun simpleFlow () = flow<Int > { for (i in 1. .3 ) { delay(1000 ) emit(i) } } @Test fun `test multiple values3`() = runBlocking { launch { for (k in 1. .3 ){ println("I am not blocked $k " ) delay(1500 ) } } simpleFlow().collect { value -> println(value) } }
返回多个值 而且是异步的
I am not blocked 1 1 I am not blocked 2 2 I am not blocked 3 3
58 Flow与其他方式的区别
59 Flow应用
流的特性 60 冷流 Flow是一种类似于序列的冷流,flow够坚强中的代码知道流被收集的时候才运行
类似于懒加载, 点了Button,collect才开始下载
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 fun simpleFlow2 () = flow<Int > { println("Flow started" ) for (i in 1. .3 ) { delay(1000 ) emit(i) } } @Test fun `test flow is cold`() = runBlocking { val flow = simpleFlow2() println("Calling collect..." ) flow.collect { value -> println(value) } println("Calling collect again" ) flow.collect { value -> println(value) } }
只要执行 还会再执行一次。
1 2 3 4 5 6 7 8 9 10 Calling collect... Flow started 1 2 3 Calling collect again Flow started 1 2 3
61.流的连续性
1 2 3 4 5 6 7 (1. .5 ).asFlow().filter { it % 2 == 0 }.map { "string $it " }.collect{ println("Collect ${it} " ) }
1 2 Collect string 2 Collect string 4
流构建器 flowOf构建器定义了一个发射固定值集的流
使用asFlow()拓展函数,可以将各种集合与序列转换为流
1 2 3 4 5 6 7 8 9 flowOf("one" , "two" , "three" ) .onEach { delay(1000 ) } .collect { println(it) } (1. .3 ).asFlow().collect { println(it) }
63.流上下文 flowOn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun simpleFlow3 () = flow<Int > { println("Flow started ${Thread.currentThread().name} " ) for (i in 1. .3 ){ delay(1000 ) emit(i) } } @Test fun `test flow context`() = runBlocking { simpleFlow3().collect{ println("Collected $it ${Thread.currentThread().name} " ) } }
构建流和保存流在同一个上下文
构建的时候用协程启动的后台线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fun simpleFlow4 () = flow<Int > { withContext(Dispatchers.IO) { println("Flow started ${Thread.currentThread().name} " ) for (i in 1. .3 ) { delay(1000 ) emit(i) } } } @Test fun `test flow context4`() = runBlocking { simpleFlow4().collect { println("Collected $it ${Thread.currentThread().name} " ) } }
Flow started DefaultDispatcher-worker-1 @coroutine#1
Flow invariant is violated: // 报错
用flowOn可以解决这个问题。flowon可以更改流发射的上下文。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun simpleFlow5 () = flow<Int > { println("Flow started ${Thread.currentThread().name} " ) for (i in 1. .3 ) { delay(1000 ) emit(i) } }.flowOn(Dispatchers.Default) @Test fun `test flow context5`() = runBlocking { simpleFlow5().collect { println("Collected $it ${Thread.currentThread().name} " ) } }
1 2 3 4 Flow started DefaultDispatcher-worker-1 @coroutine#2 Collected 1 Test worker @coroutine#1 Collected 2 Test worker @coroutine#1 Collected 3 Test worker @coroutine#1
发送在后台线程,收集在主线程
64.在指定协程中收集流 使用launchIn(),替换collect 我们可以在单独的协程中启动流的收集
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fun events () = (1. .3 ) .asFlow() .onEach { delay(100 ) } .flowOn(Dispatchers.Default) @Test fun `test flow launch`() = runBlocking<Unit > { val job = events().onEach { events -> println("Event: $events ${Thread.currentThread().name} " ) } .launchIn(CoroutineScope(Dispatchers.IO)) .join() delay(200 ) }
Event: 1 DefaultDispatcher-worker-3 @coroutine#2 Event: 2 DefaultDispatcher-worker-3 @coroutine#2 Event: 3 DefaultDispatcher-worker-2 @coroutine#2
65. 流的取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 fun simpleFlow6 () = flow<Int > { println("Flow started ${Thread.currentThread().name} " ) for (i in 1. .3 ) { delay(1000 ) emit(i) println("Emitting $i " ) } }.flowOn(Dispatchers.Default) @Test fun `test cancel flow`() = runBlocking<Unit > { withTimeoutOrNull(2500 ){ simpleFlow6().collect{value-> println(value)} } println("Done" ) }
withTimeoutOrNull
1 2 3 4 5 6 Flow started DefaultDispatcher-worker-1 @coroutine#2 Emitting 1 1 Emitting 2 2 Done
协程被取消 3没有打印
66流的取消检测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fun simpleFlow7 () = flow<Int > { for (i in 1. .5 ) { delay(1000 ) emit(i) println("Emitting $i " ) } } @Test fun `test cancel flow check`() = runBlocking<Unit > { withTimeoutOrNull(2500 ) { simpleFlow7().collect { value -> println(value) if (value == 3 ) cancel() } } }
1 Emitting 1 2 Emitting 2 BUILD SUCCESSFUL in 3s
背压
buffer
stateFlow
sharedFlow