How to Write MongoDB Aggregations: More Complex Aggregation Examples

2024-07-04 12:32:12

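In the previous post we worked through a hands-on example of MongoDB aggregation and found some interesting differences from traditional databases, both in how the operations are written and in how indexes are used. This time we go deeper into aggregation, switching to a different collection and raising the complexity.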

(Previous post: "MongoDB 挑战传统数据库聚合查询,干不死他们的", roughly "MongoDB takes on traditional databases at aggregation queries".)

 mongo7 [direct: primary] test> show collections;
test
 mongo7 [direct: primary] test> db.test.find();
[
  {
    _id: '01001',
    city: 'AGAWAM',
    loc: [ -72.622739, 42.070206 ],
    pop: 15338,
    state: 'MA'
  },
  {
    _id: '01002',
    city: 'CUSHMAN',
    loc: [ -72.51565, 42.377017 ],
    pop: 36963,
    state: 'MA'
  },
  {
    _id: '01007',
    city: 'BELCHERTOWN',
    loc: [ -72.410953, 42.275103 ],
    pop: 10579,
    state: 'MA'
  },

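This collection stores place records: each document holds a city name (city), a state abbreviation (state), a population figure (pop) and a coordinate pair (loc, in [longitude, latitude] order). The _id values (01001, 01002, ...) look like US ZIP codes, so the same city can appear in several documents. The following examples show how aggregations are written and how much they differ from a traditional database. Question 1: in this data, how many (state, city) combinations appear more than once?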

 mongo7 [direct: primary] test> db.test.aggregate([
...   {
...     $group: {
...       _id: { state: "$state", city: "$city" }, 
...       count: { $sum: 1 } 
...     }
...   },
...   {
...     $match: {
...       count: { $gt: 1 } 
...     }
...   },
...   {
...     $count: "duplicateCityStateCount" 
...   }
... ])
[ { duplicateCityStateCount: 893 } ]
Enterprise mongo7 [direct: primary] test> 
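Note what this pipeline actually returns: $count counts the groups that survive the $match, so 893 is the number of distinct (state, city) pairs that occur at least twice, not the sum of their occurrences. If the total number of occurrences is what is wanted, one option is to replace the final $count with a $group that sums the count field. The sketch below defines both pipelines as plain JavaScript arrays (in mongosh they can be passed to db.test.aggregate(...)) and adds a hypothetical in-memory helper, duplicateStats, that computes both numbers on an array of documents so the semantics can be checked without a server. The toy documents are made up for illustration.

```javascript
// Number of distinct (state, city) pairs that occur more than once (the query above).
const duplicatePairCount = [
  { $group: { _id: { state: "$state", city: "$city" }, count: { $sum: 1 } } },
  { $match: { count: { $gt: 1 } } },
  { $count: "duplicateCityStateCount" }
];

// Same pairs, but also adds up how many documents they cover.
const duplicateOccurrenceTotal = [
  { $group: { _id: { state: "$state", city: "$city" }, count: { $sum: 1 } } },
  { $match: { count: { $gt: 1 } } },
  { $group: { _id: null, pairs: { $sum: 1 }, occurrences: { $sum: "$count" } } }
];

// Hypothetical in-memory reference that computes the same two numbers on a plain array.
function duplicateStats(docs) {
  const counts = new Map();
  for (const { state, city } of docs) {
    const key = JSON.stringify([state, city]); // composite key, like _id: { state, city }
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  let pairs = 0;
  let occurrences = 0;
  for (const n of counts.values()) {
    if (n > 1) { // same condition as { $match: { count: { $gt: 1 } } }
      pairs += 1;
      occurrences += n;
    }
  }
  return { pairs, occurrences };
}

// Toy documents, made up for illustration.
const toyDocs = [
  { _id: "1", city: "AUSTIN", state: "TX" },
  { _id: "2", city: "AUSTIN", state: "TX" },
  { _id: "3", city: "AUSTIN", state: "TX" },
  { _id: "4", city: "DALLAS", state: "TX" },
  { _id: "5", city: "DALLAS", state: "TX" },
  { _id: "6", city: "AGAWAM", state: "MA" }
];
console.log(duplicateStats(toyDocs)); // { pairs: 2, occurrences: 5 }
```

Both pipelines return no document at all when no pair is duplicated, which is worth handling in application code.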


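Take the first example. We group by state and city, count the documents in each group, and keep only the groups whose count is greater than 1, that is, combinations that appear at least twice. The final $count stage then counts those groups, so the result, 893, is the number of (state, city) combinations that occur more than once. In SQL the same question needs the GROUP BY ... HAVING query wrapped in an outer SELECT COUNT(*) subquery, whereas the pipeline simply appends one more stage. The next example, however, maps onto SQL directly: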

db.test.aggregate([
  {
    $group: {
      _id: { state: "$state", city: "$city" }, // group by state and city
      count: { $sum: 1 } // count the documents in each group
    }
  },
  {
    $sort: { count: -1 } // sort by document count, descending
  }
])

SELECT state, city, COUNT(*) AS count
FROM test
GROUP BY state, city
ORDER BY count DESC;

The SQL statement above corresponds to the MongoDB pipeline: both group by state and city and count how many times each pair occurs.
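As a rule of thumb, SQL clauses map onto pipeline stages in order: WHERE becomes a $match before $group, GROUP BY becomes $group, HAVING becomes a $match after $group (filtering on the computed field), ORDER BY becomes $sort and LIMIT becomes $limit. The sketch below encodes that mapping in a hypothetical helper, buildGroupPipeline, for the COUNT(*) case; the helper and its option names are made up for illustration and are not part of any MongoDB API.

```javascript
// Hypothetical helper: builds a pipeline equivalent to
//   SELECT <groupBy...>, COUNT(*) AS count FROM t WHERE <where>
//   GROUP BY <groupBy...> HAVING COUNT(*) > <minCount> ORDER BY count DESC LIMIT <limit>
function buildGroupPipeline({ where, groupBy, minCount, sortDesc = false, limit }) {
  const pipeline = [];
  if (where) pipeline.push({ $match: where }); // WHERE: filter documents before grouping
  const id = {};
  for (const field of groupBy) id[field] = "$" + field; // GROUP BY a, b -> _id: { a: "$a", b: "$b" }
  pipeline.push({ $group: { _id: id, count: { $sum: 1 } } }); // COUNT(*)
  if (minCount !== undefined) pipeline.push({ $match: { count: { $gt: minCount } } }); // HAVING
  if (sortDesc) pipeline.push({ $sort: { count: -1 } }); // ORDER BY count DESC
  if (limit !== undefined) pipeline.push({ $limit: limit }); // LIMIT
  return pipeline;
}

// SELECT state, city, COUNT(*) AS count FROM test GROUP BY state, city ORDER BY count DESC;
const sortedCounts = buildGroupPipeline({ groupBy: ["state", "city"], sortDesc: true });
console.log(JSON.stringify(sortedCounts));
// In mongosh: db.test.aggregate(sortedCounts)
```

One visible difference remains: the result documents nest state and city under _id, while SQL returns them as separate columns; a trailing $project stage can flatten them if SQL-shaped rows are needed.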

Enterprise mongo7 [direct: primary] test> db.test.aggregate([
...   {
...     $group: {
...       _id: { state: "$state", city: "$city" }, // group by state and city
...       count: { $sum: 1 } // count the documents in each group
...     }
...   },
...   {
...     $sort: { count: -1 } // sort by document count, descending
...   }
... ])
[
  { _id: { state: 'TX', city: 'HOUSTON' }, count: 93 },
  { _id: { state: 'CA', city: 'LOS ANGELES' }, count: 56 },
  { _id: { state: 'PA', city: 'PHILADELPHIA' }, count: 48 },
  { _id: { state: 'IL', city: 'CHICAGO' }, count: 47 },
  { _id: { state: 'TX', city: 'SAN ANTONIO' }, count: 45 },
  { _id: { state: 'TX', city: 'DALLAS' }, count: 44 },
  { _id: { state: 'MO', city: 'KANSAS CITY' }, count: 41 },
  { _id: { state: 'NY', city: 'NEW YORK' }, count: 40 },
  { _id: { state: 'TX', city: 'AUSTIN' }, count: 40 },
  { _id: { state: 'NY', city: 'BROOKLYN' }, count: 37 },
  { _id: { state: 'CA', city: 'SAN DIEGO' }, count: 34 },
  { _id: { state: 'FL', city: 'MIAMI' }, count: 34 },
  { _id: { state: 'OK', city: 'OKLAHOMA CITY' }, count: 33 },
  { _id: { state: 'AZ', city: 'PHOENIX' }, count: 33 },
  { _id: { state: 'GA', city: 'ATLANTA' }, count: 31 },
  { _id: { state: 'CA', city: 'SAN JOSE' }, count: 29 },
  { _id: { state: 'CA', city: 'SACRAMENTO' }, count: 28 },
  { _id: { state: 'OK', city: 'TULSA' }, count: 28 },
  { _id: { state: 'NE', city: 'OMAHA' }, count: 27 },
  { _id: { state: 'CA', city: 'SAN FRANCISCO' }, count: 26 }
]

Now let's print the query plan to see how this statement is actually executed:

Enterprise mongo7 [direct: primary] test> db.test.aggregate([
...   {
...     $group: {
...       _id: { state: "$state", city: "$city" }, 
...       count: { $sum: 1 } 
...     }
...   },
...   {
...     $sort: { count: -1 } 
...   }
... ]).explain("executionStats") 
{
  explainVersion: '2',
  stages: [
    {
      '$cursor': {
        queryPlanner: {
          namespace: 'test.test',
          indexFilterSet: false,
          parsedQuery: {},
          queryHash: '28FD9B9E',
          planCacheKey: 'B81730B0',
          maxIndexedOrSolutionsReached: false,
          maxIndexedAndSolutionsReached: false,
          maxScansToExplodeReached: false,
          winningPlan: {
            queryPlan: {
              stage: 'GROUP',
              planNodeId: 2,
              inputStage: {
                stage: 'COLLSCAN',
                planNodeId: 1,
                filter: {},
                direction: 'forward'
              }
            },
            slotBasedPlan: {
              slots: '$$RESULT=s12 env: { s3 = Timestamp(1714570591, 1) (CLUSTER_TIME), s4 = 1714570597137 (NOW), s2 = Nothing (SEARCH_META), s1 = TimeZoneDatabase(Turkey...Asia/Chungking) (timeZoneDB) }',
              stages: '[2] mkobj s12 [_id = s11, count = s9] true false \n' +
                '[2] project [s11 = newObj("state", s6, "city", s5)] \n' +
                '[2] group [s6, s5] [s9 = sum(1)] spillSlots[s10] mergingExprs[sum(s10)] \n' +
                '[1] scan s7 s8 none none none none lowPriority [s5 = city, s6 = state] @"4a56b246-67a5-4b06-b71b-0d8ec25876c2" true false '
            }
          },
          rejectedPlans: []
        },
        executionStats: {
          executionSuccess: true,
          nReturned: 25701,
          executionTimeMillis: 168,
          totalKeysExamined: 0,
          totalDocsExamined: 29353,
          executionStages: {
            stage: 'mkobj',
            planNodeId: 2,
            nReturned: 25701,
            executionTimeMillisEstimate: 98,
            opens: 1,
            closes: 1,
            saveState: 33,
            restoreState: 33,
            isEOF: 1,
            objSlot: 12,
            fields: [],
            projectFields: [ '_id', 'count' ],
            projectSlots: [ Long("11"), Long("9") ],
            forceNewObject: true,
            returnOldObject: false,
            inputStage: {
              stage: 'project',
              planNodeId: 2,
              nReturned: 25701,
              executionTimeMillisEstimate: 63,
              opens: 1,
              closes: 1,
              saveState: 33,
              restoreState: 33,
              isEOF: 1,
              projections: { '11': 'newObj("state", s6, "city", s5) ' },
              inputStage: {
                stage: 'group',
                planNodeId: 2,
                nReturned: 25701,
                executionTimeMillisEstimate: 41,
                opens: 1,
                closes: 1,
                saveState: 33,
                restoreState: 33,
                isEOF: 1,
                groupBySlots: [ Long("6"), Long("5") ],
                expressions: { '9': 'sum(1) ', initExprs: { '9': null } },
                mergingExprs: { '10': 'sum(s10) ' },
                usedDisk: false,
                spills: 0,
                spilledRecords: 0,
                spilledDataStorageSize: 0,
                inputStage: {
                  stage: 'scan',
                  planNodeId: 1,
                  nReturned: 29353,
                  executionTimeMillisEstimate: 40,
                  opens: 1,
                  closes: 1,
                  saveState: 33,
                  restoreState: 33,
                  isEOF: 1,
                  numReads: 29353,
                  recordSlot: 7,
                  recordIdSlot: 8,
                  fields: [ 'city', 'state' ],
                  outputSlots: [ Long("5"), Long("6") ]
                }
              }
            }
          }
        }
      },
      nReturned: Long("25701"),
      executionTimeMillisEstimate: Long("139")
    },
    {
      '$sort': { sortKey: { count: -1 } },
      totalDataSizeSortedBytesEstimate: Long("12869626"),
      usedDisk: false,
      spills: Long("0"),
      spilledDataStorageSize: Long("0"),
      nReturned: Long("25701"),
      executionTimeMillisEstimate: Long("147")
    }
  ],
  serverInfo: {
    host: 'mongo',
    port: 27017,
    version: '7.0.1',
    gitVersion: '425a0454d12f2664f9e31002bbe4a386a25345b5'
  },
  serverParameters: {
    internalQueryFacetBufferSizeBytes: 104857600,
    internalQueryFacetMaxOutputDocSizeBytes: 104857600,
    internalLookupStageIntermediateDocumentMaxSizeBytes: 104857600,
    internalDocumentSourceGroupMaxMemoryBytes: 104857600,
    internalQueryMaxBlockingSortMemoryUsageBytes: 104857600,
    internalQueryProhibitBlockingMergeOnMongoS: 0,
    internalQueryMaxAddToSetBytes: 104857600,
    internalDocumentSourceSetWindowFieldsMaxMemoryBytes: 104857600,
    internalQueryFrameworkControl: 'trySbeEngine'
  },
  command: {
    aggregate: 'test',
    pipeline: [
      {
        '$group': {
          _id: { state: '$state', city: '$city' },
          count: { '$sum': 1 }
        }
      },
      { '$sort': { count: -1 } }
    ],
    cursor: {},
    '$db': 'test'
  },
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1714570591, i: 1 }),
    signature: {
      hash: Binary(Buffer.from("7a5832f6c1d7fc8b6b9bfe23a274a5b4f417cdcd", "hex"), 0),
      keyId: Long("7364003857451450369")
    }
  },
  operationTime: Timestamp({ t: 1714570591, i: 1 })
}
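Reading the plan above: the winning plan is COLLSCAN feeding GROUP, so every document is read (totalDocsExamined 29353, totalKeysExamined 0) and the $group itself runs inside the slot-based execution engine (the slotBasedPlan section), while $sort stays a separate pipeline stage that sorts 25701 groups in memory (usedDisk: false). Because these outputs are long, a small helper that pulls out just those fields makes plans easier to compare. The sketch below is a hypothetical helper, summarizeExplain, written against the shape shown here (MongoDB 7.0, explainVersion '2', a $cursor stage first); other versions, sharded clusters, or pipelines that are pushed down in full may produce a different shape.

```javascript
// Hypothetical helper: summarizes an aggregation explain("executionStats") document
// of the shape printed above (explainVersion '2', first stage is $cursor).
function summarizeExplain(explain) {
  const cursor = explain.stages[0].$cursor;
  // Walk queryPlan -> inputStage -> ... and collect stage names, outermost first.
  const planStages = [];
  for (let node = cursor.queryPlanner.winningPlan.queryPlan; node; node = node.inputStage) {
    planStages.push(node.stage);
  }
  const stats = cursor.executionStats;
  return {
    planStages,
    usesIndex: planStages.includes("IXSCAN"),
    totalKeysExamined: stats.totalKeysExamined,
    totalDocsExamined: stats.totalDocsExamined,
    // Pipeline stages that were not pushed into $cursor, e.g. "$sort" or "$match".
    laterStages: explain.stages.slice(1).map((s) => Object.keys(s).find((k) => k.startsWith("$")))
  };
}

// Trimmed copy of the first explain output above (only the fields the helper reads).
const collscanExplain = {
  explainVersion: "2",
  stages: [
    {
      $cursor: {
        queryPlanner: {
          winningPlan: { queryPlan: { stage: "GROUP", inputStage: { stage: "COLLSCAN" } } }
        },
        executionStats: { nReturned: 25701, totalKeysExamined: 0, totalDocsExamined: 29353 }
      }
    },
    { $sort: { sortKey: { count: -1 } }, nReturned: 25701 }
  ]
};
console.log(summarizeExplain(collscanExplain));
```

In mongosh it could be fed the same call used in this article, for example summarizeExplain(db.test.aggregate(pipeline).explain("executionStats")).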


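Now a new requirement: compute the same statistics only for cities in Texas (state "TX"). For this query, simply adding an index on state is enough for a noticeable improvement. The execution plan below clearly switches to IXSCAN on the state_1 index: it examines 1671 index keys and 1671 documents instead of scanning all 29353 documents.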

Enterprise mongo7 [direct: primary] test> db.test.aggregate([ { $match: { state: "TX"  } }, { $group: { _id: { state: "$state", city: "$city" }, count: { $sum: 1 }  } }, { $match: { count: { $gt: 1 }  } }] ).explain("executionStats")
{
  explainVersion: '2',
  stages: [
    {
      '$cursor': {
        queryPlanner: {
          namespace: 'test.test',
          indexFilterSet: false,
          parsedQuery: { state: { '$eq': 'TX' } },
          queryHash: '57E70BDE',
          planCacheKey: 'FC027C07',
          maxIndexedOrSolutionsReached: false,
          maxIndexedAndSolutionsReached: false,
          maxScansToExplodeReached: false,
          winningPlan: {
            queryPlan: {
              stage: 'GROUP',
              planNodeId: 3,
              inputStage: {
                stage: 'FETCH',
                planNodeId: 2,
                inputStage: {
                  stage: 'IXSCAN',
                  planNodeId: 1,
                  keyPattern: { state: 1 },
                  indexName: 'state_1',
                  isMultiKey: false,
                  multiKeyPaths: { state: [] },
                  isUnique: false,
                  isSparse: false,
                  isPartial: false,
                  indexVersion: 2,
                  direction: 'forward',
                  indexBounds: { state: [ '["TX", "TX"]' ] }
                }
              }
            },
            slotBasedPlan: {
              slots: '$$RESULT=s19 env: { s3 = Timestamp(1714571396, 7) (CLUSTER_TIME), s1 = TimeZoneDatabase(Turkey...Asia/Chungking) (timeZoneDB), s2 = Nothing (SEARCH_META), s4 = 1714571398752 (NOW), s6 = KS(3C5458000104), s7 = KS(3C545800FE04), s11 = {"state" : 1} }',
              stages: '[3] mkobj s19 [_id = s18, count = s16] true false \n' +
                '[3] project [s18 = newObj("state", s15, "city", s14)] \n' +
                '[3] group [s15, s14] [s16 = sum(1)] spillSlots[s17] mergingExprs[sum(s17)] \n' +
                '[2] nlj inner [] [s5, s8, s9, s10, s11] \n' +
                '    left \n' +
                '        [1] cfilter {(exists(s6) && exists(s7))} \n' +
                '        [1] ixseek s6 s7 s10 s5 s8 s9 [] @"4a56b246-67a5-4b06-b71b-0d8ec25876c2" @"state_1" true \n' +
                '    right \n' +
                '        [2] limit 1 \n' +
                '        [2] seek s5 s12 s13 s8 s9 s10 s11 [s14 = city, s15 = state] @"4a56b246-67a5-4b06-b71b-0d8ec25876c2" true false \n'
            }
          },
          rejectedPlans: []
        },
        executionStats: {
          executionSuccess: true,
          nReturned: 1233,
          executionTimeMillis: 12,
          totalKeysExamined: 1671,
          totalDocsExamined: 1671,
          executionStages: {
            stage: 'mkobj',
            planNodeId: 3,
            nReturned: 1233,
            executionTimeMillisEstimate: 6,
            opens: 1,
            closes: 1,
            saveState: 2,
            restoreState: 2,
            isEOF: 1,
            objSlot: 19,
            fields: [],
            projectFields: [ '_id', 'count' ],
            projectSlots: [ Long("18"), Long("16") ],
            forceNewObject: true,
            returnOldObject: false,
            inputStage: {
              stage: 'project',
              planNodeId: 3,
              nReturned: 1233,
              executionTimeMillisEstimate: 6,
              opens: 1,
              closes: 1,
              saveState: 2,
              restoreState: 2,
              isEOF: 1,
              projections: { '18': 'newObj("state", s15, "city", s14) ' },
              inputStage: {
                stage: 'group',
                planNodeId: 3,
                nReturned: 1233,
                executionTimeMillisEstimate: 6,
                opens: 1,
                closes: 1,
                saveState: 2,
                restoreState: 2,
                isEOF: 1,
                groupBySlots: [ Long("15"), Long("14") ],
                expressions: { '16': 'sum(1) ', initExprs: { '16': null } },
                mergingExprs: { '17': 'sum(s17) ' },
                usedDisk: false,
                spills: 0,
                spilledRecords: 0,
                spilledDataStorageSize: 0,
                inputStage: {
                  stage: 'nlj',
                  planNodeId: 2,
                  nReturned: 1671,
                  executionTimeMillisEstimate: 6,
                  opens: 1,
                  closes: 1,
                  saveState: 2,
                  restoreState: 2,
                  isEOF: 1,
                  totalDocsExamined: 1671,
                  totalKeysExamined: 1671,
                  collectionScans: 0,
                  collectionSeeks: 1671,
                  indexScans: 0,
                  indexSeeks: 1,
                  indexesUsed: [ 'state_1' ],
                  innerOpens: 1671,
                  innerCloses: 1,
                  outerProjects: [],
                  outerCorrelated: [
                    Long("5"),
                    Long("8"),
                    Long("9"),
                    Long("10"),
                    Long("11")
                  ],
                  outerStage: {
                    stage: 'cfilter',
                    planNodeId: 1,
                    nReturned: 1671,
                    executionTimeMillisEstimate: 6,
                    opens: 1,
                    closes: 1,
                    saveState: 2,
                    restoreState: 2,
                    isEOF: 1,
                    numTested: 1,
                    filter: '(exists(s6) && exists(s7)) ',
                    inputStage: {
                      stage: 'ixseek',
                      planNodeId: 1,
                      nReturned: 1671,
                      executionTimeMillisEstimate: 6,
                      opens: 1,
                      closes: 1,
                      saveState: 2,
                      restoreState: 2,
                      isEOF: 1,
                      indexName: 'state_1',
                      keysExamined: 1671,
                      seeks: 1,
                      numReads: 1672,
                      indexKeySlot: 10,
                      recordIdSlot: 5,
                      snapshotIdSlot: 8,
                      indexIdentSlot: 9,
                      outputSlots: [],
                      indexKeysToInclude: '00000000000000000000000000000000',
                      seekKeyLow: 's6 ',
                      seekKeyHigh: 's7 '
                    }
                  },
                  innerStage: {
                    stage: 'limit',
                    planNodeId: 2,
                    nReturned: 1671,
                    executionTimeMillisEstimate: 0,
                    opens: 1671,
                    closes: 1,
                    saveState: 2,
                    restoreState: 2,
                    isEOF: 1,
                    limit: 1,
                    inputStage: {
                      stage: 'seek',
                      planNodeId: 2,
                      nReturned: 1671,
                      executionTimeMillisEstimate: 0,
                      opens: 1671,
                      closes: 1,
                      saveState: 2,
                      restoreState: 2,
                      isEOF: 0,
                      numReads: 1671,
                      recordSlot: 12,
                      recordIdSlot: 13,
                      seekKeySlot: 5,
                      snapshotIdSlot: 8,
                      indexIdentSlot: 9,
                      indexKeySlot: 10,
                      indexKeyPatternSlot: 11,
                      fields: [ 'city', 'state' ],
                      outputSlots: [ Long("14"), Long("15") ]
                    }
                  }
                }
              }
            }
          }
        }
      },
      nReturned: Long("1233"),
      executionTimeMillisEstimate: Long("6")
    },
    {
      '$match': { count: { '$gt': 1 } },
      nReturned: Long("67"),
      executionTimeMillisEstimate: Long("6")
    }
  ],
  serverInfo: {
    host: 'mongo',
    port: 27017,
    version: '7.0.1',
    gitVersion: '425a0454d12f2664f9e31002bbe4a386a25345b5'
  },
  serverParameters: {
    internalQueryFacetBufferSizeBytes: 104857600,
    internalQueryFacetMaxOutputDocSizeBytes: 104857600,
    internalLookupStageIntermediateDocumentMaxSizeBytes: 104857600,
    internalDocumentSourceGroupMaxMemoryBytes: 104857600,
    internalQueryMaxBlockingSortMemoryUsageBytes: 104857600,
    internalQueryProhibitBlockingMergeOnMongoS: 0,
    internalQueryMaxAddToSetBytes: 104857600,
    internalDocumentSourceSetWindowFieldsMaxMemoryBytes: 104857600,
    internalQueryFrameworkControl: 'trySbeEngine'
  },
  command: {
    aggregate: 'test',
    pipeline: [
      { '$match': { state: 'TX' } },
      {
        '$group': {
          _id: { state: '$state', city: '$city' },
          count: { '$sum': 1 }
        }
      },
      { '$match': { count: { '$gt': 1 } } }
    ],
    cursor: {},
    '$db': 'test'
  },
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1714571396, i: 7 }),
    signature: {
      hash: Binary(Buffer.from("b50924fe412dcbca2ee2bc00a6592d26c5c39d52", "hex"), 0),
      keyId: Long("7364003857451450369")
    }
  },
  operationTime: Timestamp({ t: 1714571396, i: 7 })
}
Enterprise mongo7 [direct: primary] test> 

Translated into SQL, the MongoDB statement above becomes:

SELECT state, city, COUNT(*) AS count
FROM test
WHERE state = 'TX'
GROUP BY state, city
HAVING COUNT(*) > 1;
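The session above never shows the index being created; the plan names it state_1, which is the default name createIndex generates for the key pattern { state: 1 }. A minimal sketch, assuming the same test collection: the pipeline is a plain array, and the block that touches the database only runs inside mongosh, where the global db exists. On the data above, the explain output reports 67 documents coming out of the final $match. The compound-index line is an untested assumption, left commented out.

```javascript
// WHERE state = 'TX' GROUP BY state, city HAVING COUNT(*) > 1, as a pipeline.
const texasDuplicates = [
  { $match: { state: "TX" } },                                                 // WHERE: can use the state_1 index
  { $group: { _id: { state: "$state", city: "$city" }, count: { $sum: 1 } } }, // GROUP BY state, city
  { $match: { count: { $gt: 1 } } }                                            // HAVING COUNT(*) > 1
];

// Only runs inside mongosh, where the global `db` is defined; skipped under plain Node.js.
if (typeof db !== "undefined") {
  db.test.createIndex({ state: 1 }); // default index name: "state_1"
  // Assumption, not measured here: a compound index such as { state: 1, city: 1 }
  // might let the planner answer from the index alone (no FETCH); verify with explain.
  // db.test.createIndex({ state: 1, city: 1 });
  print(db.test.aggregate(texasDuplicates).toArray().length);
}
```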

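To recap: some MongoDB data-processing patterns cannot be translated directly into SQL, or can only be expressed in SQL with more convoluted statements. MongoDB processing can also be broken into steps. For example, instead of grouping the whole collection by state in one pass to count duplicated cities, we can create an index on state and count the duplicated cities one state at a time, using state as the filter condition. In that approach state becomes a fixed input, and the per-state queries can be driven from a JavaScript script in the shell.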
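The step-by-step approach just described can be sketched as a small script. duplicateCitiesByState is a hypothetical helper, not part of MongoDB; it assumes coll behaves like db.test in mongosh (distinct(field) returns an array, aggregate(pipeline) returns a cursor with toArray()). Because state is fixed inside each iteration, the $group key only needs city, and the leading $match on state can use the state_1 index.

```javascript
// Hypothetical helper: counts duplicated cities one state at a time.
// `coll` is assumed to behave like db.test in mongosh:
//   coll.distinct(field)     -> array of distinct values
//   coll.aggregate(pipeline) -> cursor with toArray()
function duplicateCitiesByState(coll) {
  const result = {};
  for (const state of coll.distinct("state")) {
    const rows = coll.aggregate([
      { $match: { state: state } },                     // fixed input; can use the state_1 index
      { $group: { _id: "$city", count: { $sum: 1 } } }, // state is constant, so group by city only
      { $match: { count: { $gt: 1 } } }                 // cities that appear more than once
    ]).toArray();
    if (rows.length > 0) {
      result[state] = rows.length; // number of duplicated cities in this state
    }
  }
  return result;
}

// In mongosh: printjson(duplicateCitiesByState(db.test));
```

The trade-off is one round trip per distinct state instead of a single pipeline; in exchange each query touches only one state's documents, which keeps each $group small. Whether that is faster overall depends on the data and is worth checking with explain.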

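In conclusion, MongoDB queries are more flexible than SQL: there are more ways to approach a problem and more points to optimize, which makes MongoDB well suited for programmers doing statistical analysis on their data.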
