通过 Node.js 和 webhdfs 模块将文件上传到 HDFS

我正在尝试使用 Node.js 将文件上传到 HDFS。我使用的是 webhdfs 模块,但最终 HDFS 上的文件是空的(而本地的 stupidfile.txt 并不是空的)。

// Upload a local file to HDFS by piping a local read stream into a
// webhdfs write stream. createClient() is called without options, so the
// library's default connection settings are used.
var WebHDFS = require('webhdfs');
var fs = require('fs');

var hdfs = WebHDFS.createClient();

var localFilePath = "stupidfile.txt";
var remoteFilePath = "/user/cloudera/doesthiswork.txt";

var localFileStream = fs.createReadStream(localFilePath);
var remoteFileStream = hdfs.createWriteStream(remoteFilePath);

// A missing or unreadable local file makes the read stream emit 'error'.
// Without a listener, that error would crash the process instead of being reported.
localFileStream.on('error', function onLocalError (err) {
  console.log("it failed");
  console.log(err);
});

remoteFileStream.on('error', function onError (err) {
  // Do something with the error
  console.log("it failed");
  console.log(err);
});

// NOTE(review): in the webhdfs createWriteStream quoted in this post, the
// request's own 'error' handler also emits 'finish'. This message alone
// therefore does not prove the file on HDFS actually contains the data.
remoteFileStream.on('finish', function onFinish () {
  // Upload is done
  console.log("it is done!");
});

// Listeners are attached before piping so no event can be missed.
console.log("opening stream to HDFS");
localFileStream.pipe(remoteFileStream);

控制台输出

 [cloudera@quickstart Documents]$ node hdfs-upload.js opening stream to HDFS it is done! 

更新:下面是 webhdfs 库的源代码(我在其中添加了一行 console.log(stream);)以及运行后在标准输出中打印的日志。

 /**
 * Create writable stream for given path
 *
 * The returned object is the initial `request` object (`req`); callers pipe
 * a readable stream into it. The upload happens in two steps:
 *   1. When something is piped into `req`, the source is immediately unpiped
 *      and paused, and `req` is ended, so the first request carries no body.
 *   2. If the response to that first request is a redirect, a second request
 *      is sent to `res.headers.location`. The paused source is then piped
 *      into that second request and resumed.
 * (Presumably this mirrors the WebHDFS create/append redirect flow. Confirm
 * against the server documentation.)
 *
 * @example
 *
 * var WebHDFS = require('webhdfs');
 * var hdfs = WebHDFS.createClient();
 *
 * var localFileStream = fs.createReadStream('/path/to/local/file');
 * var remoteFileStream = hdfs.createWriteStream('/path/to/remote/file');
 *
 * localFileStream.pipe(remoteFileStream);
 *
 * remoteFileStream.on('error', function onError (err) {
 *   // Do something with the error
 * });
 *
 * remoteFileStream.on('finish', function onFinish () {
 *   // Upload is done
 * });
 *
 * @method createWriteStream
 * @fires WebHDFS#finish
 *
 * @param {String} path Remote path; must be a non-empty string.
 * @param {Boolean} [append] If set to true then append data to the file
 *   (uses the 'append' operation with POST instead of 'create' with PUT).
 *   May be omitted, in which case the second argument is treated as `opts`.
 * @param {Object} [opts] Extra operation parameters, merged with the
 *   defaults { overwrite: true, permissions: '0777' }.
 *
 * @returns {Object} The initial request object, used as the pipe target.
 *   It emits 'finish' (optionally with the final `location` header) and 'error'.
 * @throws {Error} 'path must be a string' if `path` is falsy or not a string.
 */
WebHDFS.prototype.createWriteStream = function createWriteStream (path, append, opts) {
  // Allow the (path, opts) call form: an object in the `append` slot is opts.
  if (typeof append === 'object') {
    opts = append;
    append = false;
  }

  // Validate path
  if (!path || typeof path !== 'string') {
    throw new Error('path must be a string');
  }

  // NOTE(review): presumably `extend` lets keys in `opts` override these
  // defaults. Confirm against the `extend` helper in use.
  var endpoint = this._getOperationEndpoint(append ? 'append' : 'create', path, extend({
    overwrite: true,
    permissions: '0777'
  }, opts));

  var self = this;
  // Set to the piped-in source stream by the 'pipe' handler below.
  var stream = null;
  var params = {
    method: append ? 'POST' : 'PUT',
    url: endpoint,
    json: true
  };

  var req = request(params, function (err, res, body) {
    // Handle redirect only if there was not an error (eg res is defined)
    if (res && self._isRedirect(res)) {
      // Second request: same params, but sent to the redirect target.
      // NOTE(review): `extend(params, ...)` may mutate `params` in place,
      // depending on the helper.
      var upload = request(extend(params, { url: res.headers.location }), function (err, res, body) {
        if (err) {
          return req.emit('error', err);
        } else if (self._isError(res)) {
          return req.emit('error', self._parseError(body));
        }

        // Report the created resource's location to 'finish' listeners when the server returns one.
        if (res.headers.hasOwnProperty('location')) {
          return req.emit('finish', res.headers.location);
        } else {
          return req.emit('finish');
        }
      });

      // NOTE(review): debug line added for diagnosis. Remove once done.
      console.log(stream);
      // NOTE(review): `stream` is only assigned in the 'pipe' handler. If
      // nothing was piped into `req`, it is still null here and this throws.
      // If the source has already ended by this point (i.e. pause() did not
      // hold back its data), nothing is sent to `upload`, and presumably an
      // empty file is created.
      stream.pipe(upload);
      stream.resume();
    }
  });

  // Handle possible server error
  // Body chunks of the first response are parsed; a recognised error is
  // forwarded to both the source stream and the request.
  req.on('data', function onError (data) {
    var error = self._parseError(data.toString());
    if (error) {
      stream.emit('error', error);
      req.emit('error', error);
    }
  });

  // NOTE(review): every 'error' on `req` also triggers a 'finish' emission,
  // and `err` is otherwise unused. 'finish' listeners therefore fire even
  // after a failure.
  req.on('error', function onError (err) {
    req.emit('finish'); // Request is finished
  });

  req.on('pipe', function onPipe (src) {
    // Unpipe initial request
    // The first request is sent without a body. The real data goes to the
    // redirect target once it is known.
    src.unpipe(req);
    req.end();

    // Pause read stream
    // Keep the source so the redirect handler can pipe and resume it.
    stream = src;
    stream.pause();
  });

  return req;
};

输出:

 { _readableState: { objectMode: false, highWaterMark: 65536, buffer: [], length: 0, pipes: null, pipesCount: 0, flowing: true, ended: true, endEmitted: true, reading: false, sync: false, needReadable: false, emittedReadable: false, readableListening: false, defaultEncoding: 'utf8', ranOut: false, awaitDrain: 0, readingMore: false, decoder: null, encoding: null, resumeScheduled: false }, readable: false, domain: null, _events: { end: [ [Function] ] }, _maxListeners: undefined, path: 'stupidfile.txt', fd: null, flags: 'r', mode: 438, start: undefined, end: undefined, autoClose: true, pos: undefined, destroyed: true, closed: true } 

运行 webhdfs 自带的测试时,有 3 个测试失败:

  WebHDFS ✓ should make a directory ✓ should create and write data to a file (39ms) ✓ should append content to an existing file (44ms) ✓ should create and stream data to a file ✓ should append stream content to an existing file 1) should open and read a file stream 2) should open and read a file ✓ should list directory status ✓ should change file permissions 3) should change file owner ✓ should rename file ✓ should check file existence ✓ should stat file ✓ should create symbolic link ✓ should delete file ✓ should delete directory recursively 13 passing (308ms) 3 failing 

1) WebHDFS should open and read a file stream:

  Uncaught AssertionError: "" must equal "random datamore random data" + expected - actual +random datamore random data at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/test/webhdfs.js:77:49) at Request.emit (events.js:104:17) at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9) at Request.emit (events.js:107:17) at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14) at Request.emit (events.js:129:20) at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12) at IncomingMessage.emit (events.js:129:20) at _stream_readable.js:903:16 at process._tickCallback (node.js:343:11) 

2) WebHDFS should open and read a file:

  Uncaught AssertionError: "" must equal "random datamore random data" + expected - actual +random datamore random data at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:86:34 at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:467:26) at Request.emit (events.js:104:17) at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9) at Request.emit (events.js:107:17) at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14) at Request.emit (events.js:129:20) at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12) at IncomingMessage.emit (events.js:129:20) at _stream_readable.js:903:16 at process._tickCallback (node.js:343:11) 

3) WebHDFS should change file owner:

  Uncaught AssertionError: {} must be null at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:114:26 at /home/cloudera/node_modules/webhdfs/lib/webhdfs.js:231:24 at Request.onComplete [as _callback] (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:172:26) at Request.self.callback (/home/cloudera/node_modules/request/request.js:123:22) at Request.emit (events.js:110:17) at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1047:14) at Request.emit (events.js:129:20) at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12) at IncomingMessage.emit (events.js:129:20) at _stream_readable.js:903:16 at process._tickCallback (node.js:343:11) npm ERR! Test failed. See above f 

我现在觉得自己很蠢,不过我会保留这个问题,以防将来有人遇到同样的情况。

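问题出在 Node.js 的安装上。我之前是通过 git 仓库安装的 Node.js,改为从官方网站安装最新的稳定版后,问题就解决了。上面打印的日志也能印证这一点:就在 stream.pipe(upload) 执行之前,本地文件流已经处于 ended: true、endEmitted: true 的状态。看起来文件数据在上传请求建立之前就已经被读完了,所以写到 HDFS 上的内容是空的。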