Rx и модификация: изменение параметра запроса ServiceApi в цикле запроса, запущенном с помощью repeatwhen или repeatuntill

Я использую модификацию и RxJava для своего приложения. Я выполняю запрос, который будет попадать на сервер каждую 1 секунду. Для этого я создаю API-интерфейс модифицированного сервиса с параметрами запроса, подписываюсь на возвращаемый наблюдаемый объект и использую repeatWhen.

Проблема в том, что каждый раз, когда выполняется запрос, я хочу изменить параметр запроса, отправленный с запросом (в основном, нужно сделать его динамическим). Как я могу этого добиться?

Например, вариант использования: sendLocation API отправляет текущее местоположение на сервер каждые 10 секунд. КАК это может быть достигнуто с помощью модификации и RxJava


person Amit    schedule 12.12.2016    source источник


Ответы (3)


Ответ @TassosBassoukos работает, если вы добавите задержку. В частности, вот полный пример использования API github, который чередует вызовы между двумя пользователями. Убедитесь, что это не длится слишком долго -

public interface GitHubService {
    @GET("/users/{user}/repos")
    Observable<String> getRepo(@Path("user") String user);
}

static String[] users = {"octocat", "square"};
static int i = 0;

static String getUser() {
    return users[i++ % 2];
 }

public void RxGitRepo() {
    OkHttpClient client = new OkHttpClient();

    Retrofit retrofit = new Retrofit.Builder()
      .client(client)
      .baseUrl("https://api.github.com")
      .addConverterFactory(ScalarsConverterFactory.create())
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
      .build();

   GitHubService gitHubService = retrofit.create(GitHubService.class);
   Observable
      .defer(() -> Observable.just(getUser()))
      .flatMap(user -> gitHubService.getRepo(user))
      .repeatWhen(done -> done.delay(2, TimeUnit.SECONDS))
      .subscribeOn(Schedulers.io())
      .subscribe(System.out::println);
}

В вашем случае вместо того, чтобы опрашивать местоположение каждые 10 секунд, вы можете настроить наблюдаемое в этом местоположении и использовать его для запуска обновлений. Используя такую ​​библиотеку, как android-reactiveLocation, вы можете получить Observable для потоковой передачи местоположений. Что-то вроде следующего непроверенного кода -

LocationRequest request = LocationRequest.create() 
                             .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
                             .setInterval(10000);

ReactiveLocationProvider locationProvider = new ReactiveLocationProvider(context);
Subscription subscription = locationProvider.getUpdatedLocation(request)
    .sample(10, TimeUnits.SECONDS)
    .flatMap(/* your retrofit call based on the observed location */)  
    .subscribe(/* subsciber for retrofit call results */);
person iagreen    schedule 13.12.2016
comment
спасибо iagreen. Хотел что-то общее, например, завтра вместо местоположения, я хочу отправить некоторые текстовые данные на сервер в параметре запроса, скажем, каждые 2 секунды, и эти текстовые данные постоянно меняются, так как мы можем этого добиться. Я понимаю, что мы должны создать запрос на модернизацию, предоставив параметры запроса, а затем подписаться на этот наблюдаемый, возвращаемый Сервисом, а затем воспроизвести это. Итак, когда я помещаю его в цикл, как я могу изменить параметр запроса в следующем тике - person Amit; 13.12.2016
comment
Я понимаю, что вы говорите, но вы выходите за пределы потока для получения дополнительных данных (местоположение, изменяющийся текст и т. д.). Я бы по-прежнему обертывал изменяющееся событие наблюдаемым и позволял ему управлять запросами к серверу — таким образом, данные, которые нам нужно отправить, естественным образом поступают в виде потока. Мне это кажется более естественным, но см. редактирование для более общего примера того, как вы пытаетесь это сделать. - person iagreen; 13.12.2016
comment
Я сделал точно так же, как в первом примере, который вы предоставили, и это работает. Спасибо за это. - person Amit; 13.12.2016

Как насчет этого?

    AtomicInteger requestParam = new AtomicInteger();

    apiCall.request(requestParam)
            .doOnNext(response -> requestParam.set(response.getRequestParam()))
            .repeatWhen(objectObservable -> objectObservable
                    .delay(1, TimeUnit.SECONDS)
                    .flatMap(o -> Observable.just(requestParam.get())
                    .takeWhile(integer -> /* condition */ )));
person Alexander Perfilyev    schedule 12.12.2016

Используйте что-то вроде этого:

Observable
.defer(() -> Observable.just(getLocation))
.flatMap(location -> doNetworkCall(location))
.repeatWhen(...)
person Tassos Bassoukos    schedule 12.12.2016