Android Rxjava3 使用场景详解

2022-07-15,,,

一、rxjava使用场景

为了模拟实际场景,从wanandroid网站找了二个接口,如下:(对wanandroid表示感谢!)

public interface apiserver {

    /**
     * 接口一
     * 获取文章列表
     * @return
     */
    @get("article/list/1/json")
    observable<baseresponse<articlelistresp>> getarticlelist();


    /**
     * 接口二
     * 获取热词
     * @return
     */
    @get("hotkey/json")
    observable<baseresponse<list<hotkeyresp.databean>>> gethotkey();

}

1、多任务嵌套回调

场景:比如调用接口一有回调后才能调用接口二,如果接口一调用失败不再调用接口二。下面是二种写法。

写法一,代码如下:

//为了看清楚代码,没有使用lambda简化
//接口一
observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
//接口二
observable<baseresponse<list<hotkeyresp.databean>>> hotkey = apimanager.getinstance().getapiservice().gethotkey();
observable.just(articlelist)
        .subscribeon(schedulers.io())
        .observeon(androidschedulers.mainthread())
        .map(new function<observable<baseresponse<articlelistresp>>, observable<baseresponse<list<hotkeyresp.databean>>>>() {
            @override
            public observable<baseresponse<list<hotkeyresp.databean>>> apply(observable<baseresponse<articlelistresp>> baseresponseobservable) throws throwable {
               //处理第一个请求返回的数据
                if(baseresponseobservable!=null) mtv.settext(baseresponseobservable.blockingsingle().tostring());
                return hotkey;   //发起第二次网络请求
            }
        }).subscribeon(schedulers.io())
        .observeon(androidschedulers.mainthread())
        .subscribe(new consumer<observable<baseresponse<list<hotkeyresp.databean>>>>() {
            @override
            public void accept(observable<baseresponse<list<hotkeyresp.databean>>> baseresponseobservable) throws throwable {
                //处理第二次网络请求的结果
                if(baseresponseobservable!=null) mtvtwo.settext(baseresponseobservable.blockingsingle().tostring());
            }
        }, new consumer<throwable>() {
            @override
            public void accept(throwable throwable) throws throwable {
                //异常的处理:比如dialog的dismiss,缺省页展示等
                //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,如果多个请求同理
                //但是请求成功的还是能正常处理
                logutil.e(throwable.tostring());
            }
        });

写法二,代码如下:

        //为了看清楚代码,没有使用lambda简化
        //接口一
        observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
        //接口二
        observable<baseresponse<list<hotkeyresp.databean>>> hotkey = apimanager.getinstance().getapiservice().gethotkey();
        //请求第一个
        articlelist.subscribeon(schedulers.io())
                .observeon(androidschedulers.mainthread())
                .doonnext(new consumer<baseresponse<articlelistresp>>() {
                    @override
                    public void accept(baseresponse<articlelistresp> articlelistrespbaseresponse) throws throwable {
                        //处理第一个网络请求的结果
                        if(articlelistrespbaseresponse!=null) mtv.settext(articlelistrespbaseresponse.tostring());
                    }
                }).observeon(schedulers.io())
                .flatmap(new function<baseresponse<articlelistresp>, observablesource<baseresponse<list<hotkeyresp.databean>>>>() {
                    @override
                    public observablesource<baseresponse<list<hotkeyresp.databean>>> apply(baseresponse<articlelistresp> articlelistrespbaseresponse) throws throwable {
                        return hotkey;   //将第一个网络请求转换为第二个网络请求
                    }
                }).observeon(androidschedulers.mainthread())
                .subscribe(new consumer<baseresponse<list<hotkeyresp.databean>>>() {
                    @override
                    public void accept(baseresponse<list<hotkeyresp.databean>> listbaseresponse) throws throwable {
                        //处理第二次网络请求的结果
                        if(listbaseresponse!=null) mtvtwo.settext(listbaseresponse.tostring());
                    }
                }, new consumer<throwable>() {
                    @override
                    public void accept(throwable throwable) throws throwable {
                        //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,多个请求同理
                        //但是在异常前面已经成功的网络请求还是能正常处理
                        //异常的处理:比如dialog的dismiss,缺省页展示等
                        logutil.e(throwable.tostring());
                    }
                });

注意异常处理和线程切换,其他细节代码和注释比较详细。

2、多任务合并处理

场景:接口一和接口二返回数据后一起处理。
代码如下:

private void ziprequest() {
    //为了看清楚代码,没有使用lambda简化
    //接口一
    observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
    //接口二
    observable<baseresponse<list<hotkeyresp.databean>>> hotkey = apimanager.getinstance().getapiservice().gethotkey();
    observable.zip(articlelist, hotkey, this::combinotification)  //传入方法定义合并规则
            .subscribeon(schedulers.io())
            .observeon(androidschedulers.mainthread())
            .subscribe(new observer<string>() {
                @override
                public void onsubscribe(@nonnull disposable d) {

                }

                @override
                public void onnext(@nonnull string msg) {
                    if(!textutils.isempty(msg)){
                        mtv.settext(msg);
                    }
                }

                @override
                public void onerror(@nonnull throwable e) {

                }

                @override
                public void oncomplete() {

                }
            });

}

//合并的规则,以及定义合并的返回值
public string combinotification(baseresponse<articlelistresp> articlelistrespbaseresponse, baseresponse<list<hotkeyresp.databean>> hotkeyresponse) {
    //比如这里取二个接口数据tostring返回
    if (articlelistrespbaseresponse != null && hotkeyresponse != null) {
        return articlelistrespbaseresponse.tostring() + hotkeyresponse.tostring();
    }
    return null;
}

3、轮询

场景一:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,每次轮询必须等上一次轮询有结果后才能开始下一次轮询。

特别注意repeatwhen操作符,只有在repeatwhen的function方法中发射onnext事件,重复(repeat)才能触发,发射onerror或者oncomplite都会结束重复(repeat),基于这一点,通过flatmap操作符将事件转化为延迟一定时间的onnext事件,就达到了延时轮询的目的。至于onnext事件发射的什么不重要。

延伸:retrywhen的function方法发射onerror事件才会重试(retry)。

takeuntil操作符可以定义一定的条件,当达到条件时自动结束整个事件的目的,事件结束时会回调subscribe。

代码如下:

/**
 * 轮询
 * @param pollingtimes 轮询的次数
 */
private void timedpolling(int pollingtimes) {
    atomicinteger times = new atomicinteger();
    observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
    articlelist.repeatwhen(new function<observable<object>, observablesource<?>>() {
        @override
        public observablesource<?> apply(observable<object> objectobservable) throws throwable {
            return objectobservable.flatmap(new function<object, observablesource<?>>() {  //转换事件
                @override
                public observablesource<?> apply(object o) throws throwable {
                    //这里发射延时的onnext事件,触发repeat动作,发射的0不会回调到下面的subscribe
                    return observable.just(0).delay(2, timeunit.seconds);  
                }
            });
        }
    }).subscribeon(schedulers.io())
            .observeon(androidschedulers.mainthread())
            //takeuntil定义了二个结束条件:前面是达到了轮询的次数,后面是网络请求返回了成功,当然也可以写成代码块做其他的返回判断
            .takeuntil(response -> times.incrementandget() >= pollingtimes || response.geterrorcode() == 0)
            .subscribe(new observer<baseresponse<articlelistresp>>() {
                @override
                public void onsubscribe(@nonnull disposable d) {
                   
                }

                @override
                public void onnext(@nonnull baseresponse<articlelistresp> articlelistrespbaseresponse) {
                    
                }

                @override
                public void onerror(@nonnull throwable e) {
                   
                }

                @override
                public void oncomplete() {
                   
                }
            });
}

如果想改成不限制次数的也比较简单。

场景二:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,这里的轮询不关心上次请求的结果。
代码如下:

/**
 * 轮询一定的次数
 * @param polltimes 轮询次数
 */
private void timedpolling(int polltimes) {
    //网络请求
    observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
    //返回值用于取消轮询
    msubscribe = observable.intervalrange(0, polltimes, 0, 2000, timeunit.milliseconds)
            .flatmap(new function<long, observablesource<baseresponse<articlelistresp>>>() {
                @override
                public observablesource<baseresponse<articlelistresp>> apply(long along) throws throwable {
                    return articlelist;  //转换事件
                }
            }).subscribeon(schedulers.io())
            .observeon(androidschedulers.mainthread())
            .subscribe(new consumer<baseresponse<articlelistresp>>() {
                @override
                public void accept(baseresponse<articlelistresp> listbaseresponse) throws throwable {
                    //如果满足了退出轮询的条件,可以调用下面的方法退出轮询
                    //msubscribe.dispose();
                }
            });
}

思路是定时发射事件,然后将事件转化为网络请求。同理可以写出不限次数的轮询。

场景三:不限次数轮询(间隔一定的时间),不关心上次请求的结果。

假如接口返回的code为0时需要取消轮询,代码如下:

observable<baseresponse<articlelistresp>> articlelist = apimanager.getinstance().getapiservice().getarticlelist();
//返回值用于取消轮询
msubscribe = observable.interval(0, 2000, timeunit.milliseconds)
        .flatmap(new function<long, observablesource<baseresponse<articlelistresp>>>() {
            @override
            public observablesource<baseresponse<articlelistresp>> apply(long along) throws throwable {
                return articlelist;
            }
        })
        .takeuntil(response -> response.geterrorcode() == 0)  //使用takeuntil自动取消发射
        .subscribeon(schedulers.io())
        .observeon(androidschedulers.mainthread())
        .subscribe(new consumer<baseresponse<articlelistresp>>() {
            @override
            public void accept(baseresponse<articlelistresp> articlelistrespbaseresponse) throws throwable {
                //处理回调
            }
        }, new consumer<throwable>() {
            @override
            public void accept(throwable throwable) throws throwable {
               //处理异常
            }
        });

如果是其他取消条件,也可以写在代码块里:

.takeuntil(response -> {
    //处理接口数据,然后判断是返回true还是false,true:停止发射,false:继续发射
    return false;
})  //使用takeuntil自动取消发射

不管何种轮询,注意在ondestroy中取消。

4、其他小场景

1)倒计时

验证码的倒计时功能,代码如下:

/**
 * 倒计时
 * @param countdownseconds 倒计时的秒数
 */
private void countdown(int countdownseconds) {
    observable.intervalrange(0, countdownseconds, 0, 1000, timeunit.milliseconds)
            .map(new function<long, string>() {
                @override
                public string apply(long along) throws throwable {
                    return (countdownseconds - along) + "s后重新获取";
                }
            }).observeon(androidschedulers.mainthread())
            .subscribe(new observer<string>() {
                @override
                public void onsubscribe(@nonnull disposable d) {
                    mtv.setenabled(false);
                }

                @override
                public void onnext(@nonnull string s) {
                    mtv.settext(s);
                }

                @override
                public void onerror(@nonnull throwable e) {
                    mtv.setenabled(true);
                    mtv.settext("获取验证码");
                }

                @override
                public void oncomplete() {
                    mtv.settext("获取验证码");
                    mtv.setenabled(true);
                }
            });
}

效果

2)打字机效果

几行代码实现打字机效果:

@requiresapi(api = build.version_codes.m)  //6.0
public class daziview extends view {
    private textpaint mtextpaint;
    private staticlayout mstaticlayout;

    public daziview(context context) {
        super(context,null);
    }

    public daziview(context context, @nullable attributeset attrs) {
        super(context, attrs);
        inittextpaint();
    }

    /**
     * 初始化画笔
     */
    private void inittextpaint() {
        mtextpaint = new textpaint(paint.anti_alias_flag);
        mtextpaint.settextsize(48);
        mtextpaint.setcolor(color.parsecolor("#000000"));
    }

    /**
     * 绘制
     * @param content
     */
  public void drawtext(string content){
        if(!textutils.isempty(content)){
            observable.intervalrange(0,content.length()+1,0,150, timeunit.milliseconds)
                    .subscribe(new consumer<long>() {
                @override
                public void accept(long along) throws throwable {
                //动态改变文本长度
                    mstaticlayout = staticlayout.builder.obtain(content, 0, along.intvalue(), mtextpaint, getwidth())
                            .build();
                    invalidate();
                }
            });
        }
  }

    @override
    protected void ondraw(canvas canvas) {
        super.ondraw(canvas);
        //绘制文本
        mstaticlayout.draw(canvas);
    }
}

文本

<string name="dazi_content">\u3000\u3000你好,这是一个打字机,这是一个打字机这是一个打字机这是一个打字机。\n\u3000\u3000换行空格继续打印。</string>

二、结合rxbinding的使用场景

rxbinding 提供的绑定能够将任何 android view 事件转换为 observable。

一旦将 view 事件转换为 observable ,将发射数据流形式的 ui 事件,我们就可以订阅这个数据流,这与订阅其他 observable 方式相同。

引入下面的库:

implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'

1、点击事件防抖

点击事件的写法:

rxview.clicks(button)   //button为控件
        .subscribe(new consumer<unit>() {
            @override
            public void accept(unit unit) throws throwable {
                //点击事件
            }
        });

长点击事件的写法:

rxview.longclicks(button)
        .subscribe(new consumer<unit>() {
            @override
            public void accept(unit unit) throws throwable {
                //长点击自动响应,不需要等放开手指
            }
        });

点击防抖事件的写法:

rxview.clicks(button)
        .throttlefirst(1000, timeunit.milliseconds)   //一秒以内第一次点击事件有效
        .subscribe(new consumer<unit>() {
            @override
            public void accept(unit unit) throws throwable {
                //点击事件
            }
        });

2、输入搜索优化

rxtextview.textchanges(edittext)  //传入edittext控件
        .debounce(1000,timeunit.milliseconds)  //一秒内没有新的事件时,取最后一次事件发射
        .skip(1)    //跳过第一次edittext的空内容
        .subscribeon(androidschedulers.mainthread())
        .subscribe(new consumer<charsequence>() {
            @override
            public void accept(charsequence charsequence) throws throwable {
                //edittext的内容
            }
        }, new consumer<throwable>() {
            @override
            public void accept(throwable throwable) throws throwable {

            }
        });

3、联合判断

combinelatest 操作符将多个 observable 发射的事件组装起来,然后再发射组装后的新事件。

observable<charsequence> observableedittext = rxtextview.textchanges(edittext).skip(1);
 observable<charsequence> observableedittexttwo =   rxtextview.textchanges(edittext_two).skip(1);

 observable.combinelatest(observableedittext, observableedittexttwo, new bifunction<charsequence, charsequence, boolean>() {
     @override
     public boolean apply(charsequence charsequence, charsequence charsequence2) throws throwable {
         if(!textutils.isempty(charsequence)&&!textutils.isempty(charsequence2)){
             return true;
         }
         return false;
     }
 }).subscribe(new consumer<boolean>() {
     @override
     public void accept(boolean aboolean) throws throwable {
             //todo 其他处理
     }
 });

三、防泄漏

1、observable.unsubscribeon

observable<integer> just = observable.just(0);
just.subscribeon(schedulers.io()).unsubscribeon(schedulers.io());  //取消事件,防止泄漏

2、disposable.dispose

这个比较常用。

3、compositedisposable

对订阅事件统一管理

compositedisposable compositedisposable = new compositedisposable();
compositedisposable.add(disposableone);
compositedisposable.add(disposabletwo);
compositedisposable.clear();

参考了以下文章,表示感谢:

最适合使用 rxjava 处理的四种场景

android rxjava应用:网络请求轮询(有条件)

rxjava3文档级教程三: 实战演练

到此这篇关于android rxjava3 使用场景详解的文章就介绍到这了,更多相关android rxjava3 使用场景内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

《Android Rxjava3 使用场景详解.doc》

下载本文的Word格式文档,以方便收藏与打印。