从Node.js服务器查询Spark SQL
我目前使用npm的cassandra-driver从Node.js服务器查询我的Cassandra数据库。 由于我想能够写更复杂的查询,我想使用Spark SQL而不是CQL。 有没有什么办法来创build一个RESTful API(或其他),以便我可以像我目前使用CQL一样使用Spark SQL?
换句话说,我想能够从我的Node.js服务器发送一个Spark SQL查询到另一个服务器,并得到一个结果。
有没有办法做到这一点? 我一直在寻找解决这个问题的一段时间,还没有find任何东西。
编辑:我能够查询我的数据库从Spark壳Scala和Spark SQL,所以这一点工作。 我只需要以某种方式连接Spark和我的Node.js服务器。
我有一个类似的问题,我通过使用Spark-JobServer解决。
Spark-Jobserver(SJS)的主要方法通常是创build一个扩展其SparkSQLJob的特殊作业,如下例所示:
object ExecuteQuery extends SparkSQLJob { override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = { // Code to validate the parameters received in the request body } override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = { // Assuming your request sent a { "query": "..." } in the body: val df = sqlContext.sql(config.getString("query")) createResponseFromDataFrame(df) // You should implement this } }
然而,为了使这个方法与Cassandra很好地协作,你必须使用spark-cassandra-connector ,然后加载数据,你将有两个select:
1)在通过REST调用这个ExecuteQuery
之前,你必须把想要从Cassandra查询的完整数据传输给Spark。 为此,你可以做一些类似的代码 ( 代码由spark-cassandra-connector文档改编 ):
val df = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test")) .load()
然后将其注册为一个表,以便SparkSQL能够访问它:
df.registerAsTempTable("myTable") // As a temporary table df.write.saveAsTable("myTable") // As a persistent Hive Table
只有在那之后,您才能够使用ExecuteQuery
从myTable
进行查询。
2)由于在某些使用情况下,第一个选项可能效率低下,所以还有另一种select。
spark-cassandra-connector有一个特殊的CassandraSQLContext ,可以直接从Spark中查询C *表。 它可以用来像:
val cc = new CassandraSQLContext(sc) val df = cc.sql("SELECT * FROM keyspace.table ...")
但是,为了在Spark- SparkContextFactory
上使用不同types的上下文,需要扩展SparkContextFactory
,并在创build上下文时使用它(可以通过POST请求/contexts
来完成)。 在SJS Gitub上可以看到一个特殊的上下文工厂的例子 。 您还必须创build一个SparkCassandraJob
,扩展SparkJob
(但这部分非常简单 )。
最后, ExecuteQuery
作业必须适应使用新的类。 这将是这样的:
object ExecuteQuery extends SparkCassandraJob { override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = { // Code to validate the parameters received in the request body } override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = { // Assuming your request sent a { "query": "..." } in the body: val df = cc.sql(config.getString("query")) createResponseFromDataFrame(df) // You should implement this } }
之后, ExecuteQuery
作业可以通过POST请求通过REST执行。
结论
在这里,我使用第一个选项,因为我需要HiveContext
可用的高级查询(例如,窗口函数),这些查询在CassandraSQLContext
中不可用。 但是,如果你不需要这些操作,我推荐第二种方法,即使它需要一些额外的编码来为SJS创build一个新的ContextFactory。