0

In RxJS (ES6) I'm trying to get the result in a sequentially series of operations in a single Observable.

I'm not sure If I have to use a forkJoin (but I would like the operations to be executed sequentially) or a concat operator (but I would like to just be notified at the end when all of them are executed).

I TRIED:

forkJoin

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

concat

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

In both cases I can't ever see the observables coming from the HttpClient doing their job (no http request sent), but I can see the logging.

The called method in the batch it's the following:

  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

I call the sync() function like the following:

this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

Output:

(8) calling...
sync done

but no HTTP request starting.

If I do the same outside the forkJoin/concat in the pipe map I can see the HTTP request sent.

sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

What am I missing?

--- UPDATE - SOLUTION ---

sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );

2 Answers 2

2

Try

sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
     return batch;
   }),
   switchMap(batch => forkJoin(batch)),
  )
}

This is assuming this.db.getProducts() is synchronous static data and not an observable/asynchronous data.

Then you can try doing

  this.productService.sync().subscribe(() => { console.log('sync done'); });

And see if any API calls are being made.

Sign up to request clarification or add additional context in comments.

Comments

0

Your problem is not forkJoin/concat here, it is using map to return another observable.

In your code, map would return Observable<Observable<any>>. Your IDE should highlight this stuff(don't use any, maybe it is a problem here).

Your solution would be to change map to concatMap(at least for test) and next chain should work.

Also, debug subscribe and its return value to see what you have.

3 Comments

Your sight it's right and fundamental to aim to correct resolution but the solution needs a bit more thoughts on. Thank you btw.
“What am I missing?” was the question. “Give a man a fish, and you feed him for a day. Teach a man to fish, and you feed him for a lifetime” - my answer. And for a full solution - please, provide stackblitz for reproduction(to save time). P.S. don’t forget to unsubscribe(prefer take(1)/first/takeUntil/AsyncPipe). Good luck ;)
Thank you a lot @andriishupta as in fact you're answer pointed out to the right spot to correct, also a deep study of the single operator was needed. I believe in this cases referring up to a doc it's essential. Also your last suggestion about unsubscriptions pros and patterns worths some consideration: medium.com/angular-in-depth/…

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.