MongoDB 上的计算库

MongoDB内置json风格的查询表达式,但有时候用起来不太方面,这种情况下我们要把数据从MongoDB取出来,用外部的第三方库函数完成计算。下面将对比MongoDB上的几种计算库,尤其是语法表达和部署配置方面的区别。

MongoDB Connector

这是MongoDB官方提供的计算库,主要功能是模拟MySQL服务,负责SQL到json查询表达式的翻译,对上接收ODBC或JDBC的SQL请求,对下用json查询表达式访问MongoDB。

Connector支持基本的SQL语法,下面举例说明。MongoDB有名为test1的collection,大多数字段为简单类型,用来存储员工信息,Orders字段为数组类型,用来存储当前员工的多个订单。部分数据如下:

[{
      "_id": {"$oid":   "6074f6c7e85e8d46400dc4a7"},
      "EId": 7,"State":   "Illinois","Dept": "Sales","Name":   "Alexis","Gender": "F","Salary":   9000,"Birthday": "1972-08-16",
      "Orders": [
         {"OrderID":   70,"Client": "DSG","SellerId":   7,"Amount": 288,"OrderDate": "2009-09-30"},
         {"OrderID":   131,"Client": "FOL","SellerId":   7,"Amount": 103.2,"OrderDate": "2009-12-10"}
    ]
}
{
      "_id": {"$oid":   "6074f6c7e85e8d46400dc4a8"},
      "EId": 8,"State": "California", ...
}]

下面用SQL嵌入JAVA代码,实现针对订单表的条件查询。

package mon;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
    public class Main {
    public static void main(String[]   args)throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection connection   =DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307?source=mongo&mechanism=PLAIN&useSSL=false&authenticationPlugins=org.mongodb.mongosql.auth.plugin.MongoSqlAuthenticationPlugin");
        Statement statement =   connection.createStatement();
        String str="SELECT * FROM   mongo.test1_orders where Orders.Amount>1000 and Orders.Amount<=3000   and Orders.Client like'%S%' ";
        ResultSet result =   statement.executeQuery(str);
…
        if(connection != null)   connection.close();
    }
}

类似地,只需修改SQL语句,还可以实现分组汇总和条件查询:

str="SELECT    year( Orders.Orderdate) y,sum( Orders.Amount) s FROM mongo.test1_orders  group by year( Orders.Orderdate)";
str= "SELECT o.Orders.OrderID,o.Orders.Client,o.Orders.Sel≤rId,o.Orders.Amount,o.Orders.OrderDate,e.Name,e.Gender,e.Dept   from mongo.test1_Orders o, mongo. test1 e where   o.Orders.Sel≤rId=e.EId";

上面代码中,Orders.Orderdate是子文档的默认字段名,虽然里面用到了点号,但实际上SQL不支持多层数据类型,所以Orders.Orderdate只是外观像主子关系(可在元数据文件中重定义),但实际并不会分别解析。事实上,Connector把collection test1识别为2个独立的表,一个是不含子文档的table test1,另一个是只有子文档的table test1_Orders。这就导致SQL必须再此(额外)建立关联关系,而不能利用collection原有的天然主子关系。显然,这样的计算效率并不高。

除了不支持多层结构这种通用的SQL缺点之外,Connector 本身也是各类SQL中表达能力较弱的,比如不支持窗口函数。事实上,官网已经明确说MongoDB Connector只适合一些BI工具的基本需求。

由于是官方产品,所以MongoDB Connector的集成和配置都很简单。安装本计算库后,只需在命令行执行如下命令,即可启动数据库服务:

mongosqld --mongo-uri "mongodb://localhost:27017/?connect=direct" --addr "127.0.0.1:3307"

Calcite

Calcite的理想是用SQL语言计算任意数据源,其中就包括MongoDB。遗憾的是,Calcite on MongoDB的文档少且粗,有些功能没有找到具体说明,这导致下面的描述可能不准确。

Calcite只能以collection为单位取数,如果colleciton较大,就很容易内存溢出(Calcite只支持内存计算)。Calcite不能从多层collection中取数,比如查询前面collection test1里的子文档。为了迁就Calcite,这里将test重整成2个单层collection,即Employees和Orders。

使用Calcite对collection Orders进行条件查询时,代码如下:

package org.example;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  import java.util.Properties;
  public class App 
  {
      public static void main(String[]   args ) throws Exception{
          Properties config = new   Properties();
          config.put("model",   "d:\\mongo-model.json");
          config.put("lex",   "MYSQL");

          Connection con =   DriverManager.getConnection("jdbc:calcite:", config);
          Statement stmt =   con.createStatement();

        String   sql ="select * from orders where Amount>1000 and   Amount<=3000";
          ResultSet rs =   stmt.executeQuery(sql);
          …
          if(con!= null) con.close();
      }
  }

应该注意到,这里的条件查询语句简化了,没有模糊查询部分(其他计算库都有),这是因为Calcite还不支持模糊查询。类似地,Calcite也不支持取年份的函数,或字符串和日期的转换函数,所以前面的分组汇总无法实现,只能改写成下面这样:

sql="select Client, sum(Amount) from orders group by Client";

Calciteshe对关联计算的支持也不好,比如不能取部分字段,只能用*号取全部字段,所以前面的关联查询无法实现,只能改写成下面这样:

sql="SELECT * from Orders,Employees where   Orders.SellerId=Employees.EId";

Calcite的配置分两部分,首先在Maven中引入calcite-mongodb,之后建立元数据文件mongo_model.json,具体内容如下:

{
    "version": "1.0",
  "defaultSchema":   "dSchema",
    "schemas": [
   {
        "type": "custom",
        "name": "alias",
        "factory":   "org.apache.calcite.adapter.mongodb.MongoSchemaFactory",
        "operand": {
          "host": "localhost:27017",
          "database": "mongo"
      }
    },
    {
        "name": "dSchema",
        "tables": [
             {
            "name": "orders",
            "type": "view",
            "sql": "select cast(_MAP['OrderID'] AS integer)AS   OrderID,cast(_MAP['Client'] AS varchar(40)) AS Client,cast(_MAP['SellerId']   AS integer)AS SellerId,cast(_MAP['Amount'] AS float)AS   Amount,cast(_MAP['OrderDate'] AS varchar(20)) AS OrderDate from   \"alias\".\"Orders\""
        },
       {
            "name": "employees",
            "type": "view",
            "sql": "select cast(_MAP['EId'] AS integer)AS   EId,cast(_MAP['State'] AS varchar(40)) AS State,cast(_MAP['Dept'] AS   varchar(40)) AS Dept,cast(_MAP['Name'] AS varchar(40)) AS   Name,cast(_MAP['Gender'] AS varchar(40)) AS Gender,cast(_MAP['Salary'] AS   float)AS Salary,cast(_MAP['Birthday'] AS    varchar(20)) AS Birthday from   \"alias\".\"Employees\""
        }
      ]
    }
  ]
}

上面配置中,\"alias\".\"Orders\"是物理表名,orders是对应的视图名。理论上不用配置视图,只需在代码中直接查物理表即可,但实际上直接查物理表会导致很多SQL错误(比如分组汇总),这很可能是Calcite不够完善导致的。

Scala

Scala是常用的结构化计算语言,对MongoDB支持较早,其原理是:先从MongoDB读取collection,存储为Scala的DataFrame数据对象(或RDD),再用DataFrame的通用计算能力完成计算。

Scala计算库存在一些先天缺点。Scala只能以collection为单位取数,不支持用mongoDB的json查询表达式取数,如果colleciton数据量较大,则取数会花费大量时间。Scala不能从多层collection中取数,如果想计算MongoDB中的多层collection,则必须改造成多个单层collection。比如前面例子中的test1必须拆成2个单层的Orders和Employees。

使用Scala对collection Orders进行条件查询的代码如下:

package test
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
import com.mongodb.spark.sql.toSparkSessionFunctions
object Mon {
  def   main(args: Array[String]): Unit = {
    val   warehouseLocation = "file:${system:user.dir}/spark-warehouse"
    val spark   = SparkSession.builder()
        .master("local")
        .appName("MongoDB Test")
        .getOrCreate()

    val Orders   = spark.loadFromMongoDB(ReadConfig(
        Map("uri" -> "mongodb://127.0.0.1:27017/mongo.Orders")
    ))
    val   condtion=Orders.where("Amount>1000 and Amount<=3000 and Client   like'%S%' ")
      condtion.show()
  }
}

类似地,还可以实现分组汇总和关联计算:

//分组汇总
val groupBy=Orders.groupBy(year(Orders("OrderDate"))).agg(sum("Amount"))
//关联计算
val Employees = spark.loadFromMongoDB(ReadConfig(
        Map("uri" ->   "mongodb://127.0.0.1:27017/mongo.employees")
    ))
val   join=Orders.join(Employees,Orders("SellerId")===Employees("EId"),"Inner")
.select("OrderID","Client","SellerId","Amount","OrderDate","Name","Gender","Dept")

应该注意到,虽然必须拆分多层的collection才能取数,但只要取到数据,DataFrame的计算能力还是非常强的,这便是通用数据结构的好处。

配置方面,对于程序员来说非常简单,只需在Maven加入org.mongodb.spark即可。

集算器 SPL

集算器 SPL也是专业的开源结构化计算引擎,原理和Calcite类似,可以用统一的语法和数据结构计算各类数据源,其中就包括MongoDB。但集算器 SPL更“轻”,层次更少,语法更简单,对MongoDB的支持也更成熟。

比如对多层collection test1进行条件查询,SPL代码写作:

A
1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"test1.find()")
3 =A2.conj(Orders)
4 =A3.select(Amount>1000 &&   Amount<=3000 && like@c(Client,"*s*")).fetch()
5 =mongo_close(A1)

从A2可以看出来,SPL支持MongoDB的json查询表达式(find、count、distinct和aggregate),比如区间查询写作:=mongo_shell(A2,"test1.find({Orders.mount:{gt:1000,lt:3000}})")。

在collection数据较多且json表达式较简单的时候,可以通过这种方式减少取到的数据,以防内存溢出;也可以加快查询速度,比如针对索引的查询。如果取到的数据依旧很多,SPL也能轻松处理,因为A2返回的是游标类型,可以计算超出内存的数据。

上述代码可在IDE中执行,也可以存为脚本文件(比如select.dfx),通过JDBC接口在JAVA中调用,具体如下:

package Test;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  public class test1 {
      public static void main(String[]   args)throws Exception {
            Class.forName("com.esproc.jdbc.InternalDriver");
          Connection connection   =DriverManager.getConnection("jdbc:esproc:local://");
          Statement statement =   connection.createStatement();
          ResultSet result =   statement.executeQuery("call select()");

……
          if(connection != null)   connection.close();
      }
  }

类似地,分组汇总代码如下:

A
1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"test1.find()")
3 =A2.conj(Orders).groups(year(OrderDate);sum(Amount))
4 =mongo_close(A1)

关联查询代码如下:

A
1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"test1.find()")
3 =A2.new(Orders.OrderID,Orders.Client,   Name,Gender,Dept).fetch()
4 =mongo_close(A1)

这里要注意的是,SPL的关联代码要比其他计算库都要简单(实际没有关联动作),甚至比MongoDB官方产品Connector简单。这是因为SPL的数据结构本身就是多层的,可以直接对应test1这种多层的collection,可以天然表达主子关系,如此一来就不必再额外进行关联。而其他计算库都是单层数据结构,难以对应多层collection。

当然,SPL也支持2个单层collection的关联:

A
1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"Orders.find()").fetch()
3 =mongo_shell(A1,"Employees.find()").fetch()
4 =mongo_close(A1)
5 =join(A2,SellerId;A3,EId)
6 =A5.new(_1.OrderID,_1.Client,_2.Name,_2.Gender,_2.Dept)

SPL表达形式多样,除了本身的过程化语法,还支持SQL语法。因为SQL数据类型不支持多层数据(参考Calcite和Connector),所以只支持2个单层collection的关联,代码如下:

A
1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A34,"Orders.find()").fetch()
3 =mongo_shell(A34,"Employees.find()").fetch()
4 =mongo_close(A34)
5 $select o.OrderId,o.Client,e.Name,e.Gender,e.Dept from   {A35}  o join {A36} e on   o.SellerId=e.EId

MongoDB的特色是多层数据,用json表达式计算多层数据会遇到很多困难,这种情况下SPL做计算库经常可以简化计算。比如:统计下面每条记录中 income,output 的数量之和。

_id income output
1 {"cpu":1000, "mem":500,     "mouse":"100"} {"cpu":1000, "mem":600   ,"mouse":"120"}
2 {"cpu":2000,"mem":1000,    "mouse":"50","mainboard":500 } {"cpu":1500, "mem":300}

用json表达式计算时,代码很繁琐:

var fields = [  "income",   "output"];
db.computer.aggregate([ 
   { 
      $project:{ 
           "values":{ 
              $filter:{ 
                 input:{ 
                      "$objectToArray":"$$ROOT"
                 },
                 cond:{ 
                    $in:[ 
                       "$$this.k",
                       fields
                    ]
                 }
              }
         }
      }
   },
   { 
      $unwind:"$values"
   },
   { 
      $project:{ 
           key:"$values.k",
           values:{ 
              "$sum":{ 
                 "$let":{ 
                    "vars":{ 
                       "item":{ 
                            "$objectToArray":"$values.v"
                       }
                    },
                      "in":"$$item.v"
                 }
              }
         }
      }
   },
   {$sort: {"_id":-1}},
   { "$group": {
    "_id": "$_id",
    'income':{"$first":     "$values"},
    "output":{"$last":     "$values"}
    }},
]);

用SPL计算就简单多了:

A
1 =mongo_open("mongodb://127.0.0.1:27017/raqdb")
2 =mongo_shell(A1,"computer.find()").fetch()
3 =A2.new(_id:ID,income.array().sum():INCOME,output.array().sum():OUTPUT)
4 >A1.close()

最后说下集算器 SPL的配置。在Extend library中启用MongoCli即可完成配置,配置时可用图形界面。

通过上述比较可以看出:在语法表达方面,集算器 SPL对多层数据支持较完美;Connector可以将多层数据识别为多个表,基本能用;scala要将多层数据改造成单层,成本非常高;Calcite不仅改造成本高,而且不够成熟稳定。在配置部署方面,Connector较为方便,集算器 SPL和Scala也比较简单,Calcite依旧垫底。