Retrofit+Rxjava 服务器 IP 轮询重试机制实现

本贴最后更新于 3116 天前,其中的信息可能已经天翻地覆

app在启动时会请求一些配置信息,其中就包括ip的路由表,将这份路由表存储到本地,至于是sp还是对象持久化抑或是其他方式,可根据实际情况自行选择。

因为项目网络层由Retrofit+Rxjava+Okhttp实现,Retrofit运行时无法改变baseUrl,即使可以通过反射的方式来改变baseUrl,也无法对已经生成的service对象起作用,而且我的项目中所有service对象都通过Dagger2注入,所以最终使用了这样一种方式。

url交给UrlManager来管理

public class UrlManager { 
   public static final String[] HOST_SITE = {"https://xxx/api/",xxx}; 
   public static final String[] HOST_WEB = {"https://xxx/",xxx};  
   public static final String HOST_SITE_DEBUG = "https://xxx/api/";  
   public static final String HOST_WEB_DEBUG = "https://xxx/"; 
   public static List<String> list;   
   public static Random random = new Random();  
   public static String getHostSite() {   
      if(BuildConfig.IS_DEBUG) {        
         return processUrl(HOST_SITE_DEBUG);
      } else {
        String host = getDynamicHost(); 
        if (!TextUtils.isEmpty(host)) return host;  
        return HOST_SITE[random.nextInt(HOST_SITE.length)];
      }
  } 
               
  @Nullable
  private static String getDynamicHost() {
      int index = (int) SPUtils.get(NeutronApplication.getContext(), Constants.URL, 0);
      if (list != null && list.size() > 0 && index < list.size())  
          return list.get(index);    
          return null;
  }
  public static String getHostWeb() { 
     if (BuildConfig.IS_DEBUG) {         
         return processUrl(HOST_WEB_DEBUG);
     } else { 
         String host = getDynamicHost();  
         if (!TextUtils.isEmpty(host)) return host;    
         return HOST_WEB[random.nextInt(HOST_WEB.length)];
     }
  } 
   public static void setHosts(List<String> list) {
        UrlManager.list = list;
        RxHelper.setCounterAttempts(list.size());
  }  
  public static void updateUrlIndex(int i) {
       if (list != null && i >= list.size())
        i = 0;
      SPUtils.put(NeutronApplication.getContext(),Constants.URL, i);
  }   
   public static void updateUrlIndex() {  
      int o = (int) SPUtils.get(NeutronApplication.getContext(), Constants.URL, 0);
       updateUrlIndex(o + 1);
 }
}

app启动时拉取到配置后设置UrlManager中的路由表,然后每次根据索引去路由表动态拿请求地址,那路由索引由谁来控制呢?
因为我将项目中的rxjava抽取了一层RxHelper,所以这件事就交给RxHelper来干了,可以覆盖所有的网络请求。

public class RxHelper {  
  private static final int COUNTER_START = 0;  
  private static int COUNTER_ATTEMPTS = 3;  
  public static void setCounterAttempts(int counterAttempts) {
        COUNTER_ATTEMPTS = counterAttempts;
  } 
  public static <T> rx.Observable.Transformer<T, T> handleResult() { 
         return tObservable -> tObservable
                .flatMap(RxHelper::createData)
                .retryWhen(observable -> observable.compose(zipWithFlatMap()))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());
  }   
  public static <T> rx.Observable.Transformer<T,T> handleResultWithOutRetryPolicy(){
          return tObservable -> tObservable.flatMap(RxHelper::createData)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());
  }
  public static <T> Observable.Transformer<T, Long> zipWithFlatMap() { 
         return observable ->
                observable.zipWith(Observable.range(COUNTER_START, COUNTER_ATTEMPTS),
                        (t, repeatAttempt) -> repeatAttempt)
                        .flatMap(new Func1<Integer, Observable<Long>>() {  
                            @Override
                            public Observable<Long> call(Integer repeatAttempt) {
                                UrlManager.updateUrlIndex(repeatAttempt);  
                                return Observable.timer(repeatAttempt * 200, TimeUnit.MILLISECONDS);
                            }
                        });
  }   
  private static <T> Observable<T> createData(final T t) { 
         return Observable.create(new Observable.OnSubscribe<T>() {  
            @Override
            public void call(Subscriber<? super T> subscriber) {  
                try {
                    subscriber.onNext(t);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    LogUtils.logw("Rxhelper: " + e.toString());
                    subscriber.onError(e);
                }
            }
        });
    }
}

这样在每次请求错误时,会递增路由表索引,继续下次请求,轮询的间隔为Observable.timer(repeatAttempt * 200, TimeUnit.MILLISECONDS);
对RxJava的retryWhen不理解的同学请移步对RxJava中.repeatWhen()和.retryWhen()操作符的思考

之前也说了,retrofit不能修改baseUrl,反射的方式也不适合我的项目,至于利用builder生成新的retrofit对象的方式更不考虑了,那我是怎么实现运行时修改请求地址的呢?别忘了okhttp是可以添加拦截器的,在OkHttpIntercepter中:

public class OkHttpInterceptor implements Interceptor {   
    @Override
    public Response intercept(Chain chain) throws IOException {     
       //配置request
        Request request = chain.request();
        Request.Builder requestBuilder = request.newBuilder();
        String url = UrlManager.getHostSite();
        Uri parse = Uri.parse(url);
        String host = parse.getHost();
        HttpUrl httpUrl = request.url().newBuilder().host(host).build();
        requestBuilder.url(httpUrl);
        Response.Builder responseBuilder = chain.proceed(requestBuilder.build()).newBuilder();
        Response response = responseBuilder.build();       
        return response;
    }
}

拦截请求的url,修改其host,这样整个流程就ok了,http的各种错误码的处理也是可以在拦截器中统一处理的,至于其他健壮性的考虑此处就不做过多阐述了。

有同学问我,如果想处理最后一次error通知怎么办呢?可以这样做,修改过的RxHelper如下:

public class RxHelper {  
     private static final int COUNTER_START = 0; 
     private static int COUNTER_ATTEMPTS = 3;   
     public static void setCounterAttempts(int counterAttempts) {
        COUNTER_ATTEMPTS = counterAttempts;
     }   
    public static <T> rx.Observable.Transformer<T, T> handleResult() { 
           return tObservable -> tObservable
                .flatMap(RxHelper::createData)
                .retryWhen(error -> delayedRetry(error))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());
    }    
    //猫腻主要在这个方法
    private static Observable<Object> delayedRetry(Observable<? extends Throwable> error) {
            return error.zipWith(Observable.range(COUNTER_START, COUNTER_ATTEMPTS + 1),
                (i, repeatAttempt) -> repeatAttempt)
                .flatMap(o -> {
                    UrlManager.updateUrlIndex(o);
                    LogUtils.logw("repeat: " + o);    
                    return o < COUNTER_ATTEMPTS ? Observable.timer(o * 200, TimeUnit.MILLISECONDS)
                            : error.flatMap(Observable::error);
                });
    }  
    public static <T> rx.Observable.Transformer<T, T> handleResultWithOutRetryPolicy() {
            return tObservable -> tObservable.flatMap(RxHelper::createData)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());
    }   
    private static <T> Observable<T> createData(final T t) {   
         return Observable.create(new Observable.OnSubscribe<T>() {    
            @Override
            public void call(Subscriber<? super T> subscriber) {  
                try {
                    subscriber.onNext(t);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }
}

转自:https://gold.xitu.io/post/584e6d5961ff4b0058e9240a



  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1064 引用 • 3456 回帖 • 150 关注
  • 郑禄秀
    9 引用 • 8 回帖
  • istarvip
    9 引用 • 2 回帖
  • 猿码阁
    19 引用 • 14 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
idxiu
show me the code, change the world! 北京

推荐标签 标签

  • IPFS

    IPFS(InterPlanetary File System,星际文件系统)是永久的、去中心化保存和共享文件的方法,这是一种内容可寻址、版本化、点对点超媒体的分布式协议。请浏览 IPFS 入门笔记了解更多细节。

    20 引用 • 245 回帖 • 232 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖 • 3 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    118 引用 • 54 回帖 • 3 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 130 关注
  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 708 关注
  • etcd

    etcd 是一个分布式、高可用的 key-value 数据存储,专门用于在分布式系统中保存关键数据。

    6 引用 • 26 回帖 • 545 关注
  • 友情链接

    确认过眼神后的灵魂连接,站在链在!

    24 引用 • 373 回帖
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    93 引用 • 122 回帖 • 619 关注
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    10057 引用 • 45698 回帖 • 69 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 38 关注
  • OpenResty

    OpenResty 是一个基于 NGINX 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。

    17 引用 • 50 关注
  • Anytype
    3 引用 • 31 回帖 • 28 关注
  • webpack

    webpack 是一个用于前端开发的模块加载器和打包工具,它能把各种资源,例如 JS、CSS(less/sass)、图片等都作为模块来使用和处理。

    42 引用 • 130 回帖 • 253 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    187 引用 • 831 回帖
  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    7 引用 • 28 回帖
  • Ruby

    Ruby 是一种开源的面向对象程序设计的服务器端脚本语言,在 20 世纪 90 年代中期由日本的松本行弘(まつもとゆきひろ/Yukihiro Matsumoto)设计并开发。在 Ruby 社区,松本也被称为马茨(Matz)。

    7 引用 • 31 回帖 • 262 关注
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 1 关注
  • RemNote
    2 引用 • 16 回帖 • 24 关注
  • 宕机

    宕机,多指一些网站、游戏、网络应用等服务器一种区别于正常运行的状态,也叫“Down 机”、“当机”或“死机”。宕机状态不仅仅是指服务器“挂掉了”、“死机了”状态,也包括服务器假死、停用、关闭等一些原因而导致出现的不能够正常运行的状态。

    13 引用 • 82 回帖 • 77 关注
  • Sphinx

    Sphinx 是一个基于 SQL 的全文检索引擎,可以结合 MySQL、PostgreSQL 做全文搜索,它可以提供比数据库本身更专业的搜索功能,使得应用程序更容易实现专业化的全文检索。

    1 引用 • 227 关注
  • PWA

    PWA(Progressive Web App)是 Google 在 2015 年提出、2016 年 6 月开始推广的项目。它结合了一系列现代 Web 技术,在网页应用中实现和原生应用相近的用户体验。

    14 引用 • 69 回帖 • 183 关注
  • 持续集成

    持续集成(Continuous Integration)是一种软件开发实践,即团队开发成员经常集成他们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。

    15 引用 • 7 回帖
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 443 关注
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 1 关注
  • Access
    1 引用 • 3 回帖 • 1 关注
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖 • 3 关注