Kotlin下Rxjava的基础用法及流式调用示例详解
目录
- 前言
- 基础用法
- fromXXX
- create
- interval & timer
- 指定线程
- observeOn & subscribeOn
- Flowable
- 流式调用
- 背压
前言
万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的还是Android开发者,所以今天来记录下。
js而所谓的响应式编程,就是一种用于应用程序异步编程的技术,他是一个通用的思想,类似与AOP,不只是在java中才有。他专注于对数据的变化做出反应,例如,有一个数据源(这里被称为生产者),一个数据目标(这里被成为消费者),然后在将消费者连接到订阅者之后,响应式编程框架负责将生产者生产的数据推送给消费者,一个可观察对象可以有任意数量的订阅者。
而对于一些思想上的框架,类似于Spring,源码上大体还是比较难的,毕竟就算是人,在思想上跨越也是有难度的,但对于RxJava来说,源码也不是很多,所以在以后会尝试介绍他的源码实现,而使用Rxjava的好处不是在于实现了什么具体的技术功能,比如使用CGLIB可以实现动态代理的技术,使用JDBC可以进行数据查询,而没有rxjava,我们的代码还可以借助Java8的Stream、CompletableFuture来实现。
而rxjava的好处在于让代码更简洁、优雅,通过他的链式调用,消除嵌套等。
在下面的例子中,我们会使用Kotlin来做示范。
基础用法
在这里,Observable 字面意思是可观察者,他表示数据源,通常,一旦订阅者开始收听,他们就会开始提供数据,而just表示仅仅,仅仅生产的数据是一个"T",即泛型类型,在这里是String。
而subscribe表示订阅,当订阅后,他会收到Observable生产的数据,来消费。
fun main() { Observable.just("hello rxjava").subscribe { println(it) } } 输出: hello rxjava
fromXXX
而上面说到,just表示仅仅,在rxjava中,不仅仅是具体的数据,还可以是Callable、Array、Future对象等,详细可以看fromXXX等方法,最终的结果由rxjava调用后如Callable的结果后,传递给订阅者。
fun main() { Observable.fromCallable { println("callable") "hello rxjava" }.subscribe { println(it) } }
create
这个方法给我了我们手动执行的能力,即传递数据到订阅者是我们手动执行的。
fun main() { Observable.create<String> { it.onNext("hello") it.onError(IllegalArgumentException("错误")) it.onComplete() }.subscribe ({ println(it) },{ println(it.message) },{ println("完成") }) }
interval & timer
还可以通过interval实现固定间隔定时。
fun main() { val observable = Observable.interval(1, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
而timer方法则是延迟N时间后,发送数据到订阅者.
fun main() { val observable = Observable.timer(2, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
指定线程
而使用上面方法有一个好处,即生产者可以在子线程中完成,而实际消费的时候在主线程,这在Android可谓是一种福利,如下。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture).subscribe { println(it) } }
而如果担心等待时间问题,可是使用第二个重载方法,指定一个超时时间,而subscribe还有两个主要参数我们没说,一个是error发生错误时回调,一个是complete完成时回调,但在发生错误后,complete是不会回调的。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({ println(it) },{ print开发者_C学习ln("错误") },{ println("完成") }) }
observeOn & subscribeOn
但你以为这就结束了吗,不,rxjava提供了丰富的线程切换,observeOn & subscribeOn这两个方法就是用来指定在哪里运行,Schedulers.newThread()
表示在新线程,但rxjava实现的线程中,是守护线程,也就是当主线程退出后,他们也会自动退出,而在下面的例子中,如果在最后不加sleep,会导致主线程退出后,rxjava的所有线程在可能没执行完成后也将退出。
fun main() { Observable.create<String> { println(Thread.currentThread().isDaemon) it.onNext("hello") } .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } Thread.sleep(10000) }
而如果想自定义线程,也是支持的。
fun createSchedulers(): Scheduler { return Schedulers.from { thread { it.run() } } } fun main() { Observable.create<String> { it.onNext("hello") } .observeOn(createSchedulers()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } }
Flowable
Flowable可以看成Observable新的实现,他支持背压,而他的API和Observable相似,在最后会介绍背压。
流式调用
我们已经熟悉了Java Stream的好处,所以在这编程里python简单看下rxjava的实现,用法都一样,如下,创建集合"a","b","c","d"
。
- map将所有item前添加字符"1"。
- filter将b结尾的数据过滤掉。
- skip忽略前n个数据。
fun main() { Flowable.fromIterable(mutableListOf("a","b","c","d")) .map { "1${it}" } .filter { !it.endsWith("b") } .skip(1) .subscribe { println(it) } }
所以最后收到的消息将是 1c、1d
。
当然他提供的这类API非常之多,就不介绍了。
背压
背压指的是遇到被观察者发送的消息太快,至于它的订阅者不能及时处理数据,而我们可以提供一种告诉被观察者遇到这种情况的策略。
这种场景有个前提条件,被观察者和订阅者在不同线程。
背压策略被定义在BackpressureStrategy,有五种。
MISSING
通过create方法创建的Flowable没有指定背压策略,不会对通过OnNext发送的数据做缓存或丢弃,需要下游通过背压操作符制定策略。
ERROR
如果缓存池数据超限,则抛出异常。
BUFFER
可以无限制添加数据。
DROP
如果缓存池满了,则丢弃。
LATEST
仅保留最新的onNext值,如果下游无法跟上,则覆盖之前的值。
如下,我们使用BUFFER策略,默认的缓存池大小是128,可以通过System.setProperty("rx3.buffer-size","5")
指定,而这个策略会导致只有缓存池不满的情况下,才会生产数据并发送给订阅者。
fun main() { System.setProperty("rx3.buffer-size","5") Observable.interval(1,TimeUnit.MILLISECONDS) .toFlowable(BackpressureStrategy.BUFFER) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
而如果我们改成DROP,那么最终只有5条数据被消费,其他全部丢弃。
fun main() { js System.setProperty("rx3.buffer-size","5") Observable.range(1,999) .toFjslowable(BackpressureStrategy.DROP) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
其他就不做demo了。
以上就是Kotlin下Rxjava的基础用法及流式调用示例详解的详细内容,更多关于Kotlin Rxjava的资料请关注我们其它相关文章!
精彩评论