stream.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. "use strict";
  5. module.metadata = {
  6. "stability": "experimental"
  7. };
  8. const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
  9. const { EventTarget } = require("../event/target");
  10. const { emit } = require("../event/core");
  11. const { Buffer } = require("./buffer");
  12. const { Class } = require("../core/heritage");
  13. const { setTimeout } = require("../timers");
  14. const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
  15. "nsIMultiplexInputStream");
  16. const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
  17. "nsIAsyncStreamCopier", "init");
  18. const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
  19. "nsIStringInputStream");
  20. const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
  21. "nsIArrayBufferInputStream");
  22. const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
  23. "nsIBinaryInputStream", "setInputStream");
  24. const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
  25. "nsIInputStreamPump", "init");
  26. const threadManager = Cc["@mozilla.org/thread-manager;1"].
  27. getService(Ci.nsIThreadManager);
  28. const eventTarget = Cc["@mozilla.org/network/socket-transport-service;1"].
  29. getService(Ci.nsIEventTarget);
  30. let isFunction = value => typeof(value) === "function"
  31. function accessor() {
  32. let map = new WeakMap();
  33. return function(target, value) {
  34. if (value)
  35. map.set(target, value);
  36. return map.get(target);
  37. }
  38. }
  39. const Stream = Class({
  40. extends: EventTarget,
  41. initialize: function() {
  42. this.readable = false;
  43. this.writable = false;
  44. this.encoding = null;
  45. },
  46. setEncoding: function setEncoding(encoding) {
  47. this.encoding = String(encoding).toUpperCase();
  48. },
  49. pipe: function pipe(target, options) {
  50. let source = this;
  51. function onData(chunk) {
  52. if (target.writable) {
  53. if (false === target.write(chunk))
  54. source.pause();
  55. }
  56. }
  57. function onDrain() {
  58. if (source.readable)
  59. source.resume();
  60. }
  61. function onEnd() {
  62. target.end();
  63. }
  64. function onPause() {
  65. source.pause();
  66. }
  67. function onResume() {
  68. if (source.readable)
  69. source.resume();
  70. }
  71. function cleanup() {
  72. source.removeListener("data", onData);
  73. target.removeListener("drain", onDrain);
  74. source.removeListener("end", onEnd);
  75. target.removeListener("pause", onPause);
  76. target.removeListener("resume", onResume);
  77. source.removeListener("end", cleanup);
  78. source.removeListener("close", cleanup);
  79. target.removeListener("end", cleanup);
  80. target.removeListener("close", cleanup);
  81. }
  82. if (!options || options.end !== false)
  83. target.on("end", onEnd);
  84. source.on("data", onData);
  85. target.on("drain", onDrain);
  86. target.on("resume", onResume);
  87. target.on("pause", onPause);
  88. source.on("end", cleanup);
  89. source.on("close", cleanup);
  90. target.on("end", cleanup);
  91. target.on("close", cleanup);
  92. emit(target, "pipe", source);
  93. },
  94. pause: function pause() {
  95. emit(this, "pause");
  96. },
  97. resume: function resume() {
  98. emit(this, "resume");
  99. },
  100. destroySoon: function destroySoon() {
  101. this.destroy();
  102. }
  103. });
  104. exports.Stream = Stream;
  105. let nsIStreamListener = accessor();
  106. let nsIInputStreamPump = accessor();
  107. let nsIAsyncInputStream = accessor();
  108. let nsIBinaryInputStream = accessor();
  109. const StreamListener = Class({
  110. initialize: function(stream) {
  111. this.stream = stream;
  112. },
  113. // Next three methods are part of `nsIStreamListener` interface and are
  114. // invoked by `nsIInputStreamPump.asyncRead`.
  115. onDataAvailable: function(request, context, input, offset, count) {
  116. let stream = this.stream;
  117. let buffer = new ArrayBuffer(count);
  118. nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
  119. emit(stream, "data", new Buffer(buffer));
  120. },
  121. // Next two methods implement `nsIRequestObserver` interface and are invoked
  122. // by `nsIInputStreamPump.asyncRead`.
  123. onStartRequest: function() {},
  124. // Called to signify the end of an asynchronous request. We only care to
  125. // discover errors.
  126. onStopRequest: function(request, context, status) {
  127. let stream = this.stream;
  128. stream.readable = false;
  129. if (!components.isSuccessCode(status))
  130. emit(stream, "error", status);
  131. else
  132. emit(stream, "end");
  133. }
  134. });
  135. const InputStream = Class({
  136. extends: Stream,
  137. readable: false,
  138. paused: false,
  139. initialize: function initialize(options) {
  140. let { asyncInputStream } = options;
  141. this.readable = true;
  142. let binaryInputStream = new BinaryInputStream(asyncInputStream);
  143. let inputStreamPump = new InputStreamPump(asyncInputStream,
  144. -1, -1, 0, 0, false);
  145. let streamListener = new StreamListener(this);
  146. nsIAsyncInputStream(this, asyncInputStream);
  147. nsIInputStreamPump(this, inputStreamPump);
  148. nsIBinaryInputStream(this, binaryInputStream);
  149. nsIStreamListener(this, streamListener);
  150. this.asyncInputStream = asyncInputStream;
  151. this.inputStreamPump = inputStreamPump;
  152. this.binaryInputStream = binaryInputStream;
  153. },
  154. get status() nsIInputStreamPump(this).status,
  155. read: function() {
  156. nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
  157. },
  158. pause: function pause() {
  159. this.paused = true;
  160. nsIInputStreamPump(this).suspend();
  161. emit(this, "paused");
  162. },
  163. resume: function resume() {
  164. this.paused = false;
  165. nsIInputStreamPump(this).resume();
  166. emit(this, "resume");
  167. },
  168. close: function close() {
  169. this.readable = false;
  170. nsIInputStreamPump(this).cancel(Cr.NS_OK);
  171. nsIBinaryInputStream(this).close();
  172. nsIAsyncInputStream(this).close();
  173. },
  174. destroy: function destroy() {
  175. this.close();
  176. nsIInputStreamPump(this);
  177. nsIAsyncInputStream(this);
  178. nsIBinaryInputStream(this);
  179. nsIStreamListener(this);
  180. }
  181. });
  182. exports.InputStream = InputStream;
  183. let nsIRequestObserver = accessor();
  184. let nsIAsyncOutputStream = accessor();
  185. let nsIAsyncStreamCopier = accessor();
  186. let nsIMultiplexInputStream = accessor();
  187. const RequestObserver = Class({
  188. initialize: function(stream) {
  189. this.stream = stream;
  190. },
  191. // Method is part of `nsIRequestObserver` interface that is
  192. // invoked by `nsIAsyncStreamCopier.asyncCopy`.
  193. onStartRequest: function() {},
  194. // Method is part of `nsIRequestObserver` interface that is
  195. // invoked by `nsIAsyncStreamCopier.asyncCopy`.
  196. onStopRequest: function(request, context, status) {
  197. let stream = this.stream;
  198. stream.drained = true;
  199. // Remove copied chunk.
  200. let multiplexInputStream = nsIMultiplexInputStream(stream);
  201. multiplexInputStream.removeStream(0);
  202. // If there was an error report.
  203. if (!components.isSuccessCode(status))
  204. emit(stream, "error", status);
  205. // If there more chunks in queue then flush them.
  206. else if (multiplexInputStream.count)
  207. stream.flush();
  208. // If stream is still writable notify that queue has drained.
  209. else if (stream.writable)
  210. emit(stream, "drain");
  211. // If stream is no longer writable close it.
  212. else {
  213. nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
  214. nsIMultiplexInputStream(stream).close();
  215. nsIAsyncOutputStream(stream).close();
  216. nsIAsyncOutputStream(stream).flush();
  217. }
  218. }
  219. });
  220. const OutputStreamCallback = Class({
  221. initialize: function(stream) {
  222. this.stream = stream;
  223. },
  224. // Method is part of `nsIOutputStreamCallback` interface that
  225. // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
  226. // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
  227. // causing the `onOutputStreamReady` notification to be suppressed until
  228. // the stream becomes closed.
  229. onOutputStreamReady: function(nsIAsyncOutputStream) {
  230. emit(this.stream, "finish");
  231. }
  232. });
  233. const OutputStream = Class({
  234. extends: Stream,
  235. writable: false,
  236. drained: true,
  237. get bufferSize() {
  238. let multiplexInputStream = nsIMultiplexInputStream(this);
  239. return multiplexInputStream && multiplexInputStream.available();
  240. },
  241. initialize: function initialize(options) {
  242. let { asyncOutputStream, output } = options;
  243. this.writable = true;
  244. // Ensure that `nsIAsyncOutputStream` was provided.
  245. asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);
  246. // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
  247. // is used to queue written data chunks that `asyncStreamCopier` will
  248. // asynchronously drain into `asyncOutputStream`.
  249. let multiplexInputStream = MultiplexInputStream();
  250. let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
  251. output || asyncOutputStream,
  252. eventTarget,
  253. // nsIMultiplexInputStream
  254. // implemnts .readSegments()
  255. true,
  256. // nsIOutputStream may or
  257. // may not implemnet
  258. // .writeSegments().
  259. false,
  260. // Use default buffer size.
  261. null,
  262. // Should not close an input.
  263. false,
  264. // Should not close an output.
  265. false);
  266. // Create `requestObserver` implementing `nsIRequestObserver` interface
  267. // in the constructor that's gonna be reused across several flushes.
  268. let requestObserver = RequestObserver(this);
  269. // Create observer that implements `nsIOutputStreamCallback` and register
  270. // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once
  271. // `nsIAsyncOutputStream` is closed.
  272. asyncOutputStream.asyncWait(OutputStreamCallback(this),
  273. asyncOutputStream.WAIT_CLOSURE_ONLY,
  274. 0,
  275. threadManager.currentThread);
  276. nsIRequestObserver(this, requestObserver);
  277. nsIAsyncOutputStream(this, asyncOutputStream);
  278. nsIMultiplexInputStream(this, multiplexInputStream);
  279. nsIAsyncStreamCopier(this, asyncStreamCopier);
  280. this.asyncOutputStream = asyncOutputStream;
  281. this.multiplexInputStream = multiplexInputStream;
  282. this.asyncStreamCopier = asyncStreamCopier;
  283. },
  284. write: function write(content, encoding, callback) {
  285. if (isFunction(encoding)) {
  286. callback = encoding;
  287. encoding = callback;
  288. }
  289. // If stream is not writable we throw an error.
  290. if (!this.writable) throw Error("stream is not writable");
  291. let chunk = null;
  292. // If content is not a buffer then we create one out of it.
  293. if (Buffer.isBuffer(content)) {
  294. chunk = new ArrayBufferInputStream();
  295. chunk.setData(content.buffer, 0, content.length);
  296. }
  297. else {
  298. chunk = new StringInputStream();
  299. chunk.setData(content, content.length);
  300. }
  301. if (callback)
  302. this.once("drain", callback);
  303. // Queue up chunk to be copied to output sync.
  304. nsIMultiplexInputStream(this).appendStream(chunk);
  305. this.flush();
  306. return this.drained;
  307. },
  308. flush: function() {
  309. if (this.drained) {
  310. this.drained = false;
  311. nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
  312. }
  313. },
  314. end: function end(content, encoding, callback) {
  315. if (isFunction(content)) {
  316. callback = content
  317. content = callback
  318. }
  319. if (isFunction(encoding)) {
  320. callback = encoding
  321. encoding = callback
  322. }
  323. // Setting a listener to "finish" event if passed.
  324. if (isFunction(callback))
  325. this.once("finish", callback);
  326. if (content)
  327. this.write(content, encoding);
  328. this.writable = false;
  329. // Close `asyncOutputStream` only if output has drained. If it's
  330. // not drained than `asyncStreamCopier` is busy writing, so let
  331. // it finish. Note that since `this.writable` is false copier will
  332. // close `asyncOutputStream` once output drains.
  333. if (this.drained)
  334. nsIAsyncOutputStream(this).close();
  335. },
  336. destroy: function destroy() {
  337. nsIAsyncOutputStream(this).close();
  338. nsIAsyncOutputStream(this);
  339. nsIMultiplexInputStream(this);
  340. nsIAsyncStreamCopier(this);
  341. nsIRequestObserver(this);
  342. }
  343. });
  344. exports.OutputStream = OutputStream;
  345. const DuplexStream = Class({
  346. extends: Stream,
  347. implements: [InputStream, OutputStream],
  348. allowHalfOpen: true,
  349. initialize: function initialize(options) {
  350. options = options || {};
  351. let { readable, writable, allowHalfOpen } = options;
  352. InputStream.prototype.initialize.call(this, options);
  353. OutputStream.prototype.initialize.call(this, options);
  354. if (readable === false)
  355. this.readable = false;
  356. if (writable === false)
  357. this.writable = false;
  358. if (allowHalfOpen === false)
  359. this.allowHalfOpen = false;
  360. // If in a half open state and it's disabled enforce end.
  361. this.once("end", () => {
  362. if (!this.allowHalfOpen && (!this.readable || !this.writable))
  363. this.end();
  364. });
  365. },
  366. destroy: function destroy(error) {
  367. InputStream.prototype.destroy.call(this);
  368. OutputStream.prototype.destroy.call(this);
  369. }
  370. });
  371. exports.DuplexStream = DuplexStream;