Image by Brett Hondow from Pixabay
I've recently finished a basic course on RxJS. The main reason was to use Redux-Observable after watching this video, "Netflix JavaScript Talks - RxJS + Redux + React = Amazing!" referred to by Nicolas Marcora.
A basic example shows how to do "ping/pong", which was simple to implement.
But the problem is that, when you ping more than once, multiple observables are created.
After hours of struggle, I asked on the official Gitter page for Redux-Observable.
I will share what's done in public (Learn in Public) as the Gitter conversation is private.
Error Demo
Below shows multiple "ping/pong" actions being dispatched as you click on "PING" button.

The code that generated the mess above is shown below.
| import { ofType } from "redux-observable"; | |
| import { mapTo, tap, delay } from "rxjs/operators"; | |
| export const PING = "PING"; | |
| export const PONG = "PONG"; | |
| export const ping = () => ({ type: PING }); | |
| export const pong = () => ({ type: PONG }); | |
| export const pingEpic = action$ => | |
| action$.pipe( | |
| ofType(PING), | |
| delay(1000), | |
| mapTo(pong()), | |
| tap(console.info) | |
| ); | |
| export const pongEpic = action$ => | |
| action$.pipe( | |
| ofType(PONG), | |
| delay(1000), | |
| mapTo(ping()), | |
| tap(console.info) | |
| ); | |
| export default function pingReducer(state = { isPinging: false }, action) { | |
| switch (action.type) { | |
| case "PING": | |
| return { isPinging: true }; | |
| case "PONG": | |
| return { isPinging: false }; | |
| default: | |
| return state; | |
| } | |
| } |
For being a new RxJS (, which is a prerequisite for redux-observable) user, I couldn't figure it after hours of reading documentation and playing around.
Kevin Ghadyani generously spent time to review the code and provide a solution for the issue.
| export const pingEpic = action$ => | |
| action$.pipe( | |
| ofType(PING), | |
| // 1️⃣ from "delay(1000)" to 👇 | |
| switchMap(() => timer(1000)), | |
| mapTo(pong()), | |
| tap(console.info) | |
| ); | |
| // 2️⃣ pongEpic stays the same. | |
| export const pongEpic = action$ => | |
| action$.pipe( | |
| ofType(PONG), | |
| delay(1000), | |
| mapTo(ping()), | |
| tap(console.info) | |
| ); |
1️⃣ uses switchMap, which implicitly cancels previous observable. So when PING button is clicked while the PING action is already dispatched, previous timer observable is canceled and a new one is started.
Kevin's forked Sandbox shows the fix.
Kevin also shows how to accomplish the same task using takeUntil.
| // Original code | |
| const pingEpic = ( | |
| action$, | |
| ) => ( | |
| action$ | |
| .pipe( | |
| ofType(PING), | |
| delay(1000), | |
| map(pong), | |
| ) | |
| ) | |
| // Accepted solution. | |
| const pingEpic = ( | |
| action$, | |
| ) => ( | |
| action$ | |
| .pipe( | |
| ofType(PING), | |
| switchMap(() => ( | |
| timer(1000) | |
| )), | |
| map(pong), | |
| ) | |
| ) | |
| // This kills the entire observable when another PING comes in. Not what you want. | |
| const pingEpic = ( | |
| action$, | |
| ) => ( | |
| action$ | |
| .pipe( | |
| ofType(PING), | |
| takeUntil( | |
| action$ | |
| .pipe( | |
| ofType(PING) | |
| ) | |
| ), | |
| delay(1000), | |
| map(pong), | |
| ) | |
| ) | |
| // Instead of using `switchMap`, you could use `takeUntil` and `mergeMap` just fine. | |
| const pingEpic = ( | |
| action$, | |
| ) => ( | |
| action$ | |
| .pipe( | |
| ofType(PING), | |
| mergeMap(() => ( | |
| timer(1000) | |
| .pipe( | |
| takeUntil( | |
| action$ | |
| .pipe( | |
| ofType(PING) | |
| ) | |
| ) | |
| ) | |
| )), | |
| map(pong), | |
| ) | |
| ) | |
| // I use namespaces often for reusable epics and reducers (https://itnext.io/the-secret-to-using-redux-createnamespacereducer-d3fed2ccca4a) | |
| // They comes in handy when you're using one epic for multiple uses. | |
| const pingEpic = ( | |
| action$, | |
| ) => ( | |
| action$ | |
| .pipe( | |
| ofType(CREATE_PING_LISTENER) | |
| mergeMap(({ | |
| namespace, | |
| }) => ( | |
| action$ | |
| .pipe( | |
| ofType(PING), | |
| ofNamespace(namespace), | |
| takeUntil( | |
| action$ | |
| .pipe( | |
| ofType(STOP_PING_LISTENER) | |
| ) | |
| ), | |
| switchMap(() => ( | |
| timer(1000) | |
| )), | |
| mapTo({ namespace }), | |
| map(pong), | |
| )), | |
| ) | |
| ) |