基于Morphia实现MongoDB按小时、按天聚合操作方法
MongoDB按照天数或小时聚合
需求
最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:SpringBoot,MongoDB,Morphia.
数据模型
@Data
@Builder
@Entity(value="rawDevStatus",noClassnameStored=true)
//设备状态索引
@Indexes({
//设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
@Index(fields=@Field("time"),options=@IndexOptions(expireAfterSeconds=3600*24*72)),
@Index(fields={@Field("userId"),@Field(value="time",type=IndexType.DESC)})
})
publicclassRawDevStatus{
@Id
@JsonProperty(access=JsonProperty.Access.WRITE_ONLY)
privateObjectIdobjectId;
privateStringuserId;
privateInstanttime;
@Embedded("points")
ListprotocolPoints;
@Data
@AllArgsConstructor
publicstaticclassPoint{
/**
*协议类型
*/
privateProtocolprotocol;
/**
*设备总数
*/
privateIntegertotal;
/**
*设备在线数目
*/
privateIntegeronlineNum;
/**
*处于启用状态设备数目
*/
privateIntegerenableNum;
}
}
上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.
@Data
@Builder
@Entity(value="aggregationDevStatus",noClassnameStored=true)
@Indexes({
@Index(fields=@Field("expireAt"),options=@IndexOptions(expireAfterSeconds=0)),
@Index(fields={@Field("userId"),@Field(value="time",type=IndexType.DESC)})
})
publicclassAggregationDevStatus{
@Id
@JsonProperty(access=JsonProperty.Access.WRITE_ONLY)
privateObjectIdobjectId;
/**
*用户ID
*/
privateStringuserId;
/**
*设备总数
*/
privateDoubletotal;
/**
*设备在线数目
*/
privateDoubleonlineNum;
/**
*处于启用状态设备数目
*/
privateDoubleenableNum;
/**
*聚合类型(按照小时还是按照天聚合)
*/
@Property("aggDuration")
privateAggregationDurationaggregationDuration;
privateInstanttime;
/**
*动态设置文档过期时间
*/
privateInstantexpireAt;
}
上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.
聚合操作符介绍
聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.
此次聚合主要涉及以下操作:
•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.
原始聚合语句
db.getCollection('raw_dev_status').aggregate([
{$match:
{
time:{$gte:ISODate("2019-06-27T00:00:00Z")},
}
},
{$unwind:"$points"},
{$project:
{
userId:1,points:1,
tmp:{$dateToString:{format:"%Y:%m:%dT%H:00:00Z",date:"$time"}}
}
},
{$project:
{
userId:1,points:1,
groupTime:{$dateFromString:{dateString:"$tmp",format:"%Y:%m:%dT%H:%M:%SZ",}}
}
},
{$group:
{
_id:{user_id:'$userId',cal_time:'$groupTime'},
devTotal:{'$avg':'$points.total'},
onlineTotal:{'$avg':'$points.onlineNum'},
enableTotal:{'$avg':'$points.enableNum'}
}
},
])
上述代码是按小时聚合数据,以下来逐步介绍处理思路:
(1)$match
根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.
(2)$unwind
raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;
(3)$project
{$project:
{
userId:1,points:1,
tmp:{$dateToString:{format:"%Y:%m:%dT%H:00:00Z",date:"$time"}}
}
}
选择需要输出的数据,分别为:userId,points以及tmp.
需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.
如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z即可满足要求.
(4)$project
{$project:
{
userId:1,points:1,
groupTime:{$dateFromString:{dateString:"$tmp",format:"%Y:%m:%dT%H:%M:%SZ",}}
}
}
因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.
(5)$group
对聚合结果进行分类操作,并生成最终输出结果.
{$group:
{
#根据_id进行分组操作,依据是`user_id`以及`$groupTime`
_id:{user_id:'$userId',cal_time:'$groupTime'},
#求设备总数平均值
devTotal:{'$avg':'$points.total'},
#求设备在线数平均值
onlineTotal:{'$avg':'$points.onlineNum'},
#...
enableTotal:{'$avg':'$points.enableNum'}
}
}
代码编写
此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.
/**
*创建聚合条件
*
*@parampastTime过去时间段
*@paramdateToString格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
*@return聚合条件
*/
privateAggregationPipelinecreateAggregationPipeline(InstantpastTime,StringdateToString,StringstringToDate){
Queryquery=datastore.createQuery(RawDevStatus.class);
returndatastore.createAggregation(RawDevStatus.class)
.match(query.field("time").greaterThanOrEq(pastTime))
.unwind("points",newUnwindOptions().preserveNullAndEmptyArrays(false))
.match(query.field("points.protocol").equal("ALL"))
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateToString",
newBasicDBObject("format",dateToString)
.append("date","$time"))
)
)
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateFromString",
newBasicDBObject("format",stringToDate)
.append("dateString","$convertTime"))
)
)
.group(
Group.id(Group.grouping("userId"),Group.grouping("convertTime")),
Group.grouping("total",Group.average("points.total")),
Group.grouping("onlineNum",Group.average("points.onlineNum")),
Group.grouping("enableNum",Group.average("points.enableNum"))
);
}
/**
*获取聚合结果
*
*@parampipeline聚合条件
*@return聚合结果
*/
privateListgetAggregationResult(AggregationPipelinepipeline){
Liststatuses=newArrayList<>();
IteratorresultIterator=pipeline.aggregate(
AggregationMidDevStatus.class,AggregationOptions.builder().allowDiskUse(true).build());
while(resultIterator.hasNext()){
statuses.add(resultIterator.next());
}
returnstatuses;
}
//......................................................................................
//获取聚合结果(省略若干代码)
AggregationPipelinepipeline=createAggregationPipeline(pastTime,dateToString,stringToDate);
ListmidStatuses=getAggregationResult(pipeline);
if(CollectionUtils.isEmpty(midStatuses)){
log.warn("Cannotgetdevstatusaggregationresult.");
return;
}
总结
以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!