在高地的循环数据stream

在受到NoFlo.js的启发之后,我刚刚学习了highland.js。 我希望能够recursion地操作stream。 在这个人为的例子中,我将提供一个乘以2的数字,我们过滤结果<= 512。一旦数字相乘,它会被反馈到系统中。 代码我有工作,但如果我拿出pipe道中的多托function,它不处理任何数字。 我怀疑我是不正确地将数据发回到returnPipe。 有没有更好的方式将数据传回系统? 我错过了什么?

### input>--m--->multiplyBy2>---+ | | | | +---<returnPipe<----+ ### H = require('highland') input = H([1]) returnPipe = H.pipeline( H.doto((v)->console.log(v)) ) H.merge([input,returnPipe]) .map((v)-> return v * 2) .filter((v)-> return v <= 512) .pipe(returnPipe) 

从文档: doto旋转一个stream,同时重新发射源stream 。 这意味着就stream水线而言,还有一个function仍然是通过它传递stream。 如果你把doto拿出来,原来的stream不会在下一次迭代中通过返回stream。

如果您要使用pipe道,则必须传递一个方法来获取stream并发射stream。 例如,你可以用H.map((v)=>{console.log(v); return v;})H.pipeline方法,因为这个方法会消耗一个stream并发射一个stream ,它将继续stream,当stream传回到它在.pipe(returnPipe)

编辑:要回答你的问题,当你声明let input = H([1])你实际上是在那里创build一个stream。 您可以删除对pipe道和returnPipe的任何引用,并使用以下代码生成相同的输出:

 let input = H([1]); input.map((v)=> { return v * 2; }) .filter((v)=> { if (v <= 512) { console.log(v); } return v <= 512; }) .pipe(input); 

我最初的目的是在highland.js中写一个recursion的文件读取器。 我发布到highland.js github的问题列表,Victor Vu帮助我把这一切与一个精彩的文章。

 H = require('highland') fs = require('fs') fsPath = require('path') ### directory >---m----------> dirFilesStream >-------------f----> out | | | | +-------------< returnPipe <--------------+ legend: (m)erge (f)ork + directory has the initial file + dirListStream does a directory listing + out prints out the full path of the file + directoryFilter runs stat and filters on directories + returnPipe the only way i can ### directory = H(['someDirectory']) mergePoint = H() dirFilesStream = mergePoint.merge().flatMap((parentPath) -> H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) -> fsPath.join parentPath, path ) out = dirFilesStream # Create the return pipe without using pipe! returnPipe = dirFilesStream.observe().flatFilter((path) -> H.wrapCallback(fs.stat)(path).map (v) -> v.isDirectory() ) # Connect up the merge point now that we have all of our streams. mergePoint.write directory mergePoint.write returnPipe mergePoint.end() # Release backpressure. out.each H.log