utils.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. "use strict";
  5. module.metadata = {
  6. "stability": "unstable"
  7. };
  8. let { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
  9. // This module provides a set of higher-order functions for working with event
  10. // streams (streams in a NodeJS style that dispatch data, end and error
  11. // events).
  12. // Function takes a `target` object and returns set of implicit references
  13. // (non property references) it keeps. This basically allows defining
  14. // references between objects without storing the explicitly. See transform for
  15. // more details.
  16. let refs = (function() {
  17. let refSets = new WeakMap();
  18. return function refs(target) {
  19. if (!refSets.has(target)) refSets.set(target, new Set());
  20. return refSets.get(target);
  21. };
  22. })();
  23. function transform(input, f) {
  24. let output = {};
  25. // Since event listeners don't prevent `input` to be GC-ed we wanna presrve
  26. // it until `output` can be GC-ed. There for we add implicit reference which
  27. // is removed once `input` ends.
  28. refs(output).add(input);
  29. const next = data => receive(output, data);
  30. once(output, "start", () => start(input));
  31. on(input, "error", error => emit(output, "error", error));
  32. on(input, "end", function() {
  33. refs(output).delete(input);
  34. end(output);
  35. });
  36. on(input, "data", data => f(data, next));
  37. return output;
  38. }
  39. // High order event transformation function that takes `input` event channel
  40. // and returns transformation containing only events on which `p` predicate
  41. // returns `true`.
  42. function filter(input, predicate) {
  43. return transform(input, function(data, next) {
  44. if (predicate(data))
  45. next(data);
  46. });
  47. }
  48. exports.filter = filter;
  49. // High order function that takes `input` and returns input of it's values
  50. // mapped via given `f` function.
  51. const map = (input, f) => transform(input, (data, next) => next(f(data)));
  52. exports.map = map;
  53. // High order function that takes `input` stream of streams and merges them
  54. // into single event stream. Like flatten but time based rather than order
  55. // based.
  56. function merge(inputs) {
  57. let output = {};
  58. let open = 1;
  59. let state = [];
  60. output.state = state;
  61. refs(output).add(inputs);
  62. function end(input) {
  63. open = open - 1;
  64. refs(output).delete(input);
  65. if (open === 0) emit(output, "end");
  66. }
  67. const error = e => emit(output, "error", e);
  68. function forward(input) {
  69. state.push(input);
  70. open = open + 1;
  71. on(input, "end", () => end(input));
  72. on(input, "error", error);
  73. on(input, "data", data => emit(output, "data", data));
  74. }
  75. // If `inputs` is an array treat it as a stream.
  76. if (Array.isArray(inputs)) {
  77. inputs.forEach(forward);
  78. end(inputs);
  79. }
  80. else {
  81. on(inputs, "end", () => end(inputs));
  82. on(inputs, "error", error);
  83. on(inputs, "data", forward);
  84. }
  85. return output;
  86. }
  87. exports.merge = merge;
  88. const expand = (inputs, f) => merge(map(inputs, f));
  89. exports.expand = expand;
  90. const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
  91. exports.pipe = pipe;
  92. // Shim signal APIs so other modules can be used as is.
  93. const receive = (input, message) => {
  94. if (input[receive])
  95. input[receive](input, message);
  96. else
  97. emit(input, "data", message);
  98. input.value = message;
  99. };
  100. receive.toString = () => "@@receive";
  101. exports.receive = receive;
  102. exports.send = receive;
  103. const end = input => {
  104. if (input[end])
  105. input[end](input);
  106. else
  107. emit(input, "end", input);
  108. };
  109. end.toString = () => "@@end";
  110. exports.end = end;
  111. const stop = input => {
  112. if (input[stop])
  113. input[stop](input);
  114. else
  115. emit(input, "stop", input);
  116. };
  117. stop.toString = () => "@@stop";
  118. exports.stop = stop;
  119. const start = input => {
  120. if (input[start])
  121. input[start](input);
  122. else
  123. emit(input, "start", input);
  124. };
  125. start.toString = () => "@@start";
  126. exports.start = start;
  127. const lift = (step, ...inputs) => {
  128. let args = null;
  129. let opened = inputs.length;
  130. let started = false;
  131. const output = {};
  132. const init = () => {
  133. args = [...inputs.map(input => input.value)];
  134. output.value = step(...args);
  135. };
  136. inputs.forEach((input, index) => {
  137. on(input, "data", data => {
  138. args[index] = data;
  139. receive(output, step(...args));
  140. });
  141. on(input, "end", () => {
  142. opened = opened - 1;
  143. if (opened <= 0)
  144. end(output);
  145. });
  146. });
  147. once(output, "start", () => {
  148. inputs.forEach(start);
  149. init();
  150. });
  151. init();
  152. return output;
  153. };
  154. exports.lift = lift;
  155. const merges = inputs => {
  156. let opened = inputs.length;
  157. let output = { value: inputs[0].value };
  158. inputs.forEach((input, index) => {
  159. on(input, "data", data => receive(output, data));
  160. on(input, "end", () => {
  161. opened = opened - 1;
  162. if (opened <= 0)
  163. end(output);
  164. });
  165. });
  166. once(output, "start", () => {
  167. inputs.forEach(start);
  168. output.value = inputs[0].value;
  169. });
  170. return output;
  171. };
  172. exports.merges = merges;
  173. const foldp = (step, initial, input) => {
  174. let output = map(input, x => step(output.value, x));
  175. output.value = initial;
  176. return output;
  177. };
  178. exports.foldp = foldp;
  179. const keepIf = (p, base, input) => {
  180. let output = filter(input, p);
  181. output.value = base;
  182. return output;
  183. };
  184. exports.keepIf = keepIf;
  185. function Input() {}
  186. Input.start = input => emit(input, "start", input);
  187. Input.prototype.start = Input.start;
  188. Input.end = input => {
  189. emit(input, "end", input);
  190. stop(input);
  191. };
  192. Input.prototype[end] = Input.end;
  193. exports.Input = Input;
  194. const $source = "@@source";
  195. const $outputs = "@@outputs";
  196. exports.outputs = $outputs;
  197. function Reactor(options={}) {
  198. const {onStep, onStart, onEnd} = options;
  199. if (onStep)
  200. this.onStep = onStep;
  201. if (onStart)
  202. this.onStart = onStart;
  203. if (onEnd)
  204. this.onEnd = onEnd;
  205. }
  206. Reactor.prototype.onStep = _ => void(0);
  207. Reactor.prototype.onStart = _ => void(0);
  208. Reactor.prototype.onEnd = _ => void(0);
  209. Reactor.prototype.onNext = function(present, past) {
  210. this.value = present;
  211. this.onStep(present, past);
  212. };
  213. Reactor.prototype.run = function(input) {
  214. on(input, "data", message => this.onNext(message, input.value));
  215. on(input, "end", () => this.onEnd(input.value));
  216. start(input);
  217. this.value = input.value;
  218. this.onStart(input.value);
  219. };
  220. exports.Reactor = Reactor;
  221. /**
  222. * Takes an object used as options with potential keys like 'onMessage',
  223. * used to be called `require('sdk/event/core').setListeners` on.
  224. * This strips all keys that would trigger a listener to be set.
  225. *
  226. * @params {Object} object
  227. * @return {Object}
  228. */
  229. function stripListeners (object) {
  230. return Object.keys(object || {}).reduce((agg, key) => {
  231. if (!EVENT_TYPE_PATTERN.test(key))
  232. agg[key] = object[key];
  233. return agg;
  234. }, {});
  235. }
  236. exports.stripListeners = stripListeners;