MongoDB是……
该章节介绍如何向MongoDB中添加数据。
通过
/** * 本JS说明: * 1、将这里的内容保存到JS文件(这里命名为:xxx.js)中 * 2、在JS文件所在的文件夹“在此处打开PowerShell窗口”或者“命令提示符” * 3、在终端输入下面的命令 * 示例: mongo xxx.js */ /** * mongo 其他 * 0、添加 数据库名称 参数 * 语法: mongo 数据库名称 xxx.js * 说明: 这里默认连接的是本地数据库 * 1、添加 mongodb地址(端口)和数据库名称 参数 * 语法: mongo IP地址:端口/数据库名称 xxx.js * 说明: * 1.1、此时你可以在JS文件中直接使用 db ,例如:print(db.getName()); print(db.version()); * 1.2、如果设置了用户名和密码,则需要添加 -u -p --authenticationDatabase 参数 * 示例:mongo IP地址:端口/数据库名称 xxx.js -u 用户名 -p 密码 --authenticationDatabase admin * 2、在终端输入时不带 mongodb地址和数据库名称 参数 * 语法: mongo xxx.js * 说明: 这种情况下,如何获取 db 以及设置了 用户名和密码 如何处理,请直接看代码 * …… 更多使用方式, * 你可以在终端输入: mongo --help * https://www.mongodb.com/docs/manual/reference/program/mongo/ */ // 1、数据库连接 !!!这里需要替换!!! let mongo = new Mongo('IP地址:端口'); mongo.getDB('admin').auth('用户名', '密码'); let db = mongo.getDB('数据库名称'); // 2、方法:批量插入数据 function realBatchInsertData(collectionName, trs) { try { db[collectionName].insertMany(trs); } catch (e) { print(e); } }; // 3、准备数据以及插入数据 // 每页插入的条数 let pageSize = 100; // 这里放置你需要插入的数据 !!!这里需要替换!!! let trs = []; // 计算页数 let pageNum = Math.ceil(trs.length / pageSize); let startTime = new Date(); print("此次共插入" + trs.length + "条记录,分页方式插入,每页" + pageSize + "条,共" + pageNum + "页"); print("---> 开始插入,计时开始。现在时间:" + startTime.toLocaleString()); for (let m = 1; m <= pageNum; m++) { let currentPageTrs = trs.slice((m - 1) * pageSize, m * pageSize); print(" 第" + m + "页,共" + currentPageTrs.length + "条"); // !!!这里需要替换!!! realBatchInsertData("表名", currentPageTrs); } var endTime = new Date(); print("---> 结束插入,计时结束。现在时间:" + endTime.toLocaleString() + "。耗时:" + (endTime - startTime) / 1000 + "s");
备份相关
# 说明:目录会自动创建的 # 1、备份本地数据库(Test001)-未开启权限认证 mongodump -h localhost:27017 -d Test001 -o D:\backup\mongodb\local mongodump -h 127.0.0.1:27017 -d Test001 -o D:\backup\mongodb\local # 2、备份远程数据库(Test001)-开启权限认证 mongodump -h IP地址:端口号 -d Test001 -o D:\backup\mongodb\remote -u=用户名 -p=密码 --authenticationDatabase admin
还原相关
# 1、还原到本地数据库(Test002)-未开启权限认证 mongorestore -h localhost:27017 -d Test002 D:\backup\mongodb\local\Test001 mongorestore -h 127.0.0.1:27017 -d Test002 D:\backup\mongodb\local\Test001 # 2、还原到远程数据库(Test002)-开启权限认证 mongorestore -h IP地址:端口号 -d Test002 D:\backup\mongodb\remote\Test001 -u=用户名 -p=密码 --authenticationDatabase admin
复制数据库的几种方式,等待添加……
复制集合的几种方式。找到了一篇文章,文中介绍了多种方式,很全。链接地址:7 different ways to clone MongoDB connection
// 这里会创建一个新表"config2",可能需要权限验证。 db.config.aggregate([ { $out: 'config2' } ]); // 如果添加过滤条件,则只是复制其中的部分数据 db.config.aggregate([{ $match: { ModuleId : 'a' } }, { $out: 'config2' } ]);
该章节记录查询相关的
这个查询的业务性很强,用到的场景应该不多。为了了解这个查询是做什么的,先介绍一下具体的业务场景:模块(一个业务上的概念,非JS中的模块),他是一个功能完成的业务单元,包括添加记录、编辑记录、列表(包括搜索、导出)等;新建、编辑页面的表单展示哪些控件,列表显示哪些列时候需要排序、搜索等这些都是基于配置的;首先模块有一个默认的配置(从0到1),其次在不同的应用(或者叫做产品)中使用他时可以有不同的配置,可以在默认配置的基础上修改,再次不同的企业买了应用之后也可以在应用级的基础上对模块配置进行修改,甚至每个用户都有自己的配置。这里我们将默认配置、应用下的配置、企业下的配置、用户配置等叫做层级配置,并且在同一层级内还支持版本的概念,随着时间的推移以及业务的变化,我们的配置可能会有变化。
说了一大堆,不知道说清楚了没有,哈哈,就当说清楚了。这个查询的目的是:按照给定的层级参数,查询最新的配置。规则是:指定层级配置 如果存在,则取最新记录;如果不存在,则获取上一级的最新配置,直到最顶级。
先看一下数据结构
{ "_id" : "", "ModuleId" : "a", "Level1" : null, "Level2" : null, "Level3" : null, "FormConfig" : "", "ListConfig" : "", "CreateDate" : "" }
为了验证查询语句,还是先造点数据:
// 两个方法,用于生成一个随机时间 // 更多方法:https://github.com/xiaodu114/a2bei4 function getInRangeInteger(num1, num2) { num1 = Number.isInteger(num1) ? num1 : 0; num2 = Number.isInteger(num2) ? num2 : 0; if (num1 > num2) [num1, num2] = [num2, num1]; return Math.round(Math.random() * (num2 - num1)) + num1; } function date_GetInRangeDate(date1, date2) { let v1 = new Date(date1).valueOf(), v2 = new Date(date2).valueOf(); if (isNaN(v1) && isNaN(v2)) { v1 = 0; v2 = new Date().valueOf(); } else { if (isNaN(v1)) v1 = 0; if (isNaN(v2)) v2 = 0; } return new Date(getInRangeInteger(Math.abs(v1 - v2)) + Math.min(v1, v2)); } let moduleIds = ["a", "b", "c"], level1s = ["lv1_1", "lv1_2", "lv1_3"], level2s = ["lv2_1", "lv2_2", "lv2_3"], level3s = ["lv3_1", "lv3_2", "lv3_3"]; let trs = []; // 标识 moduleIds.forEach((moduleId) => { for (let i = 0; i < 3; i++) { trs.push({ ModuleId: moduleId, Level1: null, Level2: null, Level3: null }); // 自定义:第一级 level1s.forEach((lv1) => { trs.push({ ModuleId: moduleId, Level1: lv1, Level2: null, Level3: null }); // 自定义:第一级 level2s.forEach((lv2) => { trs.push({ ModuleId: moduleId, Level1: lv1, Level2: lv2, Level3: null }); // 自定义:第一级 level3s.forEach((lv3) => { trs.push({ ModuleId: moduleId, Level1: lv1, Level2: lv2, Level3: lv3 }); }); }); }); } }); trs.forEach(tr => { tr["FormConfig"] = ""; tr["ListConfig"] = {pc:[],mobile:[]}; tr["CreateDate"] = date_GetInRangeDate("2022-01-01 08:00:00", "2022-05-15 08:00:00"); }); db.config.insertMany(trs);
下面就是查询语句了
// 模拟一个层级配置查询类 let levelObj = { ModuleId: 'a', Level1: 'lv1_1', Level2: 'lv2_1', Level3: 'lv3_1' }; db.getCollection('config').aggregate([{ $match: { $or: [{ ModuleId: levelObj.ModuleId, Level1: null, Level2: null, Level3: null }, { ModuleId: levelObj.ModuleId, Level1: levelObj.Level1, Level2: null, Level3: null, }, { ModuleId: levelObj.ModuleId, Level1: levelObj.Level1, Level2: levelObj.Level2, Level3: null }, { ModuleId: levelObj.ModuleId, Level1: levelObj.Level1, Level2: levelObj.Level2, Level3: levelObj.Level3, } ] } }, { $sort: { CreateDate: -1 } }, { $group: { _id: { ModuleId: '$ModuleId', Level1: '$Level1', Level2: '$Level2', Level3: '$Level3' }, latestItem: { $first: '$$ROOT' } } }, { $sort: { _id: -1 } }, { $limit: 1 }, { $replaceRoot: { newRoot: '$latestItem' } } ])
上面介绍的查询是单个的,下面再说批量查询的,下面介绍两种方式:
db.getCollection('config').aggregate([{ $match: { $or: [{ ModuleId: 'a', Level1: null, Level2: null, Level3: null }, { ModuleId: 'a', Level1: 'lv1_1', Level2: null, Level3: null, }, { ModuleId: 'a', Level1: 'lv1_1', Level2: 'lv2_1', Level3: null }, { ModuleId: 'a', Level1: 'lv1_1', Level2: 'lv2_1', Level3: 'lv3_1' }, { ModuleId: 'b', Level1: null, Level2: null, Level3: null }, { ModuleId: 'b', Level1: 'lv1_2', Level2: null, Level3: null, }, { ModuleId: 'b', Level1: 'lv1_2', Level2: 'lv2_2', Level3: null }, { ModuleId: 'b', Level1: 'lv1_2', Level2: 'lv2_2', Level3: 'lv3_2' } ] } }, { $sort: { CreateDate: -1 } }, { $group: { _id: { $concat: ['$ModuleId', '->', { $ifNull: ['$Level1', 'null'] }, '->', { $ifNull: ['$Level2', 'null'] }, '->', { $ifNull: ['$Level3', 'null'] } ] }, latestItem: { // $arrayElemAt: ['$items', 0] // 获取数组第一项 $first: '$$ROOT' } } }, // 下面的两个管道将查询结果(数组)转成对象 { $group: { _id: '1', results: { $push: '$$ROOT' } } }, { $replaceRoot: { newRoot: { $arrayToObject: { $map: { input: '$results', as: 'item', in: ['$$item._id', '$$item.latestItem'] } } } } } ])
db.getCollection('config').aggregate([{ $match: { $or: [{ ModuleId: 'a', Level1: null, Level2: null, Level3: null }, { ModuleId: 'a', Level1: 'lv1_1', Level2: null, Level3: null, }, { ModuleId: 'a', Level1: 'lv1_1', Level2: 'lv2_1', Level3: null }, { ModuleId: 'a', Level1: 'lv1_1', Level2: 'lv2_1', Level3: 'lv3_1' }, { ModuleId: 'b', Level1: null, Level2: null, Level3: null }, { ModuleId: 'b', Level1: 'lv1_2', Level2: null, Level3: null, }, { ModuleId: 'b', Level1: 'lv1_2', Level2: 'lv2_2', Level3: null }, { ModuleId: 'b', Level1: 'lv1_2', Level2: 'lv2_2', Level3: 'lv3_2' } ] } }, { $group: { _id: { $concat: ['$ModuleId', '->', { $ifNull: ['$Level1', 'null'] }, '->', { $ifNull: ['$Level2', 'null'] }, '->', { $ifNull: ['$Level3', 'null'] } ] }, items: { $push: "$$ROOT" } } }, { $project: { latestItem: { $arrayElemAt: [ '$items', { $indexOfArray: [ '$items.CreateDate', { $max: '$items.CreateDate' } ] } ] } } }, // 下面的两个管道将查询结果(数组)转成对象 { $group: { _id: '1', results: { $push: '$$ROOT' } } }, { $replaceRoot: { newRoot: { $arrayToObject: { $map: { input: '$results', as: 'item', in: ['$$item._id', '$$item.latestItem'] } } } } } ])
还是先介绍一下业务场景:表单基于表单构造器,表单字段是用户自己配的,所以不确定有哪些字段,因此在存储表单数据时选用的是数组结构,形如:
{ _id: "", FormData: [{ key: "表单控件ID", value: "对应的值" }, { key: "表单控件ID", value: "对应的值" }, // …… 还有很多表单字段 ], CreateDate: new Date(), // …… 其他字段省略 }
在查询之前还是先伪造一些数据,下面是语句:
在没有处理通用查询(.net 和 mongodb这篇有介绍)之前或者说刚从关系型数据转到mongodb后,这种查询感觉很别扭,于是便想着能不能在查询之前先将数据处理一下:将表单中的字段拿出来放到最外层,表单控件ID作为列名称,和"_id"处于同一层级。实现该功能至今有好几个版本了,在这里汇总一下。链接一下之前的笔记:表单生成器(Form Builder)之mongodb表单数据——整理数据
// 参考链接: https://jira.mongodb.org/browse/SERVER-5947 db.instance.aggregate([{ $addFields: { FormDataObj: { $arrayToObject: { $map: { input: '$FormData', as: 'kv', in: ['$$kv.key', '$$kv.value'] } } } } }, { $addFields: { 'FormDataObj._id': '$_id', 'FormDataObj.Title': '$Title', 'FormDataObj.FormData': '$FormData', 'FormDataObj.CreateUserId': '$CreateUserId', 'FormDataObj.CreateUserName': '$CreateUserName', 'FormDataObj.CreateDate': '$CreateDate', 'FormDataObj.ModifyUserId': '$ModifyUserId', 'FormDataObj.ModifyUserName': '$ModifyUserName', 'FormDataObj.LastOperateDate': '$LastOperateDate', 'FormDataObj.ModuleId': '$ModuleId', 'FormDataObj.ModuleVersion': '$ModuleVersion' } }, { $replaceRoot: { newRoot: '$FormDataObj' } } ])
db.instance.aggregate([{ $addFields: { FormDataObj: { $arrayToObject: { $map: { input: '$FormData', as: 'kv', in: ['$$kv.key', '$$kv.value'] } } } } }, { $addFields: { TempFormData: { $objectToArray: '$FormDataObj' } } }, { $addFields: { replaceObj: { $arrayToObject: { $map: { input: { $concatArrays: ['$TempFormData', { $objectToArray: '$$ROOT' } ] }, as: 'kv', in: ['$$kv.k', '$$kv.v'] } } } } }, { $replaceRoot: { newRoot: '$replaceObj' } }, { $project: { FormDataObj: 0, TempFormData: 0 } } ])
// $mergeObjects 版本要求比较高,当时没有用这个 db.instance.aggregate([{ $replaceRoot: { newRoot: { $mergeObjects: [{ $cond: [{ $isArray: '$FormData' }, { $arrayToObject: { $map: { input: '$FormData', as: 'kv', in: ['$$kv.key', '$$kv.value'] } } }, {} ] }, '$$ROOT' ] } } } ])
db.instance.aggregate([{ $replaceRoot: { newRoot: { $arrayToObject: { $concatArrays: [{ $cond: [{ $isArray: '$FormData' }, { $map: { input: '$FormData', as: 'kv', in: { 'k': '$$kv.key', 'v': '$$kv.value' } } }, []] }, { $objectToArray: '$$ROOT' } ] } } } } ])
还是先介绍一下业务场景:一个模块的列表展示数据,首先有一些必要的过滤条件(例如,上面提到的层级参数:哪一个模块,哪一个应用,哪一个企业等);之后企业可能给不同的角色或者用户分配不同的权限(每个权限包括数据筛选条件、操作按钮等),用户可能拥有多个不同的权限。此时就用到了或查询,我们要对用户的权限取并集。到这里还不是特别麻烦,麻烦的是我们还想知道查询到的每一条记录命中了哪一个权限,只有这样我们才能知道用户针对某一条记录有哪些操作权限(导出、打印、编辑、删除等)。
// $facet管道中的每个key(这里是a和b)即是一个权限的唯一标识 db.instance.aggregate([{ $match: { ModuleId: '507048044944691000' } }, { $facet: { a: [{ $match: { _id: { $ne: '' } } } ], b: [{ $match: { CreateUserId: 'user10007' } } ] } }, { $addFields: { 'a.PowerKey': 'a', 'a.PowerKey': 'a' } }, { $project: { FacetItems: { $concatArrays: ['$$ROOT.a', '$$ROOT.b'] } } }, { $unwind: '$FacetItems' }, { $replaceRoot: { newRoot: '$FacetItems' } }, { $group: { _id: '$_id', GroupItems: { $push: '$$ROOT' } } }, { $addFields: { ReplaceItem: { $arrayElemAt: ['$GroupItems', 0] } } }, { $addFields: { 'ReplaceItem.PowerKeys': { $map: { input: '$GroupItems', as: 'tempItem', in: '$$tempItem.PowerKey' } } } }, { $replaceRoot: { newRoot: '$ReplaceItem' } }, { $project: { PowerKey: 0 } }, { $addFields: { FormDataObj: { $arrayToObject: { $map: { input: '$FormData', as: 'field', in: ['$$field.key', '$$field.value'] } } } } }, { $addFields: { TempFormData: { $objectToArray: '$FormDataObj' } } }, { $addFields: { FormValueObj: { $arrayToObject: { $map: { input: { $concatArrays: ['$TempFormData', { $objectToArray: '$$ROOT' } ] }, as: 'kv', in: ['$$kv.k', '$$kv.v'] } } } } }, { $replaceRoot: { newRoot: '$FormValueObj' } }, { $project: { FormDataObj: 0, TempFormData: 0 } }, { $sort: { CreateDate: -1 } }, { $skip: 0 }, { $limit: 30 } ], { allowDiskUse: true, collation: { locale: 'zh@collation=gb2312han' } })
随着时间的推移,各种各样的模块越来越多,模块的数据也在不断增长,于是查询就慢了……除了速度变慢之外还有一个问题就是:如果
PlanExecutor error during aggregation :: caused by :: document constructed by $facet is 104857980 bytes, which exceeds the limit of 104857600 bytes
先解决
db.instance.aggregate([{ $match: { ModuleId: '507048044944691000' } }, { $facet: { a: [{ $match: { _id: { $ne: '' } } }, { $project: { _id: 1, CreateDate: 1, PowerKey: 'a' } }, { $sort: { CreateDate: -1 } }, { $limit: 30 } ], b: [{ $match: { CreateUserId: 'user10007' } }, { $project: { _id: 1, CreateDate: 1, PowerKey: 'b' } }, { $sort: { CreateDate: -1 } }, { $limit: 30 } ] } }, { $addFields: { FacetItems: { $setUnion: ['$$ROOT.a', '$$ROOT.b'] } } }, { $unwind: '$FacetItems' }, { $replaceRoot: { newRoot: '$FacetItems' } }, { $group: { _id: '$_id', PowerKeys: { $push: '$$ROOT.PowerKey' } } }, { $lookup: { from: 'instance', localField: '_id', foreignField: '_id', as: 'LookupItems' } }, { $unwind: '$LookupItems' }, { $addFields: { 'LookupItems.PowerKeys': '$PowerKeys' } }, { $replaceRoot: { newRoot: '$LookupItems' } }, { $replaceRoot: { newRoot: { $arrayToObject: { $concatArrays: [{ $cond: [{ $isArray: '$FormData' }, { $map: { input: '$FormData', as: 'kv', in: { 'k': '$$kv.key', 'v': '$$kv.value' } } }, []] }, { $objectToArray: '$$ROOT' } ] } } } }, { $sort: { CreateDate: -1 } }, { $skip: 0 }, { $limit: 30 } ], { allowDiskUse: true, collation: { locale: 'zh@collation=gb2312han' } })
已测试这个语句,懵了,太慢了。全表总共333万,ModuleId === '507048044944691000'下有37万。耗时:2.5分钟。忍不了啊!谁都忍不了啊!没想到将
// 将下面的代码,添加到$facet前面即可 { $project: { _id: 1, '1632378042062': { $arrayElemAt: [{ $reduce: { input: "$FormData", initialValue: [], in: { '$concatArrays': ['$$value', { '$cond': [{ '$eq': ['$$this.key', '1632378042062'] }, ['$$this.value'], []] } ] } } }, 0] } } }
关联自身查询除了上面方式,还有另外一种方式,这里记录一下:
// 下面的代码可以替换上面$lookup和之后三个管道 [{ $lookup: { from: 'instance', localField: '_id', foreignField: '_id', as: 'LookupItems' } }, { $project: { PowerKeys: 1, ReplaceRootItem: { $arrayElemAt: ['$LookupItems', 0] } } }, { $addFields: { 'ReplaceRootItem.PowerKeys': '$PowerKeys' } }, { $replaceRoot: { newRoot: '$ReplaceRootItem' } } ]
db.instance.aggregate([{ $match: { ModuleId: '507048044944691000' } }, { $match: { $or: [{ _id: { $ne: '' } }, { CreateUserId: 'user10007' } ] } }, { $replaceRoot: { newRoot: { $arrayToObject: { $concatArrays: [{ $cond: [{ $isArray: '$FormData' }, { $map: { input: '$FormData', as: 'kv', in: { 'k': '$$kv.key', 'v': '$$kv.value' } } }, []] }, { $objectToArray: '$$ROOT' } ] } } } }, { $sort: { CreateDate: -1 } }, { $skip: 0 }, { $limit: 30 }, { $facet: { a: [{ $match: { _id: { $ne: '' } } }, { $addFields: { PowerKey: 'a' } } ], b: [{ $match: { CreateUserId: 'user10007' } }, { $addFields: { PowerKey: 'b' } } ] } }, { $addFields: { FacetItems: { $setUnion: ['$$ROOT.a', '$$ROOT.b'] } } }, { $unwind: '$FacetItems' }, { $replaceRoot: { newRoot: '$FacetItems' } }, { $group: { _id: '$_id', PowerKeys: { $push: '$$ROOT.PowerKey' }, GroupItems: { $push: '$$ROOT' } } }, { $addFields: { 'GroupItems.PowerKeys': '$PowerKeys' } }, { $replaceRoot: { newRoot: { $first: '$GroupItems' } } }, { $project: { PowerKey: 0 } } ], { allowDiskUse: true, collation: { locale: 'zh@collation=gb2312han' } })
更新相关
上面的查询章节最新层级配置查询中
为了实现上面需求有一下几种实现方式:
db.config.find({}).forEach(dr => { dr.ListConfig._id = ISODate().valueOf().toString(); dr.ListConfig.name = "默认列表"; db.config.update({ _id: dr._id }, { $set: { ListConfigs: [dr.ListConfig] } }); });
这种方式太慢了,总共有4万条数据,更新时间将近半个小时……本来想试试
// 这里可能需要权限验证 db.config.aggregate([{ $addFields: { ListConfigs: [{ $mergeObjects: [{ _id: ISODate().valueOf().toString(), name: '默认列表' }, '$ListConfig'] } ] } }, { $out: 'config' } ])
这种方式太牛了,同样的数据不到一分钟就更新完了……但是来了一个新问题,我们mongodb的版本(3.4.6)太低了,不支持
// 这里可能需要权限验证 db.config.aggregate([{ $addFields: { ListConfigCopy: '$ListConfig' } }, { $addFields: { 'ListConfigCopy._id': ISODate().valueOf().toString(), 'ListConfigCopy.name': '默认列表' } }, { $addFields: { ListConfigs: ['$ListConfigCopy'] } }, { $project: { ListConfigCopy: 0 } }, { $out: 'config' } ])
// 这里可能需要权限验证 db.config.aggregate([{ $addFields: { ListConfigs: [{ $arrayToObject: { $setUnion: [{ $objectToArray: { _id: ISODate().valueOf().toString(), name: '默认列表' } }, { $objectToArray: '$ListConfig' } ] } } ] } }, { $out: 'config' } ])
这个批量操作支持的很全面:新增、修改、替换、删除……详见:Bulk Write Operations — MongoDB Manual
db.config.bulkWrite([{ insertOne: { "document": { "ModelId": "d", "Level1": null, "Level2": null, "Level3": null, "Config": "", "CreateDate": new ISODate() } } }, { insertOne: { "document": { "ModelId": "d", "Level1": "lv1_1", "Level2": null, "Level3": null, "Config": "", "CreateDate": new ISODate() } } }, { updateOne: { "filter": { "_id": "1" }, "update": { $set: { "ModelId": "d", "Level1": "lv1_1", "Level2": "lv2_1", "Level3": null, "Config": "", "CreateDate": new ISODate() } } } }, { replaceOne: { "filter": { "_id": "2" }, "replacement": { "ModelId": "d", "Level1": "lv1_1", "Level2": "lv2_1", "Level3": "lv3_1", "Config": "", "CreateDate": new ISODate() } } }, { deleteMany: { "filter": { "_id": { $in: ["3", "4"] } } } } ]);