Условная цепочка наблюдаемых

Я хочу асинхронно извлекать данные через несколько REST API. Я использую Retrofit на Android с расширением rxJava, т.е. я выполняю любой запрос GET, подписавшись на Observable.

Как я уже сказал, у меня несколько исходных API, поэтому, когда первый источник не дает желаемого результата, я хочу попробовать следующий, если это также не удается, снова попробовать следующий и так далее, пока не будут запрошены все источники или результат нашелся.

Я изо всех сил пытаюсь перевести этот подход в правильное использование Observables, так как я не знаю, какие операторы могут добиться такого поведения, и есть также некоторые ограничения, которые необходимо соблюдать:

  • когда результат найден, оставшиеся API, если таковые имеются, не следует запрашивать.
  • другие компоненты зависят от результата запроса, я хочу, чтобы они получали Observable при запуске запроса, чтобы этот Observable мог уведомить их о завершении запроса
  • Мне нужно сохранить ссылку на вышеупомянутый Observable, потому что один и тот же запрос может быть сделан более одного раза до его завершения, в этом случае я запускаю его только в первый раз, когда он требуется, а последующие запросы получают только Observable, который уведомляет, когда запрос законченный

Я начинал только с одного API для запроса и использовал следующее для запроса и последующего уведомления зависимых компонентов:

private Observable<String> loadData(int jobId) {

    final ConnectableObservable<String> result = Async
            .fromCallable(() -> getResult(jobId))
            .publish();

    getRestRequest()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    dataHolder -> {
                        if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) {
                            saveData(dataHolder.getData());
                        } else {
                            markNotFound(dataHolder);
                        }
                    },
                    error -> currentJobs.remove(jobId),
                    () -> {
                        currentJobs.remove(jobId);
                        result.connect();
                    });

    return result;
}

Этот код вызывался только для первого запроса, возвращаемый результат Observable затем сохранялся в currentJobs, а последующие запросы получали только Observable без повторного запуска запроса.

Любая помощь высоко ценится.


person DerBob    schedule 27.07.2016    source источник


Ответы (2)


Предполагая, что у вас есть набор наблюдаемых, которые повторно подключаются каждый раз, когда вы подписываетесь:

List<Observable<Result>> suppliers = ...

Тогда вам просто нужно сделать логическую вещь:

Observable<Result> results = Observable
          .from(suppliers)
          .concatMap(supplier -> supplier)
          .takeFirst(result -> isAcceptable(result))
          .cache()
person Tassos Bassoukos    schedule 27.07.2016

Используйте .onErrorResumeNext и предполагая, что каждый наблюдаемый сервис может возвращать 0 или 1 элемент, используйте first для выдачи ошибки, если элементы не выдаются:

Observable<T> a, b, c;
...
a.first().onErrorResumeNext(t -> b.first())
 .onErrorResumeNext(t -> c.first())
 .onErrorResumeNext(t -> d.first())
 ...
person Dave Moten    schedule 28.07.2016
comment
Когда один из запросов завершается без нахождения запрошенных данных, он возвращается с пустым dataHolder. Это не приводит к возникновению ошибки, поэтому этот подход не будет работать должным образом. - person DerBob; 28.07.2016
comment
Я обновил ответ, используя first, чтобы удовлетворить ваши дополнительные требования. - person Dave Moten; 29.07.2016