Kotlin编程第一课–(协程篇)24 | 实战:让KtHttp支持Flow 又到了熟悉的实战环节,这一次我们接着来改造 KtHttp,让它能够支持协程的 Flow API。
有了前面两次实战的基础,这次我们应该就轻车熟路了。在之前的4.0 版本中,为了让 KtHttp 支持挂起函数,我们有两种思路,一种是改造内部 ,另一种是扩展外部。同理,为了让 KtHttp 支持 Flow,这次的实战也是这两种思路。
因此,这节课我们仍然会分为两个版本。
其实在实际的工作中,我们往往没有权限修改第三方提供的 SDK,那么这时候,如果想要让 SDK 获得 Flow 的能力,我们就只能借助 Kotlin 的扩展函数,为它扩展 出 Flow 的能力。而对于工程内部的代码,我们希望某个功能模块获得 Flow 的能力,就可以直接修改它的源代码 ,让它直接支持 Flow。
那么在这节课里,我会同时用这两种手段来扩展并改造 KtHttp,为你演示其中的关键步骤。在这个过程中,我也会为你讲解其中的常见误区和陷阱,这样一来,你就可以放心地将 Flow 应用到你的实际工作中了。
OK,让我们正式开始吧!
5.0 版本:Callback 转 Flow 在上次的实战课当中,我们在 3.0 版本里,实现了 KtHttp 的异步 Callback 请求。之后在 4.0 版本里,我们并没有改动 KtHttp 的源代码,而是直接在 KtCall 的基础上扩展了挂起函数 的支持。让我们重新回顾一下之前的代码:
suspend fun <T : Any> KtCall<T> .await () : T = suspendCancellableCoroutine { continuation -> val call = call(object : Callback<T> { override fun onSuccess (data : T ) { println("Request success!" ) continuation.resume(data ) } override fun onFail (throwable: Throwable ) { println("Request fail!:$throwable " ) continuation.resumeWithException(throwable) } }) continuation.invokeOnCancellation { println("Call cancelled!" ) call.cancel() } }
我们知道,上面这种做法非常适合针对第三方 SDK 的扩展,而这一切,都要归功于 Kotlin 的扩展函数 特性。那么这节课里,我们希望 KtHttp 支持 Flow,其实也同样可以借助扩展函数来实现。Kotlin 官方提供了一个 API:callbackFlow ,它就是专门用于将 Callback 转为 Flow 的。
Callback 转 Flow,用法跟 Callback 转挂起函数是差不多的。如果你去分析代码段 1 当中的代码模式,会发现 Callback 转挂起函数,主要有三个步骤。
第一步:使用 suspendCancellableCoroutine 执行 Callback 代码,等待 Callback 回调;
第二步:将 Callback 回调结果传出去,onSuccess 的情况就传结果,onFail 的情况就传异常;
第三步:响应协程取消事件 invokeOnCancellation{}。
所以使用 callbackFlow,也是这样三个步骤。如果你看过 Google 官方写的文档,你可能会写出这样的代码:
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { offer(data ) } override fun onFail (throwable: Throwable ) { close(throwable) } }) awaitClose { call.cancel() } }
在这段代码里,callbackFlow 的使用步骤也是分了三步。不过,由于 Google 官方写的文档已经有些过时了,如果你按照文档来写,会发现注释 1 处的代码其实会报错,IDE 会提示应该使用 trySend() 替代 offer()。
所以我们要再来改一改:
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySend(data ) } override fun onFail (throwable: Throwable ) { close(throwable) } }) awaitClose { call.cancel() } }
那么从上面的代码中,你会发现,callbackFlow 的底层用到了 Channel,所以你才可以使用 trySend() 这样的 API。这个 API 我在第 19 讲里提到过,它其实就是 Channel.send() 的非挂起函数 版本的 API。这样改完以后,我们的代码就已经没有明显报错了。
但,它仍然还有优化空间,对应的地方我已经用注释标记出来了。
我们来看一下注释 1,这里使用 trySend(),虽然在这个案例当中用这个 API 确实没问题,但在大部分场景下,它其实是不够稳妥的。你可以查看一下它的源码文档,会看到它的返回值类型是 ChannelResult ,代表 trySend() 的执行结果是成功还是失败。
public fun trySend (element: E ) : ChannelResult<Unit >
也就是说,如果我们往 Channel 当中成功地添加了元素,那么 trySend() 的返回值就是成功,如果当前的 Channel 管道已经满了,那么 trySend() 的返回值就是失败。
其实,当 Channel 管道容量已满的时候,我们更希望 trySend() 可以多等等,直到管道容量空闲以后再返回成功。所以这时候,我们可以使用 trySendBlocking() 来替代它。它是 Kotlin 协程 1.5 出现的一个新的 API。
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) } override fun onFail (throwable: Throwable ) { close(throwable) } }) awaitClose { call.cancel() } }
不过,这里我们仅仅只是改为 trySendBlocking() 仍然还不够,让我们来运行一下程序,看看问题出在哪里:
interface ApiServiceV5 { @GET("/repo" ) fun repos ( @Field("lang" ) lang: String , @Field("since" ) since: String ) : KtCall<RepoList>} fun main () = runBlocking { testFlow() } private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .repos(lang = "Kotlin" , since = "weekly" ) .asFlow() .catch { println("Catch: $it " ) } .collect { println(it) }
其实,问题的原因也很简单,由于 callbackFlow 的底层是 Channel 实现的,在我们用完它以后,应该主动将其关闭或者释放。不然的话,它就会一直占用计算机资源 。所以这时候,我们可以进一步完善 trySendBlocking() 这部分的代码。
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) .onSuccess { close() } .onFailure { close(it) } } override fun onFail (throwable: Throwable ) { close(throwable) } }) awaitClose { call.cancel() } }
上面代码中的 onSuccess、onFailure 其实就相当于回调,在这里,不管是成功还是失败,我们都主动把 callbackFlow 当中的 Channel 关闭。这样一来,程序就可以正常终止了。
提示:在大部分场景下 trySendBlocking() 会比 trySend() 更稳妥一些,因为它会尽可能发送成功。但在某些特殊情况下,trySend() 也有它的优势,因为它不会出现阻塞问题。
好,现在,5.0 版本的代码其实就已经算是合格了。不过,我还想给你介绍下 callbackFlow 的一些使用细节:**close() 与 close(throwable)**。
close() 这个方法,我们既可以传入异常,也可以不传入。不过,这两者在 callbackFlow 当中是有差异的。如果你将代码段 7 当中所有的 close(throwable) 都改为不传异常的话,程序代码也会出现问题。
fun main () = runBlocking { testFlow() } private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .repos(lang = "Kotlin" , since = "weekly" ) .asFlow() .catch { println("Catch: $it " ) } .collect { println(it) } fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) .onSuccess { close() } .onFailure { close() } } override fun onFail (throwable: Throwable ) { close() } }) awaitClose { call.cancel() } }
在以上代码中,我们断网执行了这段程序,但在控制台上看不到任何异常的输出信息。这就是因为,我们调用 close() 的时候没有传入异常信息。
所以,在 callbackFlow 当中的异常分支里,我们如果使用 close(),一定要带上对应的异常 ,就像代码段 7 的那样“close(throwable)”。或者,为了防止在开发的过程中忘记传入异常信息,我们可以使用 cancel() 方法 。就像下面这样:
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) .onSuccess { close() } .onFailure { cancel(CancellationException("Send channel fail!" , it)) } } override fun onFail (throwable: Throwable ) { cancel(CancellationException("Request fail!" , throwable)) } }) awaitClose { call.cancel() } }
根据这里的运行结果,我们可以看到,把 close() 改成 cancel() 以后,程序运行结果也符合预期。而 cancel 其实还有一个优势:就算不小心忘记传 throwable,我们还是可以看到一个 CancellationException。
不过总的来说,只要我们可以记住传入异常信息,close() 和 cancel() 两者的差别并不大。
另外还有一点,如果我们在 callbackFlow 当中还启动了其他的协程任务,close() 和 cancel() 也同样可以取消对应的协程。如下所示:
fun main () = runBlocking { testFlow() } private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .repos(lang = "Kotlin" , since = "weekly" ) .asFlow() .catch { println("Catch: $it " ) } .collect { println(it) } fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val job = launch { println("Coroutine start" ) delay(3000L ) println("Coroutine end" ) } job.invokeOnCompletion { println("Coroutine completed $it " ) } val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) .onSuccess { close() } .onFailure { cancel(CancellationException("Send channel fail!" , it)) } } override fun onFail (throwable: Throwable ) { cancel(CancellationException("Request fail!" , throwable)) } }) awaitClose { call.cancel() } }
可以看到,由于协程是结构化的,所以,当我们取消 callbackFlow 的时候,在它内部创建的协程 job,也会跟着被取消。而且,它的异常信息也是一样的。
不过,如果我们把上面的 launch{} 改成了“launch(Job()){}”,那么,协程任务就不会跟随 callbackFlow 一起被取消了。我相信,如果你还记得上节课讲的第二条准则,那你一定可以轻松理解这句话。因为,它们的协程的父子关系已经被破坏了!
最后,我还想再提一下 awaitClose{} 这个挂起函数,它的作用其实就是监听 callbackFlow 的生命周期,当它被关闭或者取消的时候,我们应该同时把 OkHttp 当中的网络请求也取消掉。它的作用,跟代码段 1 当中的 continuation.invokeOnCancellation{} 是类似的。
好,callbackFlow 的用法我们就讲解完了,有了它,以后我们就可以轻松地把第三方 SDK 的 Callback 扩展成 Flow 了。
那么接下来,我们就进入 6.0 版本的开发吧!
6.0 版本:直接支持 Flow 实际上,对于 KtHttp 来说,4.0 版本、5.0 版本都只是外部扩展,我们对 KtHttp 的内部源代码并没有做改动。
而对于 6.0 版本的开发,我们其实是希望 KtHttp 可以直接支持返回 Flow 类型的数据,也就是这样:
interface ApiServiceV5 { @GET("/repo" ) fun repos ( @Field("lang" ) lang: String , @Field("since" ) since: String ) : KtCall<RepoList> @GET("/repo" ) fun reposFlow ( @Field("lang" ) lang: String , @Field("since" ) since: String ) : Flow<RepoList> }
请你留意上面的代码注释,在 ApiServiceV5 当中,我定义了一个接口方法 reposFlow(),它的返回值类型是 Flow,而不是之前的 KtCall。这样一来,我们在 main() 函数当中使用它的时候,就不需要使用 asFlow() 这个扩展函数了。就像下面这样:
private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .reposFlow(lang = "Kotlin" , since = "weekly" ) .catch { println("Catch: $it " ) } .collect { println(it) } fun main () = runBlocking { testFlow() }
可以看到,当我们把 reposFlow() 的返回值类型定义成 Flow 以后,就需要改动 KtHttp 的源代码了。因为,它的内部需要根据这种情况做一些特殊的判断。
其实,在前面 3.0 版本的开发中,我们就已经做过一次判断了。当时,我们特地判断了一下,返回值类型是 KtCall 还是T。让我们来重新回顾一下当时的代码细节:
private fun <T: Any> invoke (path: String , method: Method , args: Array <Any >) : Any? { return if (isKtCallReturn(method)) { val genericReturnType = getTypeArgument(method) KtCall<T>(call, gson, genericReturnType) } else { val response = okHttpClient.newCall(request).execute() val genericReturnType = method.genericReturnType val json = response.body?.string() gson.fromJson<Any?>(json, genericReturnType) } }
看到上面的代码,相信你马上就能想明白了,如果要支持 Flow,我们只需要在这里判断一下,返回值类型是不是 Flow 即可。比如说:
private fun <T : Any> invoke (path: String , method: Method , args: Array <Any >) : Any? { return when { isKtCallReturn(method) -> { val genericReturnType = getTypeArgument(method) KtCall<T>(call, gson, genericReturnType) } isFlowReturn(method) -> { flow<T> { val genericReturnType = getTypeArgument(method) val response = okHttpClient.newCall(request).execute() val json = response.body?.string() val result = gson.fromJson<T>(json, genericReturnType) emit(result) } } else -> { val response = okHttpClient.newCall(request).execute() val genericReturnType = method.genericReturnType val json = response.body?.string() gson.fromJson<Any?>(json, genericReturnType) } } } private fun isFlowReturn (method: Method ) = getRawType(method.genericReturnType) == Flow::class .java
由于代码段 13 当中已经有了 if、else 两个条件分支了,再增加一个分支的话,我们选择了 when 表达式。这里,我们增加了一个 isFlowReturn(method) 的分支,意思就是判断返回值类型是不是 Flow,如果是的话,我们就直接使用 flow{} 创建一个 Flow 返回了。
至此,我们 6.0 版本的开发工作,其实就已经完成了。是不是觉得非常轻松?对比起 Callback 转 Flow,让 KtHttp 直接支持 Flow 确实要简单很多 。从这一点上,我们也可以看到 Flow 的强大和易用性。
那么在这时候,我们就可以写一些简单的测试代码,来验证我们的代码是否可靠了。
private fun <T : Any> invoke (path: String , method: Method , args: Array <Any >) : Any? { return when { isKtCallReturn(method) -> { val genericReturnType = getTypeArgument(method) KtCall<T>(call, gson, genericReturnType) } isFlowReturn(method) -> { logX("Start out" ) flow<T> { logX("Start in" ) val genericReturnType = getTypeArgument(method) val response = okHttpClient.newCall(request).execute() val json = response.body?.string() val result = gson.fromJson<T>(json, genericReturnType) logX("Start emit" ) emit(result) logX("End emit" ) } } else -> { val response = okHttpClient.newCall(request).execute() val genericReturnType = method.genericReturnType val json = response.body?.string() gson.fromJson<Any?>(json, genericReturnType) } } } private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .reposFlow(lang = "Kotlin" , since = "weekly" ) .flowOn(Dispatchers.IO) .catch { println("Catch: $it " ) } .collect { logX("${it.count} " ) }
在上面的代码中,我们增加了一些日志,同时在调用处增加了“flowOn(Dispatchers.IO)”。可以看到,这样一来整个网络请求就执行在了 DefaultDispatcher 这个线程池当中,而其他部分的代码,仍然执行在 main() 线程。这也是符合预期的。
然后,我们可以通过断网来模拟出现异常的情况:
可以看到,程序的运行结果仍然是符合预期的。
下面,我们再来看看 6.0 完整的代码:
interface ApiServiceV5 { @GET("/repo" ) fun repos ( @Field("lang" ) lang: String , @Field("since" ) since: String ) : KtCall<RepoList> @GET("/repo" ) fun reposFlow ( @Field("lang" ) lang: String , @Field("since" ) since: String ) : Flow<RepoList>} object KtHttpV5 { private var okHttpClient: OkHttpClient = OkHttpClient() private var gson: Gson = Gson() var baseUrl = "https://baseUrl.com" fun <T : Any> create (service: Class <T >) : T { return Proxy.newProxyInstance( service.classLoader, arrayOf<Class<*>>(service) ) { proxy, method, args -> val annotations = method.annotations for (annotation in annotations) { if (annotation is GET) { val url = baseUrl + annotation .value return @newProxyInstance invoke<T>(url, method, args!!) } } return @newProxyInstance null } as T } private fun <T : Any> invoke (path: String , method: Method , args: Array <Any >) : Any? { if (method.parameterAnnotations.size != args.size) return null var url = path val parameterAnnotations = method.parameterAnnotations for (i in parameterAnnotations.indices) { for (parameterAnnotation in parameterAnnotations[i]) { if (parameterAnnotation is Field) { val key = parameterAnnotation.value val value = args[i].toString() if (!url.contains("?" )) { url += "?$key =$value " } else { url += "&$key =$value " } } } } val request = Request.Builder() .url(url) .build() val call = okHttpClient.newCall(request) return when { isKtCallReturn(method) -> { val genericReturnType = getTypeArgument(method) KtCall<T>(call, gson, genericReturnType) } isFlowReturn(method) -> { logX("Start out" ) flow<T> { logX("Start in" ) val genericReturnType = getTypeArgument(method) val response = okHttpClient.newCall(request).execute() val json = response.body?.string() val result = gson.fromJson<T>(json, genericReturnType) logX("Start emit" ) emit(result) logX("End emit" ) } } else -> { val response = okHttpClient.newCall(request).execute() val genericReturnType = method.genericReturnType val json = response.body?.string() gson.fromJson<Any?>(json, genericReturnType) } } } private fun getTypeArgument (method: Method ) = (method.genericReturnType as ParameterizedType).actualTypeArguments[0 ] private fun isKtCallReturn (method: Method ) = getRawType(method.genericReturnType) == KtCall::class .java private fun isFlowReturn (method: Method ) = getRawType(method.genericReturnType) == Flow::class .java } fun main () = runBlocking { testFlow() } private suspend fun testFlow () = KtHttpV5.create(ApiServiceV5::class .java) .reposFlow(lang = "Kotlin" , since = "weekly" ) .flowOn(Dispatchers.IO) .catch { println("Catch: $it " ) } .collect { logX("${it.count} " ) }
最后,我们也再来分析一下,为什么 6.0 的代码可以这么简单。这里有两个关键的地方,我也分别用注释标记了。
请你留意注释 1 处的 reposFlow() 方法的定义,它其实是一个普通的函数,并不是挂起函数。换言之,虽然它的返回值类型是 Flow,但我们并不要求它在协程当中被调用。
另外,请留意注释 2 处,flow{} 这个高阶函数,它也只是一个普通函数,同样也不是挂起函数,这就意味着,它可以在普通函数里面直接调用。我们可以看看 flow{} 的定义:
public fun <T> flow (@BuilderInference block: suspend FlowCollector <T >.() -> Unit ) : Flow<T> = SafeFlow(block)
所以,正因为以上这两点,就使得 Flow 的易用性非常高,还记得我们在第 20 讲当中看过的那张 Flow“上游、下游”的示意图吗?我们其实可以进一步完善它:
也就是说,对于 Flow 的上游、中间操作符 而言,它们其实根本就不需要协程作用域,只有在下游调用 collect{} 的时候,才需要协程作用域。
因此,我们前面在写 main() 函数的时候,也可以换成这样的写法:
fun main () { val flow = KtHttpV5.create(ApiServiceV5::class .java) .reposFlow(lang = "Kotlin" , since = "weekly" ) .flowOn(Dispatchers.IO) .catch { println("Catch: $it " ) } runBlocking { flow.collect { logX("${it.count} " ) } } }
可见,正因为 Flow 的上游不需要协程作用域,我们才可以轻松完成 6.0 版本的代码。
小结 这节实战课,为了让 KtHttp 支持 Flow API,我们使用了两种方法。第一种,是从 KtHttp 的外部进行扩展,用这种思路,我们完成了 5.0 版本的开发;第二种,是修改 KtHttp 的内部,让 ApiService 当中的方法可以直接以 Flow 作为返回值类型,利用这种思路,我们完成了 6.0 的开发。
具体来说,我们是用到了这几个知识点,你可以重点关注一下:
**callbackFlow{}**,它的作用就是把 Callback 转换成 Flow。它的底层其实用到了 Channel,因此,我们可以在 callbackFlow{} 当中调用 trySend()、trySendBlocking(),这两个方法都是 Channel 当中的“非挂起函数”的方法。需要注意的是,这里我们不能直接使用 Channel 的挂起函数 send(),因为它必须要在协程体当中执行。
在 callbackFlow{} 里,出现异常的逻辑分支当中,如果我们需要关闭 callbackFlow,那么在调用 close() 的时候,一定要传入对应的异常参数 **close(throwable)**。不然的话,Flow 的下游就无法收到任何的异常信息。
在 callbackFlow{} 当中创建的协程任务 ,也可以跟随 callbackFlow 一同被取消,只要我们不打破它原有的协程父子关系。
由于 Flow 的上游、中间操作符 不需要协程作用域,因此,我们可以在非协程当中执行创建 Flow。这就导致我们 6.0 版本的代码轻松就可以实现。
思考题 在 5.0 版本的代码中,awaitClose{} 的作用是响应协程的取消,同时取消 OkHttp 的请求。其实,它除了这个作用以外,还有另外一个作用。
你可以把 5.0 版本代码中的 awaitClose 删掉,看看会发生什么。对于这样的现象,你能想到 awaitClose{} 的另一个作用吗?
fun <T : Any> KtCall<T> .asFlow () : Flow<T> = callbackFlow { val call = call(object : Callback<T> { override fun onSuccess (data : T ) { trySendBlocking(data ) .onSuccess { close() } .onFailure { cancel(CancellationException("Send channel fail!" , it)) } } override fun onFail (throwable: Throwable ) { cancel(CancellationException("Request fail!" , throwable)) } }) }