index.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. var fs = require('graceful-fs')
  2. var Writable = require('readable-stream').Writable
  3. var util = require('util')
  4. var MurmurHash3 = require('imurmurhash')
  5. var iferr = require('iferr')
  6. var crypto = require('crypto')
  /**
   * Fold every argument, coerced to a string, into a single MurmurHash3 state
   * and return that state's result.
   *
   * @param {...*} parts Values to hash, in the order given.
   * @returns {*} Whatever `result()` yields for the accumulated hash state.
   */
  function murmurhex () {
    var state = MurmurHash3('')
    var total = arguments.length
    var idx = 0
    while (idx < total) {
      // String concatenation coerces non-string arguments (pid, counter, ...).
      state.hash('' + arguments[idx])
      idx += 1
    }
    return state.result()
  }
  // Counts calls to getTmpname so every generated name gets a distinct suffix.
  var invocations = 0

  /**
   * Derive a temp-file name that sits next to `filename`.
   *
   * The suffix hashes this module's path, the process id and a per-call
   * counter.
   *
   * @param {string} filename Final destination path.
   * @returns {string} `filename`, a dot, and the hashed suffix.
   */
  function getTmpname (filename) {
    var stem = filename + '.'
    invocations += 1
    return stem + murmurhex(__filename, process.pid, invocations)
  }
  18. var setImmediate = global.setImmediate || setTimeout
  19. module.exports = WriteStreamAtomic
  // Requirements:
  //   1. Write everything written to the stream to a temp file.
  //   2. If there are no errors:
  //      a. move the temp file into its final destination
  //      b. emit `finish` & `close` ONLY after the file is
  //         fully flushed and renamed.
  //   3. If there's an error, remove the temp file.
  util.inherits(WriteStreamAtomic, Writable)

  /**
   * Writable stream that writes to a temp file next to `path`. The close,
   * error and open listeners wired up here take over once the temp-file
   * stream reports those events.
   *
   * May be called with or without `new`.
   *
   * @param {string} path Final destination of the written data.
   * @param {Object} [options] Passed to both the Writable base constructor and
   *   the underlying fs.WriteStream. Two extra keys are read here:
   *   `chown` (object with `uid` / `gid`) and `isWin` (overrides the
   *   platform check).
   */
  function WriteStreamAtomic (path, options) {
    if (!(this instanceof WriteStreamAtomic)) {
      return new WriteStreamAtomic(path, options)
    }
    Writable.call(this, options)

    // Borrow hasOwnProperty from Object.prototype so options objects without a
    // prototype (Object.create(null)) or with a shadowed `hasOwnProperty` key
    // do not throw here.
    var hasIsWinOverride = options && Object.prototype.hasOwnProperty.call(options, 'isWin')
    this.__isWin = hasIsWinOverride ? options.isWin : process.platform === 'win32'

    this.__atomicTarget = path
    this.__atomicTmp = getTmpname(path)
    this.__atomicChown = options && options.chown
    this.__atomicClosed = false

    this.__atomicStream = fs.WriteStream(this.__atomicTmp, options)
    this.__atomicStream.once('open', handleOpen(this))
    this.__atomicStream.once('close', handleClose(this))
    this.__atomicStream.once('error', handleError(this))
  }
  // The base Writable would announce `finish` as soon as `end` has been
  // called on us and our buffered data has been handed to the temp-file
  // stream. That is too early: the data still has to be flushed and the file
  // moved into place. So a `finish` arriving through this method is not
  // delivered to listeners; it ends the temp-file stream instead. The real
  // `finish` is emitted later, straight through the parent class, once the
  // temp-file stream has closed and everything has been moved around.
  WriteStreamAtomic.prototype.emit = function (event) {
    if (event !== 'finish') {
      return Writable.prototype.emit.apply(this, arguments)
    }
    return this.__atomicStream.end()
  }
  /**
   * Writable implementation hook: forward one chunk to the temp-file stream.
   *
   * @param {*} buffer Chunk to write.
   * @param {string} encoding Encoding passed along with the chunk.
   * @param {Function} cb Called once the temp-file stream can take more data.
   */
  WriteStreamAtomic.prototype._write = function (buffer, encoding, cb) {
    var target = this.__atomicStream
    if (!target.write(buffer, encoding)) {
      // Back-pressure: hold the callback until the temp-file stream drains.
      target.once('drain', cb)
      return
    }
    return cb()
  }
  /**
   * Build a listener that re-emits `open` on `writeStream`.
   *
   * @param {Object} writeStream Stream on which `open` is re-emitted.
   * @returns {Function} Listener taking the value passed with `open`.
   */
  function handleOpen (writeStream) {
    return function forwardOpen (fd) {
      writeStream.emit('open', fd)
    }
  }
  /**
   * Build the listener that runs when the temp-file stream closes.
   *
   * The listener optionally chowns the temp file, renames it onto the target
   * and then emits `finish` followed by `close`. On failure the temp file is
   * unlinked and `error` followed by `close` is emitted instead.
   *
   * @param {Object} writeStream The atomic stream whose temp file has closed.
   * @returns {Function} Listener taking no arguments.
   */
  function handleClose (writeStream) {
    return function onTmpStreamClosed () {
      // Only ever run the move once per stream.
      if (writeStream.__atomicClosed) return
      writeStream.__atomicClosed = true

      var owner = writeStream.__atomicChown
      if (!owner) {
        moveIntoPlace()
        return
      }
      return fs.chown(writeStream.__atomicTmp, owner.uid, owner.gid, iferr(cleanup, moveIntoPlace))
    }

    // Rename the temp file onto the target path.
    function moveIntoPlace () {
      var onRenamed = iferr(trapWindowsEPERM, finishAndClose)
      fs.rename(writeStream.__atomicTmp, writeStream.__atomicTarget, onRenamed)
    }

    // A rename EPERM on Windows gets a second look via content hashes; every
    // other rename failure goes straight to cleanup.
    function trapWindowsEPERM (err) {
      var isWindowsRenameEPERM = writeStream.__isWin &&
        err.syscall === 'rename' &&
        err.code === 'EPERM'
      if (!isWindowsRenameEPERM) return cleanup(err)
      checkFileHashes(err)
    }

    // Hash the temp file and the target. If both digests match, the failed
    // rename is treated as a success; otherwise the EPERM is reported.
    function checkFileHashes (eperm) {
      var pending = 2
      var tmpDigest = crypto.createHash('sha512')
      var targetDigest = crypto.createHash('sha512')

      hashFile(writeStream.__atomicTmp, tmpDigest)
      hashFile(writeStream.__atomicTarget, targetDigest)

      // Stream `file` into `digest`, reporting to the shared handlers.
      function hashFile (file, digest) {
        fs.createReadStream(file)
          .on('data', function (data, enc) { digest.update(data, enc) })
          .on('error', onHashError)
          .on('end', onHashDone)
      }

      // First read error wins; zeroing `pending` silences later callbacks.
      function onHashError () {
        if (pending === 0) return
        pending = 0
        cleanup(eperm)
      }

      // Compare digests only after both reads have ended.
      function onHashDone () {
        if (pending === 0) return
        pending -= 1
        if (pending) return
        var identical = tmpDigest.digest('hex') === targetDigest.digest('hex')
        return cleanup(identical ? undefined : eperm)
      }
    }

    // Remove the temp file (the unlink result is ignored), then report either
    // the given error or a normal finish.
    function cleanup (err) {
      fs.unlink(writeStream.__atomicTmp, function () {
        if (!err) return finishAndClose()
        writeStream.emit('error', err)
        writeStream.emit('close')
      })
    }

    function finishAndClose () {
      // Go through the parent class directly: our own emit method intercepts
      // `finish` and would not deliver it to listeners.
      Writable.prototype.emit.call(writeStream, 'finish')

      // Delay `close` so it is separated in time from `finish`, the way it
      // would be after a real asynchronous close operation.
      setImmediate(function () {
        writeStream.emit('close')
      })
    }
  }
  /**
   * Build the listener for an `error` event from the temp-file stream.
   *
   * The listener removes the temp file, marks the stream as closed, and then
   * emits `error` followed by `close` on `writeStream`.
   *
   * @param {Object} writeStream The atomic stream to notify.
   * @returns {Function} Listener taking the error that occurred.
   */
  function handleError (writeStream) {
    return function (er) {
      cleanupSync()
      // Set the flag BEFORE notifying listeners. If an `error` listener
      // throws, the flag must already be set so that the close handler (which
      // checks it) does not later try to rename the temp file we just removed.
      writeStream.__atomicClosed = true
      writeStream.emit('error', er)
      writeStream.emit('close')
    }

    // Best-effort removal of the temp file.
    function cleanupSync () {
      try {
        fs.unlinkSync(writeStream.__atomicTmp)
      } catch (ignored) {
        // Deliberately ignored: the temp file may never have been created,
        // and the original stream error is the one worth reporting.
      }
    }
  }