如何为node.js实现AWS Elasticache自动发现?

我是一个节点noob,并试图了解如何在node.js应用程序中实现自动发现 。 我打算使用集群模块,并希望每个工作进程保持最新(并持久地连接到)elasticache节点。

由于没有共享内存的概念(比如PHP APC),你是否需要在每个worker中运行代码,每隔X秒唤醒一次,并以某种方式更新IP列表并重新连接memcache客户端?

人们今天如何解决这个问题? 示例代码将不胜感激。

请注意,目前,自动发现仅适用于运行memcached引擎的caching群集。

对于caching引擎版本1.4.14或更高版本,您需要为caching群集configuration端点(或任何caching节点端点)创buildTCP / IP套接字并发送以下命令:

config get cluster 

有了Node.js,你可以使用net.Socket类来达到这个目的。

答复由两行组成:

  • configuration信息的版本号。 每次向caching集群添加或删除节点时,版本号都会增加1。

  • caching节点列表。 列表中的每个节点由主机名| ip-address |端口组表示,每个节点由空格分隔。

回车符和换行符(CR + LF)出现在每行的末尾。

在这里,您可以find如何将自动发现添加到客户端库的更全面的说明。

使用集群模块,您需要在每个进程(即subprocess)中存储相同的信息,并且我会使用每个subprocess的“setInterval”定期检查(例如每60秒)节点列表,并且只有在列表已经更改时才重新连接(这不应该经常发生)。

您可以select仅更新主服务器上的列表,并使用“worker.send”更新工作人员。 这可以使所有在单个服务器上运行的进程保持同步,但是在多服务器体系结构中并不会起作用,所以使用一致性散列非常重要,以便能够更改节点列表并释放“存储在memcached集群中的最小密钥数量。

我会使用一个全局variables来存储这种configuration。

仔细考虑两次,您可以使用适用于Node.jsAWS开发工具包来获取ElastiCache节点列表(也适用于Redis引擎)。

在这种情况下,代码将是这样的:

 var util = require('util'), AWS = require('aws-sdk'), Memcached = require('memcached'); global.AWS_REGION = 'eu-west-1'; // Just as a sample I'm using the EU West region global.CACHE_CLUSTER_ID = 'test'; global.CACHE_ENDPOINTS = []; global.MEMCACHED = null; function init() { AWS.config.update({ region: global.AWS_REGION }); elasticache = new AWS.ElastiCache(); function getElastiCacheEndpoints() { function sameEndpoints(list1, list2) { if (list1.length != list2.length) return false; return list1.every( function(e) { return list2.indexOf(e) > -1; }); } function logElastiCacheEndpoints() { global.CACHE_ENDPOINTS.forEach( function(e) { util.log('Memcached Endpoint: ' + e); }); } elasticache.describeCacheClusters({ CacheClusterId: global.CACHE_CLUSTER_ID, ShowCacheNodeInfo: true }, function(err, data) { if (!err) { util.log('Describe Cache Cluster Id:' + global.CACHE_CLUSTER_ID); if (data.CacheClusters[0].CacheClusterStatus == 'available') { var endpoints = []; data.CacheClusters[0].CacheNodes.forEach( function(n) { var e = n.Endpoint.Address + ':' + n.Endpoint.Port; endpoints.push(e); }); if (!sameEndpoints(endpoints, global.CACHE_ENDPOINTS)) { util.log('Memached Endpoints changed'); global.CACHE_ENDPOINTS = endpoints; if (global.MEMCACHED) global.MEMCACHED.end(); global.MEMCACHED = new Memcached(global.CACHE_ENDPOINTS); process.nextTick(logElastiCacheEndpoints); setInterval(getElastiCacheEndpoints, 60000); // From now on, update every 60 seconds } } else { setTimeout(getElastiCacheEndpoints, 10000); // Try again after 10 seconds until 'available' } } else { util.log('Error describing Cache Cluster:' + err); } }); } getElastiCacheEndpoints(); } init();