在Highland.js中嵌套stream操作

我有一个readdirp模块的目录stream。

我要:-

  • 在每个目录中使用正则expression式(例如README.* )search文件
  • 读取该文件的第一行不以#开始
  • 打印出目录中的每个目录以及自述文件的第一个非标题行。

我正在尝试使用stream和highland.js来做到这一点。

我被困在试图处理每个目录内的所有文件的stream。

 h = require 'highland' dirStream = readdirp root: root, depth: 0, entryType: 'directories' dirStream = h(dirStream) .filter (entry) -> entry.stat.isDirectory() .map (entry) -> # Search all files in the directory for README. fileStream = readdirp root: entry.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store' fileStream = h(fileStream).filter (entry) -> /README\..*/.test entry.name fileStream.each (file) -> readmeStream = fs.createReadStream file _(readmeStream) .split() .takeUntil (line) -> not line.startsWith '#' and line isnt '' .last(1) .toArray (comment) -> # TODO: How do I access `comment` asynchronously to include in the return value of the map? return {name: entry.name, comment: comment} 

最好将高地stream视为不可变的,像filtermap这样的操作会返回依赖旧stream的新stream,而不是旧stream的修改。

此外,高地方法是懒惰的:当你绝对需要数据时,你应该只调用eachtoArray

asynchronous映射stream的标准方式是flatMap 。 这就像map ,但你给它的function应该返回一个stream。 您从flatMap获取的stream是所有返回的stream的串联。 由于新stream依次依赖于所有旧stream,因此可用于对asynchronousstream程进行sorting。

我会修改你的例子到以下(澄清一些variables名称):

 h = require 'highland' readmeStream = h(readdirp root: root, depth: 0, entryType: 'directories') .filter (dir) -> dir.stat.isDirectory() .flatMap (dir) -> # Search all files in the directory for README. h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store') .filter (file) -> /README\..*/.test file.name .flatMap (file) -> h(fs.createReadStream file.name) .split() .takeUntil (line) -> not line.startsWith '#' and line isnt '' .last(1) .map (comment) -> {name: file.name, comment} 

让我们来看看这个代码中的types。 首先,请注意, flatMap有types(以Haskellish表示法) Stream a → (a → Stream b) → Stream b ,即它需要包含一些types为a东西的stream和一个函数,期望types为a东西,返回包含b s,并返回一个包含b s的stream。 集合types(如stream和数组)实现flatMap作为连接返回集合的标准。

 h(readdirp root: root, depth: 0, entryType: 'directories') 

比方说,这有types的Stream Directoryfilter器不改变types,所以flatMap将是Stream Directory → (Directory → Stream b) → Stream b 。 我们将看看函数返回的内容:

 h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store') 

调用这个Stream File ,所以第二个flatMapStream File → (File → Stream b) → Stream b

 h(fs.createReadStream file.name) 

这是一个Stream StringsplittakeUntillast不改变,那么map做什么? mapflatMap非常相似:它的types是Stream a → (a → b) → Stream b 。 在这种情况下, aStringb是对象types{name : String, comment : String} 。 然后map返回该对象的stream,这是整个flatMap函数返回的。 第二个flatMap中的b是对象,所以第一个flatMap的函数也返回一个对象stream,所以整个stream是一个Stream {name : String, comment : String}

请注意,由于高地的懒惰,这实际上并没有启动任何stream媒体或处理。 您需要使用eachtoArray导致thunk并启动pipe道。 在eachcallback将与您的对象调用。 根据你想要做的评论,最好是flatMap更多(如果你写他们的文件为例)。

那么,我不是要写一篇文章。 希望这可以帮助。