在IM系统的消息交互设计中,我们支持了文件,视频,图片,语音等消息类型,并支持已发送消息的回复和跳转以及消息的点赞和点踩,用户可以艾特群成员,并弹出消息提醒对应的群成员有艾特消息。用户还可以在时间线展示的情况下查看历史消息列表,以及对消息进行撤回操作。

支持多类型消息-文件、视频、图片、语音

在用户发送消息界面,用户可以发送多种消息类型,比如文件、视频、图片、语音等消息,我们可以将这些消息抽象起来,将它们都看作是文件,并存储在oss的url中。

因此我们只需要添加一个上传的接口,前端只需要记录上传这些类型的消息后得到的url,提交给后端即可。

IM不同的消息类型展示

其中的表设计中,包含两个重要的字段,一个是type,也就是指定消息是什么类型,一个是extra,放置不同类型消息的详情,如果是特别重要的消息,可以通过设置关联表的方式,扩展出去,比如红包类型消息。

IM消息类型所对应的表字段

不同类型的消息在被回复时效果不同,因此可以设计一个策略模式,定义消息的几个不同的策略。在保存消息的时候,先保存基础的消息数据,再根据不同的消息类型保存自己特殊的元素。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    @Override
    @Transactional
    public Long sendMsg(ChatMessageReq request, Long uid) {
        check(request, uid);
        //根据消息类型来获取不同的策略类
        AbstractMsgHandler<?> msgHandler = MsgHandlerFactory.getStrategyNoNull(request.getMsgType());
        Long msgId = msgHandler.checkAndSaveMsg(request, uid);
        //发布消息发送事件
        applicationEventPublisher.publishEvent(new MessageSendEvent(this, msgId));
        return msgId;
    }

    public class MsgHandlerFactory {
        public static final Map<Integer,AbstractMsgHandler> STRATEGY_MAP = new HashMap<>();
        public static void register(Integer code,AbstractMsgHandler strategy){
            STRATEGY_MAP.put(code,strategy);
        }
        public static AbstractMsgHandler getStrategyNoNull(Integer code){
            AbstractMsgHandler strategy = STRATEGY_MAP.get(code);
            AssertUtil.isNotEmpty(strategy, CommonErrorEnum.PARAM_INVALID);
            return strategy;
        }
    }

艾特群成员

由于前端存了一个成员列表库,可以将其作为艾特好友的数据源,用户艾特一次只需要拉取一次群成员列表。

后续的更新都依赖于群成员列表上下线的推送,以及发送新消息的人,都会触发前端用户库的更新,这样库中就含有所有活跃成员的信息,前端可以自行根据用户库去做名称匹配,本地运行速度也更快。

我们可以通过艾特列表,消息页面,成员页面去进行好友的艾特。前端艾特后,保存一个被艾特的所有好友uidlist,传给后端,这样后端就非常方便。

我们的消息展示比较简单,本质就是一串文本,不像qq那样能够反解析出用户信息。如果要反解析的话也容易,只需要根据返回的展示结果,给出艾特成员的uid和用户名,这样前端就能很容易地去匹配出艾特用户,高亮出用户信息。

消息列表交互

消息发送

我们统一将消息推送,消息列表,消息发送3个功能都复用消息详情展示方法getMsgRespBatch。在发消息时,需要用户态,通过拦截器解析出uid,也就是RequestHolder.get().getUid()获得。

在发送消息时,调用service的sendMsg消息发送,结束后返回消息id,然后复用消息展示的接口getMsgResp将消息组装成前端可展示的消息对象返回给前端,这样用户发完消息后就能够通过接口直接快速看见自己的消息,不用等待后台的主动推送了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    @Override
    public ChatMessageResp getMsgResp(Message message, Long receiveUid) {
        return CollUtil.getFirst(getMsgRespBatch(Collections.singletonList(message), receiveUid));
    }

    private List<ChatMessageResp> getMsgRespBatch(List<Message> messages, Long receiveUid) {
        if (CollectionUtil.isEmpty(messages)) {
            return new ArrayList<>();
        }
        //查询消息标志
        List<MessageMark> msgMark = messageMarkDao.getValidMarkByMsgIdBatch(messages.stream().map(Message::getId).collect(Collectors.toList()));
        return MessageAdapter.buildMsgResp(messages, msgMark, receiveUid);
    }

    public static List<ChatMessageResp> buildMsgResp(List<Message> messages, List<MessageMark> msgMark,Long receiveUid){
        Map<Long, List<MessageMark>> markMap = msgMark.stream().collect(Collectors.groupingBy(MessageMark::getMsgId));
        return messages.stream().map(a->{
            ChatMessageResp resp = new ChatMessageResp();
            resp.setFromUser(buildFromUser(a.getFromUid()));
            resp.setMessage(buildMessage(a,markMap.getOrDefault(a.getId(),new ArrayList<>()),receiveUid));
            return resp;
        })
                .sorted(Comparator.comparing(a->a.getMessage().getSendTime()))//排好序发给前端
                .collect(Collectors.toList());
    }

消息回复和跳转

在回复消息的时候,本质也是发消息,只不过带上了一个回复消息的id。

这时如何做到点击回复消息,能跳转到原消息的位置呢?

分两种情况讨论

1)原消息已拉取:如果原始消息在之前就已经被客户端拉取加载过了,那点击回复消息的时候就已经获取到回复消息的id了,此时前端直接根据消息id跳转到原消息即可。

2)原消息未拉取:如果原消息不存在客户端中,想要跳转的话,就需要去后端加载历史消息,直到加载出原消息为止。但是重点是不能让原消息无限制的跳转,也就是当用户翻到历史记录中的远古消息,并艾特回复时,大家都去点击原文,这样会直接把服务器拉爆。

我们可以设置一定的规则来判断原消息是否可以跳转。比如两条消息间隔超过多少条时,就不让跳转了。此时又出现了问题,那就是具体啥时候去查间隔条数呢?此时也分两种情况。

1)拉取消息时计算:当用户每次拉取消息列表时,判断如果有回复消息,就去计算间隔条数,就像是读放大场景,当有一千个用户拉取时,就需要算一千次,这种方法对于服务器不友好。

2)发送消息时计算:也就是谁回复,谁计算,计算花费的时间由消息发送者承担,这样只需要计算一次消息间隔,然后记录在数据库中,以后读取消息就不用再计算了,只需要获取数据库中该消息对应的消息间隔数即可,当超过一定条数时,不进行跳转。节省服务器性能。

当获取到原消息id后,如何加载原消息呢?

首先后端会返回消息间隔的条数,前端需要计算与原消息间还有多少条消息未加载,然后直接复用后端的历史消息列表,将size=间隔记录条数为参数,即可直接一次性加载到原消息。

消息推送

用户发送消息,主要是将消息入库,通过发送spring事件的方式,进行一层解耦。

事件的监听者会将消息id组装成前端展示的内容,推送给前端所有在线用户。监听者里面调用了websocket的发送所有在线用户消息的方法,底层通过一个异步线程池的批量发送实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT,classes = MessageSendEvent.class,fallbackExecution = true)
    public void messageRoute(MessageSendEvent event){
        Long msgId = event.getMsgId();
        mqProducer.sendSecureMsg(MQConstant.SEND_MSG_TOPIC,new MsgSendMessageDTO(msgId),msgId);
    }

    /**
     * 发送可靠消息,在事务提交后保证发送成功
     * @param topic
     * @param body
     * @param key
     */
    @SecureInvoke(async = false)
    public void sendSecureMsg(String topic,Object body,Object key){
        Message<Object> build = MessageBuilder
                .withPayload(body)
                .setHeader("KEYS", key)
                .build();
        rocketMQTemplate.send(topic,build);
    }

历史消息列表

用户进入页面,首先会拉取一页最新的消息,访问的就是历史消息列表。

使用游标翻页能接近深翻页的问题,且适合不翻页的情况,也就很适合消息列表的查看。

在我们的项目中,设计了一个用户访问消息列表的接口,入参继承了一个基础的游标翻页对象,并且还有一个参数roomId,用于表明是哪个会话下的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    @GetMapping("/public/msg/page")
    @ApiOperation("消息列表")
    public ApiResult<CursorPageBaseResp<ChatMessageResp>> getMsgPage(@Valid ChatMessagePageReq request) {
        CursorPageBaseResp<ChatMessageResp> msgPage = chatService.getMsgPage(request, RequestHolder.get().getUid());
        filterBlackMsg(msgPage);
        return ApiResult.success(msgPage);
    }

    public class ChatMessagePageReq extends CursorPageBaseReq {
        @NotNull
        @ApiModelProperty("会话id")
        private Long roomId;
    }    
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Data
@ApiModel("游标翻页请求")
@AllArgsConstructor
@NoArgsConstructor
public class CursorPageBaseReq {

    @ApiModelProperty("页面大小")
    @Min(0)
    @Max(100)
    private Integer pageSize = 10;

    @ApiModelProperty("游标(初始为null,后续请求附带上次翻页的游标)")
    private String cursor;

    //将前端的游标请求转换为内部数据库的请求,将当前的起始页转为分页第一页,相当于limit 0,再向后查一页,实现翻页
    //isSearchCount为false表示不查结果的总数,对于数据量大的情况下有好处
    public Page plusPage() {
        return new Page(1, this.pageSize,false);
    }

    @JsonIgnore
    public Boolean isFirstPage() {
        return StringUtils.isEmpty(cursor);
    }
}

查询方法较为简单,有两个功能,一个是先在数据库中查出一页的消息,然后再加载成前端需要展示的结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    @Override
    public CursorPageBaseResp<ChatMessageResp> getMsgPage(ChatMessagePageReq request, Long receiveUid) {
        //用最后一条消息的id,来限制被踢出的人能看到的最大一条消息
        Long lastMsgId = getLastMsgId(request.getRoomId(), receiveUid);
        // 游标查询原始信息
        CursorPageBaseResp<Message> cursorPage = messageDao.getCursorPage(request.getRoomId(), request, lastMsgId);
        if(cursorPage.isEmpty()){
            return CursorPageBaseResp.empty();
        }
        // 对获取到的消息详情进行组装
        return CursorPageBaseResp.init(cursorPage,getMsgRespBatch(cursorPage.getList(),receiveUid));
    }

1.游标查询:通过游标分页的工具类就很方便的实现这个功能,我们的消息目前都在一个表,主键自增,暂时可以通过唯一的id来做游标查询,再限制会话id,以及限制消息需要是一条正常消息。不需要查询禁用的消息,或撤回的消息。并设置查询消息的方向,是往前面查还是往后面查。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    public CursorPageBaseResp<Message> getCursorPage(Long roomId, CursorPageBaseReq request, Long lastMsgId) {
        return CursorUtils.getCursorPageByMysql(this, request, wrapper->{
            wrapper.eq(Message::getRoomId,roomId);
            wrapper.eq(Message::getStatus, NormalOrNoEnum.NORMAL.getStatus());
            wrapper.le(Objects.nonNull(lastMsgId),Message::getId,lastMsgId);
        },Message::getId);
    }

    public static <T> CursorPageBaseResp<T> getCursorPageByMysql(IService<T> mapper, CursorPageBaseReq request, Consumer<LambdaQueryWrapper<T>> initWrapper, SFunction<T,?> cursorColumn){
    //游标字段类型
    Class<?> cursorType = LambdaUtils.getReturnType(cursorColumn);
    LambdaQueryWrapper<T> wrapper = new LambdaQueryWrapper<>();
    //wrapper是查询的构造对象
    //额外条件,将当前用户uid作为额外条件
    initWrapper.accept(wrapper);
    //游标条件
    if(StrUtil.isNotBlank(request.getCursor())){
        wrapper.lt(cursorColumn,parseCursor(request.getCursor(),cursorType));
    }
    //游标方向
    wrapper.orderByDesc(cursorColumn);
    //plusPage表示查询后一页记录
    Page<T> page = mapper.page(request.plusPage(), wrapper);

    //取出游标,先判断非空,每次翻页都以上次的最后一个记录作为起始游标
    String cursor = Optional.ofNullable(CollectionUtil.getLast(page.getRecords()))
            .map(cursorColumn)
            .map(CursorUtils::toCursor)
            .orElse(null);
    //判断是否最后一页,通过当前查到的记录数是否等于请求的记录数来判断,这样可能会存在当前查询的结果刚好是最后一页,同时还是要查询记录数的倍数。这导致还需要多查一页才能判断是最后一页。
    //可以通过多查一条记录,但不将该记录展示,实现一次查询就能判断是否为最后一页
    Boolean isLast = page.getRecords().size() != request.getPageSize();
    return new CursorPageBaseResp<>(cursor,isLast,page.getRecords());
    }

2.消息组装将消息列表查到的原始数据,再聚合上用户信息消息标记(点赞点踩)消息url识别等前端需要展示的信息就行了。消息列表的展示就很像商品的列表页,也需要加载很多的依赖的资源。如果发现缺啥就一条一条的加载啥其实是效率很低的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ChatMessageResp {
    @ApiModelProperty("发送者消息")
    private UserInfo fromUser;
    @ApiModelProperty("消息详情")
    private Message message;

    @Data
    public static class UserInfo{
        @ApiModelProperty("用户id")
        private Long uid;
    }
    @Data
    public static class Message{
        @ApiModelProperty("消息id")
        private Long id;
        @ApiModelProperty("房间id")
        private Long roomId;
        @ApiModelProperty("消息发送时间")
        private Date sendTime;
        @ApiModelProperty("消息类型 1正常文本 2.撤回消息")
        private Integer type;
        @ApiModelProperty("消息内容不同的消息类型,内容体不同,见https://www.yuque.com/snab/mallcaht/rkb2uz5k1qqdmcmd")
        private Object body;
        @ApiModelProperty("消息标记")
        private MessageMark messageMark;
    }
    @Data
    public static class MessageMark {
        @ApiModelProperty("点赞数")
        private Integer likeCount;
        @ApiModelProperty("该用户是否已经点赞 0否 1是")
        private Integer userLike;
        @ApiModelProperty("举报数")
        private Integer dislikeCount;
        @ApiModelProperty("该用户是否已经举报 0否 1是")
        private Integer userDislike;
    }
}

一般都需要批量去查询出所有的资源,然后丢进适配器进行组装。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    public static List<ChatMessageResp> buildMsgResp(List<Message> messages, List<MessageMark> msgMark,Long receiveUid){
        Map<Long, List<MessageMark>> markMap = msgMark.stream().collect(Collectors.groupingBy(MessageMark::getMsgId));
        return messages.stream().map(a->{
            ChatMessageResp resp = new ChatMessageResp();
            resp.setFromUser(buildFromUser(a.getFromUid()));
            resp.setMessage(buildMessage(a,markMap.getOrDefault(a.getId(),new ArrayList<>()),receiveUid));
            return resp;
        })
                .sorted(Comparator.comparing(a->a.getMessage().getSendTime()))//排好序发给前端
                .collect(Collectors.toList());
    }

消息撤回

我们通过对消息类型进行标识,标识不同的消息,1正常文本 2.撤回消息,在展示时,根据不同的消息类型,选择不同的展示。

时间线展示

前端的时间展示,是仿微信的时间展示。是根据每20条消息,或者是上下两条消息间隔5分钟就展示时间。

主要的目的就是,时间跨度大才展示时间。

表情包功能

我们的功能和微信类似,存储的适合考虑复用性,只在用户第一次上传时,保存到minio中,后续发送时直接指向对应的图片。当其他用户也保存了该图片文件时,也是指向同一地址,减少冗余。

对于较大的表情包,在上传时,自动通过算法对表情包进行压缩,减少存储空间占用。

保存表情包的时间点主要有两个,一个是用户自己主动上传,一个是添加其他人发送的表情包,还有一个是将其他人发送的图片保存成表情包,这个过程中,前端会拉取图片到本地,再自动进行压缩算法保存到minio中。

消息点赞点踩

我们可以把点击的动作,包括点赞和点踩,统称为消息的标记,消息标记之后,还需要推送给前端,让所有用户实时看到标记值变化。

数据库设计:将点赞和点踩抽象成对消息的标记,并专门为消息做一张标记表。每条消息对应多条消息标记的记录。

接口设计:入参包括markType:标记消息类型1点赞2点踩,msgId:消息id,actType:动作类型1确认2取消。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class ChatMessageMarkReq {
    @NotNull
    @ApiModelProperty("消息id")
    private Long msgId;

    @NotNull
    @ApiModelProperty("标记类型 1点赞 2举报")
    private Integer markType;

    @NotNull
    @ApiModelProperty("动作类型 1确认 2取消")
    private Integer actType;
}

实现细节:由于需要通知到所有在线用户,因此需要进行频控限制,防止出现乱刷接口的情况。

1
2
3
4
5
6
7
    @PutMapping("/msg/mark")
    @ApiOperation("消息标记")
    @FrequencyControl(time = 10,count = 5,target = FrequencyControl.Target.UID)
    public ApiResult<Void> setMsgMark(@Valid @RequestBody ChatMessageMarkReq request){
        chatService.setMsgMark(RequestHolder.get().getUid(),request);
        return ApiResult.success();
    }

由于消息的标记涉及到对用户的消息进行操作,因此需要使用分布式锁来进行资源的隔离。在加上锁之后,就可以直接对消息进行标记,并发布一个消息标记事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    @Override
    @RedissonLock(key = "#uid")
    public void setMsgMark(Long uid, ChatMessageMarkReq request) {
        AbstractMsgMarkStrategy strategy = MsgMarkFactory.getStrategyNoNull(request.getMarkType());
        switch (MessageMarkActTypeEnum.of(request.getActType())) {
            case MARK:
                strategy.mark(uid,request.getMsgId());
                break;
            case UN_MARK:
                strategy.unMark(uid,request.getMsgId());
                break;
        }
    }

传统的消息标记,通过查看历史有没有标记,有的话就改动标记,没有的话就新增一条标记记录。

但因为有点赞和点踩的互斥,使用if-else实现起来较复杂。

可以使用设计模式将消息的标记做成一个抽象类,包含标记和取消标记两种方法,并实现了两个子类,包括点赞和点踩,这样就能够覆盖2 * 2种动作。其中抽象类实现了基本功能,对于有扩展需求的,可以重写方法,自己实现。

比如点赞的时候需要点踩。我们重写doMark方法,直接调用父类公共方法实现点赞的标记,再去额外的调用点踩的取消标记方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
public class LikeStrategy extends AbstractMsgMarkStrategy{
    @Override
    protected MessageMarkTypeEnum getTypeEnum() {
        return MessageMarkTypeEnum.LIKE;
    }
    @Override
    public void doMark(Long uid,Long msgId){
        super.doMark(uid, msgId);
        //同时取消点踩的动作
        MsgMarkFactory.getStrategyNoNull(MessageMarkTypeEnum.DISLIKE.getType()).unMark(uid, msgId);
    }

}
@Component
public class DisLikeStrategy extends AbstractMsgMarkStrategy{

    @Override
    protected MessageMarkTypeEnum getTypeEnum() {
        return MessageMarkTypeEnum.DISLIKE;
    }

    @Override
    protected void doMark(Long uid, Long msgId) {
        super.doMark(uid, msgId);
        // 同时取消点赞的动作
        MsgMarkFactory.getStrategyNoNull(MessageMarkTypeEnum.LIKE.getType()).unMark(uid,msgId);
    }
}

具体的执行代码在抽象类中,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    protected void exec(Long uid, Long msgId, MessageMarkActTypeEnum actTypeEnum) {
        Integer markType = getTypeEnum().getType();
        Integer actType = actTypeEnum.getType();
        MessageMark oldMark = messageMarkDao.get(uid, msgId, markType);
        if (Objects.isNull(oldMark) && actTypeEnum == MessageMarkActTypeEnum.UN_MARK) {
            //取消的类型,数据库一定有记录,没有的话直接跳过
            return;
        }
        //插入一条新消息,或者修改一条消息
        MessageMark insertOrUpdate = MessageMark.builder()
                .id(Optional.ofNullable(oldMark).map(MessageMark::getId).orElse(null))
                .uid(uid)
                .msgId(msgId)
                .type(markType)
                .status(transformAct(actType))
                .build();
        boolean modify = messageMarkDao.saveOrUpdate(insertOrUpdate);
        if (modify) {
            //修改成功,发送消息标记事件
            ChatMessageMarkDTO dto = new ChatMessageMarkDTO(uid, msgId, markType, actType);
            applicationEventPublisher.publishEvent(new MessageMarkEvent(this, dto));
        }
    }

当修改成功后,才会发放消息标记事件。消息标记的监听者,会执行几个操作

  1. 判断消息标记到10条,给用户发徽章(幂等)
  2. 推送给所有在线用户,消息标记改动了。 这里有一个细节,如果告诉前端消息被点赞后,由前端给标记数+1,这样遇到丢消息的情况时容易错乱,最好是后端计算好后,直接告诉后端目前的标记数量,哪怕某次消息数据丢失,最终还是能够趋于一致的。

—END—

参考文献

https://cloud.tencent.com/document/product/269/2720

https://www.yuque.com/snab/mallchat/ce2pu00d3m9iywpx