Mongodb聚合pipe道新的date

我尝试使用聚合pipe道来添加/创build新的date根据以前的pipe道值,并保存到一个新的集合(见我的pipe道下面)。 但是,语法是错误的,我得到一个错误说

不允许的字段types对象expression式中的date(在'date')//date:new Date('$ _ id.year','$ _id.month','$ _id.day')

我想知道如何使用我在mongo聚合pipe道中的上一年,每月,每天的值来创build新的date? 基本上,在将ISODate转换为分组的年,月,日之后,我想将它们转换回ISODate格式。

pipeline = [{ $match: { event: 'sample_event', } }, { $project: { _id: false, uuid: '$properties.distinct_id', time: '$properties.time' } }, { $group: { _id: { year: { $year: '$time' }, month: { $month: '$time' }, day: { $dayOfMonth: '$time' }, uuid: '$uuid' } } }, { $group: { _id: { year: '$_id.year', month: '$_id.month', day: '$_id.day' }, value: { $sum: 1 } } }, { $sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 } }, { $project: { _id: { $concat: [ { $substr: ['$_id.year', 0, 4] }, '-', { $cond: [ { $lte: [ '$_id.month', 9 ] }, { $concat: [ '0', { $substr: [ '$_id.month', 0, 2 ] } ]}, { $substr: [ '$_id.month', 0, 2 ] } ] }, '-', { $cond: [ { $lte: [ '$_id.day', 9 ] }, { $concat: [ '0', { $substr: [ '$_id.day', 0, 2 ] } ]}, { $substr: [ '$_id.day', 0, 2 ] } ] }, ] }, date: new Date('$_id.year', '$_id.month', '$_id.day'), // errorrrr value: 1 } }, { $out: 'output_collection' }]; 

您不能在聚合pipe道中“投射”新的数据types。 唯一真正允许的是在整数值(不是双精度值)上使用$substr ,并从date中提取时间戳值作为整数。 但string或数字不能成为string或date对象的数字。

在stream水线中也没有实际评估JavaScript,您可能会看到的任何示例纯粹是一个“单向旅程”,JavaScript函数在创buildpipe道的文档中被“评估”。 但是你不能以任何方式对pipe道内的数据进行操作。

编写新集合的最佳方法是处理游标结果并使用批量操作API写出。 所以,如果这是为了定期构build汇总结果,那么您甚至可以基本追加或更新到目标集合。

这意味着使用底层本地驱动程序的本地方法。 到mongojs的人看起来有点有趣,但方法仍然存在:

 var async = require('async'), mongojs = require('mongojs'), db = mongojs('mongodb://localhost/test',['sample']); db._get(function(err,db) { if (err) throw err; var source = db.collection('source'), bulk = db.collection('target').initializeOrderedBulkOp(), count = 0; var cursor = source.aggregate( [ { "$match": { "event" : "sample event" } }, { "$group": { "_id": { "day": { "$subtract": [ { "$subtract": [ "$properties.time", new Date("1970-01-01") ] }, { "$mod": [ { "$subtract": [ "$properties.time", new Date("1970-01-01") ] }, 1000 * 60 * 60 * 24 ]} ] }, "uuid": "$properties.distinct_id" } }}. { "$group": { "_id": "$_id.day", "count": { "$sum": 1 } }} ], { "cursor": { "batchSize": 101 } } ); cursor.on("data",function(data) { bulk.insert({ "date": new Date(data._id), "count": data.count }); count++; if ( count % 1000 == 0 ) { cursor.pause(); bulk.execute(function(err,result) { if (err) throw err; bulk = db.collection('target').initializeOrderedBulkOp(); cursor.resume(); }); } }); cursor.on("end",function() { if ( count % 1000 != 0 ) bulk.execute(function(err,result) { console.log("done"); }); }); }); 

所以这是您的聚合操作大大简化,运行速度更快。 输出是一个代表“日”的纪元时间戳值,通过从另一个date对象中减去一个date对象获得时间戳作为整数。 这里的通用date数字将数字从提供的date中加上代表“date”的值。

这比强制string更好,它适合作为新date对象的构造函数提供给Date()函数,这当然是在聚合pipe道的“外部”完成的。

这里的插入是批量执行的,每千件只有一次,所以非常有效。 因为事实上这是一个stream式接口,所以你使用事件并使用.pause().resume()以避免太多的并发请求和/或过多的内存使用。 根据需要调整,但无论大小,驱动程序实际上将分解为任何一个发送和返回1000的请求。

或者,当然,只要在这个阶段没有将值转换为date,只需使用$out创build集合,然后让您的代码从任何读取的结果中投射date。 但是你不能操纵一个date,并在聚合pipe道本身中以这种方式返回一个date对象。 使用任何方法最适合你。