一、背景
最近手头上的项目使用mongoDB存储物联网设备采集上来的实时数据,增删改查与传统关系数据库差别很大,开发过程中也踩了不少坑,记录下来供有需要的朋友参考。
二、概述
MongoTemplate是由org.springframework.data.mongodb.core包提供一个Java类。它提供了一组用于与MongoDB交互的丰富特性,并充当Spring的MongoDB支持的中心类。此外,MongoTemplate是线程安全的,可以跨多个实例调用。MongoTemplate类实现了接口MongoOperations,提供了流畅的API进行Query, Criteria, Update等基本操作,此外,也支持泛型的方法实现。使用起来也非常方便,可直接将MongoTemplate作为类中的属性来使用。
三、基本操作
1、Insert操作
public <T> T insert(T objectToSave, String collectionName)
插入一个新的User对象,collectionName作为可选参数,可以指定mongodb的某个collection插入
Useruser=newUser(); user.setName("Tom"); mongoTemplate.insert(user, "user");
2、Save – Insert
这也是开发中最常用的一种操作行为,“保存或更新”,如果数据库中有此id,则执行更新,如果没有,则执行插入操作。
添加新用户:
Useruser=newUser(); user.setName("Albert"); mongoTemplate.save(user, "user");
修改现有用户数据:
user = mongoTemplate.findOne( Query.query(Criteria.where("name").is("Jack")), User.class); user.setName("Jim"); mongoTemplate.save(user, "user");
在上面的示例,save使用了update的语义,因为我们更新现存用户信息。
3、UpdateFirst
updateFirst更新与查询匹配的第一条记录。
例如:
数据库中有两条同名记录
[ { "_id" : ObjectId("55b5ffa5511fee0e45ed614b"), "name" : "Alex" }, { "_id" : ObjectId("55b5ffa5511fee0e45ed614c"),
"name" : "Alex" } ]
当我们执行updateFirst
Query query = new Query(); query.addCriteria(Criteria.where("name").is("Alex"));
Update update = new Update(); update.set("name", "James"); mongoTemplate.updateFirst(query, update, User.class);
第一条记录将被更新。
4、UpdateMulti
UpdateMulti更新与给定查询匹配的所有数据记录
Query query = new Query(); query.addCriteria(Criteria.where("name").is("Eugen"));
Update update = new Update(); update.set("name", "Victor"); mongoTemplate.updateMulti(query, update, User.class);
执行之后数据库中的现有对象都将被更新。
5、FindAndModify
跟updateMulti类似,但是它能返回修改对象之前的数据。
Query query = new Query(); query.addCriteria(Criteria.where("name").is("Markus"));
Update update = new Update(); update.set("name", "Nick");
User user = mongoTemplate.findAndModify(query, update, User.class);
返回的用户对象具有与数据库中初始状态相同的值。
6、Upsert
如果记录存在,则更新它,否则通过结合查询和更新对象创建一个新记录。
Query query = new Query(); query.addCriteria(Criteria.where("name").is("Markus"));
Update update = new Update(); update.set("name", "Nick"); mongoTemplate.upsert(query, update, User.class);
7、Remove
删除数据
mongoTemplate.remove(user, "user");
8、findAll(className) OR findAll(className, collectionName)
以上两种方法从数据库中获取List格式的数据。这里T是类名。如果类名和集合名都相同,则使用findAll(T.class),否则使用findAll(T.class, " collectionName ")。
List<User> list = mongoTemplate.findAll(User.class);
9、findById(id, entityClass) OR findById(id, entityClass, collectionName)
我们使用此方法从数据库集合中使用PK(ID)获取数据。如果Id存在,则将JSON Document转换为Object,否则返回null(表示没有对象)。
User user= mongoTemplate.findById(1, User.class);
10、findAndRemove(query, entityClassName)
删除实际从查询条件中获取的数据
代码语言:javascript复制Query query= new Query();
query.addCriteria(Criteria.where("cost").is(1749.0));
mongoTemplate.findAndRemove(query, Book.class);
findAllAndRemove(query, entityClassName)
批量删除实际从查询条件中获取的数据
代码语言:javascript复制Query query= new Query();
代码语言:javascript复制query.addCriteria(Criteria.where("cost").gte(1000.0));
mongoTemplate.findAllAndRemove(query, Book.class);
11、总结:
四、数据查询
上面的示例中我们使用Query对象来查询数据
1、常用数据查询
Query对象 1、 创建一个query对象(用来封装所有条件对象),再创建一个criteria对象(用来构建条件) 2 、精准条件:criteria.and(“key”).is(“条件”) 模糊条件:criteria.and(“key”).regex(“条件”) 3、封装条件:query.addCriteria(criteria) 4、大于(创建新的criteria):Criteria gt = Criteria.where(“key”).gt(“条件”) 小于(创建新的criteria):Criteria lt = Criteria.where(“key”).lt(“条件”) 5、Query.addCriteria(new Criteria().andOperator(gt,lt)); 6、一个query中只能有一个andOperator()。其参数也可以是Criteria数组。 7、排序 :query.with(new Sort(Sort.Direction.ASC, "age"). and(new Sort(Sort.Direction.DESC, "date")))
常用查询示例:
代码语言:javascript复制if(null == startDate && null == endDate){
query.addCriteria(Criteria.where("sqid").gte(calendar.getTime()).lte(calendar2.getTime()));
}else if(null != startDate && null != endDate){
query.addCriteria(Criteria.where("sqid").gte(startDate).lte(endDate));
}else if(null != startDate){
query.addCriteria(Criteria.where("sqid").gte(startDate.toInstant()));
}else if(null != endDate){
query.addCriteria(Criteria.where("sqid").lte(endDate.toInstant()));
}
if(!StringUtils.isEmpty(equipmentId)){
query.addCriteria(Criteria.where("equipmentID").is(equipmentId));
}
使用count查询记录数:
long count = mongoTemplate.count(new Query().with(new Sort(
Sort.Direction.ASC, "username")), User.class);
代码语言:javascript复制组合条件查询:
Query query = new Query(Criteria
.where("username").is(student.getUsername())
.and("gender").is(student.getGender())
.and("age").gt(student.getAge()));
List<Student> students = mongoTemplate.find(query, Student.class);
模糊查询(模糊查询以 ^开始 以$结束 .*相当于Mysql中的%):
String regex = String.format("%s%s%s", "^.*", username, ".*$");
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
Query query = new Query(Criteria.where("username").regex(pattern));
List<Student> students = mongoTemplate.find(query, Student.class);
2、使用游标MongoCursor查询数据
因为项目中存储的是每秒一条的实时数据,数据量很大,特定的业务需求
会遍历表中的数据。刚开始是先查询总数,然后再根据总数进行分页查询,
如果数据量特别大,查询到后面的页会越来越慢。
我们使用游标来实现在mongoDB海量数据的查询。
MongoCursor<Document> iterator = mongoTemplate.getCollection(equipment.getEquipmentCode()).find(query.getQueryObject()).noCursorTimeout(true).batchSize(BATCH_SIZE).iterator();
MongoWits58 lastEntity = null;
MongoCursor iterator1 = mongoTemplate.getCollection(equipment.getEquipmentCode()).find(query.getQueryObject()).iterator();while (iterator.hasNext()){
Document document = (Document)iterator.next();
//
if(lastEntity == null){
****
}else {
****
lastEntity = entity;
}
//if(taskExist())
}
cursor
cursor的获取:find() 方法返回的是一个 FindIterable 对象,对此对象的控制即可对 cursor 的属性进行控制,根据FindIterable获取一个Cursor 。
1)batchSize(int size):每次网络请求返回的document条数,比如你需要查询500条数据,mongodb不会一次性全部load并返回给client,而是每次返回batchSize条,遍历完之后后再通过网路IO获取直到cursor耗尽。默认情况下,首次批量获取101个document或者1M的数据,此后每次4M,当然我们可以通过此方法来覆盖默认值,如果文档尺寸较小,则建议batchSize可以大一些。
2)skip(int number)、limit(int number):同SQL中的limit字句,即表示在符合匹配规则的结果集中skip一定数量的document,并最终返回limit条数据。可以实现分页查询。
3)maxTime(int time,TimeUnit unit):表示此次操作保持的最长时间,即server端保持cursor状态的最长时间,如果超时server端将移除此cursor,即再次通过此cursor遍历数据将会error。
4)sort(Bson bson):根据指定field排序,参与排序的字段最好是索引,如果不是,将会在内存中排序,如果参与排序的数据尺寸大于32M,将会抛出error。1表示正序,-1表示倒叙,比如"age":1表示按照age正序排序。
5)noCursorTimeout(boolean timeout):如果cursor空闲一定时间后(10分钟),server端是否将其移除,默认为false,即server会将空闲10分钟的cursor移除以节约内存。如果为true,则表示server端不需要移除空闲的cursor,而是等待用户手动关闭。无论如何,开发者都需要注意,手动关闭cursor。
6)partial(boolean partial):对于sharding集群,如果一个或者多个shard不可达,是否允许返回部分数据(只从正常的shard中获取数据)。
7)cursorType():指定cursor类型,当cursor遍历完毕后是否关闭cursor,默认是关闭,无论何时都建议手动关闭cursor(不管是否耗尽curosr);当然有些开发场景可能需要保持cursor的活性,遍历到cursor的最后一条后,不关闭cursor,继续等待,此后一段时间内如果有新数据插入到cursor之后,则可以继续遍历,这就是Tailable Cursor,通常对于Capped Collection中使用。目前支持支持3种类型的Cursor:NonTailable、Tailable、TailableAwait。
五、MongoDB 聚合查询
MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果。
管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数。
MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传递给下一个管道处理。管道操作是可以重复的。
表达式:处理输入文档并输出。表达式是无状态的,只能用于计算当前聚合管道的文档,不能处理其它的文档。
聚合管道的每个阶段在文档通过时对文档进行转换。输入文档经过一个阶段后,它不一定会产生一个输出文档。有些阶段可能生成多个文档作为输出。
$project
对输入中的记录进行再次投影,按照我们需要的格式生成结果集。例如,通过添加新字段或删除现有字段。对于每个输入数据,只有一个输出。
$match
按匹配过滤记录,只允许匹配的记录未经修改地传递到下一个管道阶段。对于每个输入,输出要么是一个记录(匹配),要么是0个(不匹配)。
$group
按指定的标识符表达式对输入文档进行分组,并对每个组应用累加器表达式(如果指定了)。$group使用所有输入文档,并为每个不同的组输出一个文档。输出文档只包含标识符字段(组id),如果指定,则包含累计字段。
$sort
按指定的排序对文档流重新排序。一个输入一个输出。
$skip
跳过前n个文档(其中n是指定的跳过号),并将剩余的文档未经修改地传递给管道。对于每个输入文档,输出要么是零文档(对于前n个文档),要么是一个文档(在前n个文档之后)$limit
将前n个未修改的文档传递到n为指定限制的管道。对于每个输入文档,输出要么是一个文档(对于前n个文档),要么是0个文档(在前n个文档之后).
$unwind
将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值
下表展示了一些聚合的表达式:
示例: operations.add(Aggregation.match(Criteria.where("course").is(studentScore.getCourse()))); operations.add(Aggregation.group("course").sum("score").as("totleScore")); } Aggregation aggregation = Aggregation.newAggregation(operations); //查询、并获取结果 AggregationResults<StudentScore> results = mongoTemplate.aggregate( aggregation, "studentScore", StudentScore.class); double totleScore = results.getUniqueMappedResult().getTotleScore(); 5、总结 本文记录个人学习使用MongoTemplate操作MongoDB一些基本的语句,使用过程中还发现需要注意的一些问题: mongodb返回数据过大,查询报错,一次性查出N条数据并进行 sort 排序,然后在使用Java代码查询时候, 直接抛出了异常 。 建立索引时间过长,MongoDB 提供了两种建索引的访问,一种是 background 方式,不需要长时间占用写锁,另一种是非 background 方式,需要长时间占用锁。使用 background 方式就可以解决问题。 日期格式问题 mongodb的日期时间格式是UTC时间,中国时间 = UTC时间 8