forkJoin.js 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import { Observable } from '../Observable';
  2. import { isArray } from '../util/isArray';
  3. import { map } from '../operators/map';
  4. import { isObject } from '../util/isObject';
  5. import { isObservable } from '../util/isObservable';
  6. import { from } from './from';
  7. export function forkJoin(...sources) {
  8. if (sources.length === 1) {
  9. const first = sources[0];
  10. if (isArray(first)) {
  11. return forkJoinInternal(first, null);
  12. }
  13. if (isObject(first) && !isObservable(first)) {
  14. const keys = Object.keys(first);
  15. return forkJoinInternal(keys.map(key => first[key]), keys);
  16. }
  17. }
  18. if (typeof sources[sources.length - 1] === 'function') {
  19. const resultSelector = sources.pop();
  20. sources = (sources.length === 1 && isArray(sources[0])) ? sources[0] : sources;
  21. return forkJoinInternal(sources, null).pipe(map((args) => resultSelector(...args)));
  22. }
  23. return forkJoinInternal(sources, null);
  24. }
  25. function forkJoinInternal(sources, keys) {
  26. return new Observable(subscriber => {
  27. const len = sources.length;
  28. if (len === 0) {
  29. subscriber.complete();
  30. return;
  31. }
  32. const values = new Array(len);
  33. let completed = 0;
  34. let emitted = 0;
  35. for (let i = 0; i < len; i++) {
  36. const source = from(sources[i]);
  37. let hasValue = false;
  38. subscriber.add(source.subscribe({
  39. next: value => {
  40. if (!hasValue) {
  41. hasValue = true;
  42. emitted++;
  43. }
  44. values[i] = value;
  45. },
  46. error: err => subscriber.error(err),
  47. complete: () => {
  48. completed++;
  49. if (completed === len || !hasValue) {
  50. if (emitted === len) {
  51. subscriber.next(keys ?
  52. keys.reduce((result, key, i) => (result[key] = values[i], result), {}) :
  53. values);
  54. }
  55. subscriber.complete();
  56. }
  57. }
  58. }));
  59. }
  60. });
  61. }
  62. //# sourceMappingURL=forkJoin.js.map