123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- "use strict";
// Module metadata: this API is marked as unstable.
module.metadata = { stability: "unstable" };
- let { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
// This module provides a set of higher-order functions for working with
// event streams (NodeJS-style streams that dispatch `data`, `end` and
// `error` events).
// Returns the set of implicit references held by `target`, i.e. references
// that are not stored as properties on the object itself. This lets one
// object keep others associated with it without storing them explicitly on
// it. The set is created lazily on first access and kept in a `WeakMap`
// keyed by `target`. See `transform` for a usage example.
let refs = (() => {
  const registry = new WeakMap();
  return function refs(target) {
    let held = registry.get(target);
    if (held === undefined) {
      held = new Set();
      registry.set(target, held);
    }
    return held;
  };
})();
// Creates a new `output` channel derived from `input`. Every "data" event on
// `input` is handed to `f(data, next)`, and whatever `f` passes to `next` is
// delivered on `output`. "error" events are re-emitted on `output`, and the
// end of `input` ends `output`. The first "start" of `output` starts `input`.
function transform(input, f) {
  const output = {};
  // Event listeners do not prevent `input` from being GC-ed, so we keep an
  // implicit reference to it for as long as `output` is alive. The reference
  // is dropped again once `input` ends.
  refs(output).add(input);
  const forward = data => receive(output, data);
  const finish = () => {
    refs(output).delete(input);
    end(output);
  };
  once(output, "start", () => start(input));
  on(input, "error", reason => emit(output, "error", reason));
  on(input, "end", finish);
  on(input, "data", data => f(data, forward));
  return output;
}
- // High order event transformation function that takes `input` event channel
- // and returns transformation containing only events on which `p` predicate
- // returns `true`.
- function filter(input, predicate) {
- return transform(input, function(data, next) {
- if (predicate(data))
- next(data);
- });
- }
- exports.filter = filter;
- // High order function that takes `input` and returns input of it's values
- // mapped via given `f` function.
- const map = (input, f) => transform(input, (data, next) => next(f(data)));
- exports.map = map;
- // High order function that takes `input` stream of streams and merges them
- // into single event stream. Like flatten but time based rather than order
- // based.
- function merge(inputs) {
- let output = {};
- let open = 1;
- let state = [];
- output.state = state;
- refs(output).add(inputs);
- function end(input) {
- open = open - 1;
- refs(output).delete(input);
- if (open === 0) emit(output, "end");
- }
- const error = e => emit(output, "error", e);
- function forward(input) {
- state.push(input);
- open = open + 1;
- on(input, "end", () => end(input));
- on(input, "error", error);
- on(input, "data", data => emit(output, "data", data));
- }
- // If `inputs` is an array treat it as a stream.
- if (Array.isArray(inputs)) {
- inputs.forEach(forward);
- end(inputs);
- }
- else {
- on(inputs, "end", () => end(inputs));
- on(inputs, "error", error);
- on(inputs, "data", forward);
- }
- return output;
- }
- exports.merge = merge;
- const expand = (inputs, f) => merge(map(inputs, f));
- exports.expand = expand;
- const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
- exports.pipe = pipe;
- // Shim signal APIs so other modules can be used as is.
- const receive = (input, message) => {
- if (input[receive])
- input[receive](input, message);
- else
- emit(input, "data", message);
- input.value = message;
- };
- receive.toString = () => "@@receive";
- exports.receive = receive;
- exports.send = receive;
- const end = input => {
- if (input[end])
- input[end](input);
- else
- emit(input, "end", input);
- };
- end.toString = () => "@@end";
- exports.end = end;
- const stop = input => {
- if (input[stop])
- input[stop](input);
- else
- emit(input, "stop", input);
- };
- stop.toString = () => "@@stop";
- exports.stop = stop;
- const start = input => {
- if (input[start])
- input[start](input);
- else
- emit(input, "start", input);
- };
- start.toString = () => "@@start";
- exports.start = start;
- const lift = (step, ...inputs) => {
- let args = null;
- let opened = inputs.length;
- let started = false;
- const output = {};
- const init = () => {
- args = [...inputs.map(input => input.value)];
- output.value = step(...args);
- };
- inputs.forEach((input, index) => {
- on(input, "data", data => {
- args[index] = data;
- receive(output, step(...args));
- });
- on(input, "end", () => {
- opened = opened - 1;
- if (opened <= 0)
- end(output);
- });
- });
- once(output, "start", () => {
- inputs.forEach(start);
- init();
- });
- init();
- return output;
- };
- exports.lift = lift;
- const merges = inputs => {
- let opened = inputs.length;
- let output = { value: inputs[0].value };
- inputs.forEach((input, index) => {
- on(input, "data", data => receive(output, data));
- on(input, "end", () => {
- opened = opened - 1;
- if (opened <= 0)
- end(output);
- });
- });
- once(output, "start", () => {
- inputs.forEach(start);
- output.value = inputs[0].value;
- });
- return output;
- };
- exports.merges = merges;
- const foldp = (step, initial, input) => {
- let output = map(input, x => step(output.value, x));
- output.value = initial;
- return output;
- };
- exports.foldp = foldp;
- const keepIf = (p, base, input) => {
- let output = filter(input, p);
- output.value = base;
- return output;
- };
- exports.keepIf = keepIf;
- function Input() {}
- Input.start = input => emit(input, "start", input);
- Input.prototype.start = Input.start;
- Input.end = input => {
- emit(input, "end", input);
- stop(input);
- };
- Input.prototype[end] = Input.end;
- exports.Input = Input;
- const $source = "@@source";
- const $outputs = "@@outputs";
- exports.outputs = $outputs;
- function Reactor(options={}) {
- const {onStep, onStart, onEnd} = options;
- if (onStep)
- this.onStep = onStep;
- if (onStart)
- this.onStart = onStart;
- if (onEnd)
- this.onEnd = onEnd;
- }
- Reactor.prototype.onStep = _ => void(0);
- Reactor.prototype.onStart = _ => void(0);
- Reactor.prototype.onEnd = _ => void(0);
- Reactor.prototype.onNext = function(present, past) {
- this.value = present;
- this.onStep(present, past);
- };
- Reactor.prototype.run = function(input) {
- on(input, "data", message => this.onNext(message, input.value));
- on(input, "end", () => this.onEnd(input.value));
- start(input);
- this.value = input.value;
- this.onStart(input.value);
- };
- exports.Reactor = Reactor;
- /**
- * Takes an object used as options with potential keys like 'onMessage',
- * used to be called `require('sdk/event/core').setListeners` on.
- * This strips all keys that would trigger a listener to be set.
- *
- * @params {Object} object
- * @return {Object}
- */
- function stripListeners (object) {
- return Object.keys(object || {}).reduce((agg, key) => {
- if (!EVENT_TYPE_PATTERN.test(key))
- agg[key] = object[key];
- return agg;
- }, {});
- }
- exports.stripListeners = stripListeners;
|