从Node.js服务器查询Spark SQL

我目前使用npm的cassandra-driver从Node.js服务器查询我的Cassandra数据库。 由于我想能够写更复杂的查询,我想使用Spark SQL而不是CQL。 有没有什么办法来创build一个RESTful API(或其他),以便我可以像我目前使用CQL一样使用Spark SQL?

换句话说,我想能够从我的Node.js服务器发送一个Spark SQL查询到另一个服务器,并得到一个结果。

有没有办法做到这一点? 我一直在寻找解决这个问题的一段时间,还没有find任何东西。

编辑:我能够查询我的数据库从Spark壳Scala和Spark SQL,所以这一点工作。 我只需要以某种方式连接Spark和我的Node.js服务器。

我有一个类似的问题,我通过使用Spark-JobServer解决。

Spark-Jobserver(SJS)的主要方法通常是创build一个扩展其SparkSQLJob的特殊作业,如下例所示:

object ExecuteQuery extends SparkSQLJob { override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = { // Code to validate the parameters received in the request body } override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = { // Assuming your request sent a { "query": "..." } in the body: val df = sqlContext.sql(config.getString("query")) createResponseFromDataFrame(df) // You should implement this } } 

然而,为了使这个方法与Cassandra很好地协作,你必须使用spark-cassandra-connector ,然后加载数据,你将有两个select:

1)在通过REST调用这个ExecuteQuery之前,你必须把想要从Cassandra查询的完整数据传输给Spark。 为此,你可以做一些类似的代码代码由spark-cassandra-connector文档改编 ):

 val df = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test")) .load() 

然后将其注册为一个表,以便SparkSQL能够访问它:

 df.registerAsTempTable("myTable") // As a temporary table df.write.saveAsTable("myTable") // As a persistent Hive Table 

只有在那之后,您才能够使用ExecuteQuerymyTable进行查询。

2)由于在某些使用情况下,第一个选项可能效率低下,所以还有另一种select。

spark-cassandra-connector有一个特殊的CassandraSQLContext ,可以直接从Spark中查询C *表。 它可以用来像:

 val cc = new CassandraSQLContext(sc) val df = cc.sql("SELECT * FROM keyspace.table ...") 

但是,为了在Spark- SparkContextFactory上使用不同types的上下文,需要扩展SparkContextFactory ,并在创build上下文时使用它(可以通过POST请求/contexts来完成)。 在SJS Gitub上可以看到一个特殊的上下文工厂的例子 。 您还必须创build一个SparkCassandraJob ,扩展SparkJob (但这部分非常简单 )。

最后, ExecuteQuery作业必须适应使用新的类。 这将是这样的:

 object ExecuteQuery extends SparkCassandraJob { override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = { // Code to validate the parameters received in the request body } override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = { // Assuming your request sent a { "query": "..." } in the body: val df = cc.sql(config.getString("query")) createResponseFromDataFrame(df) // You should implement this } } 

之后, ExecuteQuery作业可以通过POST请求通过REST执行。


结论

在这里,我使用第一个选项,因为我需要HiveContext可用的高级查询(例如,窗口函数),这些查询在CassandraSQLContext中不可用。 但是,如果你不需要这些操作,我推荐第二种方法,即使它需要一些额外的编码来为SJS创build一个新的ContextFactory。