业务场景:
社区作为一个轻互动论坛,必不可少会涉及很多计数统计,比如个人主页要展示用户相关一些计数(他的粉丝数、发贴数),feed信息流会展示每个贴子查看数、点赞数和评论数。如果每个计数都作为一个独立的个体,独立管理,每次用的时候就要从不同地方去取,维护成本肯定很高。
为了更好的管理,从页面功能角度对这些计数做了分类,分为用户维度、贴子维度、小组维度,而每个维度下又有各自的子项计数。
- 用户维度(关注数、粉丝数、发贴数、评论数、铜钱数、订阅小组数)
- 贴子维度(查看数、评论数、点赞数、收藏数)
- 小组相关(订阅数、贴子数、精华数)
- 消息相关(回复与&、赞、新粉丝数、私信往来的用户)
以用户维度的计数为例:
数据最终是持久到mysql存储,为了提升性能,中间会有一层redis,使用Hash数据结构
1)写操作
任何一个子项动作都会触发缓存的写操作,比如A关注了B,对于A用户来讲,A的关注数要增加1,其它子项计数不变;对于B用户来讲,B的粉丝数要增加1,其它子项计数不变。
public void updateMemberCount(MemberEditCountParam memberEditCountParam) {
MemberCountUpdateParam updateParam = new MemberCountUpdateParam();
updateParam.setUid(memberEditCountParam.getUid());
updateParam.setCopperCount(memberEditCountParam.getCopperCount());
updateParam.setPostCount(memberEditCountParam.getPostCount());
updateParam.setFollowCount(memberEditCountParam.getFollowCount());
updateParam.setFansCount(memberEditCountParam.getFansCount());
updateParam.setReplyCount(memberEditCountParam.getReplyCount());
updateParam.setTags(memberEditCountParam.getTagCount());
memberCountDao.updateMemberCount(updateParam);
memberCacheManager.editMemberCount(updateParam);
}
先写DB,再写cache。对于DB,通过mysql自身的行锁机制解决数据并发问题,‘posts = posts + #{postCount}’。但对于cache的维护,比较麻烦一些
常规思路:
value = hincrBy(String key, String field, long value) ;
|
|
|
|
if value ==1 , del cache
- 当缓存为空时,hincrBy会返回1;
- 如果用户某一个子项的计数为0,即使预热到cache,hincrBy 也会返回1。
- 所以根据是否等于1决定del cache,在用户量很大的情况下,cache的效率会比较差(只有所有的子项都大于0,且已预热到cache里才能避开这种情况)。另外如果 del cache 操作完成之前有读操作,返回的可能是脏数据
解决思路:
引入一个较大阈值,区分cache为空还是已经预热但值为0的情况
protected long incrCount(String hashName, String member, long by) {
long newValue = 0;
try {
// 线程安全
newValue = getRedisClient().hincrBy(hashName, member, by);
} catch (RedisException e) {
logger.error("[AbstractCacheManager.incrCount.hincrBy] invoke error!", e);
}
//COUNT_OFFSET为一个并发数,可以设定一个较大值,为了解决初始时cache为空,预热cache时又有其它
并发请求干扰带来的脏数据
if (newValue >= COUNT_OFFSET) {
return newValue - COUNT_OFFSET;
} else {
try {
getRedisClient().del(hashName);
} catch (RedisException e) {
logger.error("[AbstractCacheManager.incrCount.del] invoke error!", e);
}
return Long.MIN_VALUE;
}
}
注意:
- 删除或者初始化cache,将key作为一个整体(非单个子项)来操作,减少复杂性
- 为了便于维护,cache的预热放在查询阶段,如果cache为空,预热到cache中。
2)读操作
批量查多个用户的用户维度计数。
public List<SimpleMemberCountModel> batchGetSimpleMemberCountModel(List<Long> uidList) {
if (CollectionUtils.isEmpty(uidList)) {
return Collections.emptyList();
}
List<String> keyList = uidList.stream()
.map(this::getKeyMemberCount).collect(Collectors.toList());
// key:用户,value:不同子项的计数
Map<String, Map<String, Long>> countMaps = super.batchGetCountMap(keyList);
List<SimpleMemberCountModel> countModelList = Lists.newArrayListWithCapacity(countMaps.size());
for (int i = 0; i < uidList.size(); ++i) {
Map<String, Long> countMap = countMaps.get(keyList.get(i));
if (countMap != null) {
final Long fansCount = countMap.get(FIELD_FANS_COUNT);
final Long postCount = countMap.get(FIELD_POST_COUNT);
final Long followCount = countMap.get(FIELD_FOLLOW_COUNT);
final Long replyCount = countMap.get(FIELD_REPLY_COUNT);
final Long tags = countMap.get(FIELD_TAGS_COUNT);
if (fansCount != null && postCount != null && followCount != null && replyCount != null && tags != null) {
SimpleMemberCountModel countModel = new SimpleMemberCountModel();
Long uid = uidList.get(i);
countModel.setUid(uid);
countModel.setFansCount(fansCount);
countModel.setPostcount(postCount);
countModel.setFollowCount(followCount);
countModel.setReplyCount(replyCount);
countModel.setTagCount(tags);
countModelList.add(countModel);
}
}
}
return countModelList;
}
如果缓存为空,从DB查询数据并预热到缓存中。
public void batchSaveSimpleMemberCountModel(List<SimpleMemberCountModel> countModelList) {
Map<String , Map<String, Long>> batchCountMap = Maps.newHashMapWithExpectedSize(countModelList.size());
countModelList.forEach(countModel -> {
String hash = getKeyMemberCount(countModel.getUid());
Map<String, Long> countMap = Maps.newHashMapWithExpectedSize(6);
countMap.put(FIELD_UID, countModel.getUid());
countMap.put(FIELD_FANS_COUNT, countModel.getFansCount());
countMap.put(FIELD_POST_COUNT, countModel.getPostcount());
countMap.put(FIELD_FOLLOW_COUNT, countModel.getFollowCount());
countMap.put(FIELD_REPLY_COUNT, countModel.getReplyCount());
countMap.put(FIELD_TAGS_COUNT, countModel.getTagCount());
batchCountMap.put(hash, countMap);
});
// 缓存有效期12个小时
super.batchSaveCountHash(batchCountMap, HALF_DAY_EXPIRE);
}
//初始化计数器hash
protected void batchSaveCountHash(Map<String, Map<String, Long> >batchCountMap, int ttl) {
if (batchCountMap == null || batchCountMap.size() == 0) {
return;
}
Map<Node, List<String>> nodeMap = getNodeMap(batchCountMap.keySet());
Map<String, Map<String, String>> batchStrMap = Maps.newHashMapWithExpectedSize(batchCountMap.size());
batchCountMap.forEach((key, countMap) -> {
//组装map
Map<String, String> strMap = Maps.newHashMap();
//缓存里存的计数是实际计数+计数器基数
countMap.forEach((field, count) -> strMap.put(field, String.valueOf(count + COUNT_OFFSET)));
batchStrMap.put(key, strMap);
});
//对于同一个节点上的map, 批量执行
nodeMap.forEach((node, keyList) -> {
try {
getRedisClient().pipelined(new PipelineBlock() {
@Override
public void execute() {
keyList.forEach(key -> {
Map<String, String> strMap = batchStrMap.get(key);
//先删除key, 再设置每一个field, 最后设置超时时间
try {
del(key);
hmset(key, strMap);
expire(key, ttl);
} catch (RedisException e) {
logger.error("[AbstractCacheManager.batchSaveCountHash.execute] invoke error!", e);
}
});
}
@Override
public Node getTargetNode() {
return node;
}
});
} catch (RedisException e) {
logger.error("[AbstractCacheManager.batchSaveCountHash] invoke error!", e);
}
});
}
注意:时间久了,缓存中的数据可能会存在与DB不一致情况,目前设置的缓存有效期是12个小时,然后缓存会失效,需再次从数据库同步数据
3)计数的准确性
时间久了,数据库表里的计数值也未必准确,比如关注数,每天凌晨会有定时任务,借助关注表计算准确的数字更新到数据库和缓存中。