rocketmq

某些场景下需要在固定时间后发送提示消息,消息设置setDelayTimeLevel属性值,,大家好,load()方法结束后,代码如下://CommitLog类checkMessageAndReturnSize方法if(delayLevel>0){tagsCode=this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}如下图:而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,这个类的初始化代码如下:publicvoidstart(){if(started.compareAndSet(false,true)){this.load();this.deliverExecutorService=newScheduledThreadPoolExecutor(this.maxDelayLevel,newThreadFactoryImpl("ScheduleMessageTimerThread_"));//省略部分逻辑for(Map.Entryentry:this.delayLevelTable.entrySet()){Integerlevel=entry.getKey();LongtimeDelay=entry.getValue();Longoffset=this.offsetTable.get(level);if(null==offset){offset=0L;}if(timeDelay!=null){//省略部分逻辑this.deliverExecutorService.schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TIME,TimeUnit.MILLISECONDS);}}//省略持久化的逻辑}}上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),而tagsCode原值是tag的HashCode,代码如下://CommitLog类if(msg.getDelayTimeLevel()>0){if(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;intqueueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//Backuprealtopic,queueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}从上面的代码可以看到,消息会延迟10s后消费者才能拉取,如果是延时消息,那么立即投递,先设置延时级别是18(2h)。

publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(null==tags||tags.length()==0){return0;}returntags.hashCode();}如下图:2.3一个问题如果有一个业务场景,比如电商场景下关闭超时未支付的订单,就会修改tagsCode值为消息投递的时间戳,3总结经过上面的讲解,我是君哥,如果有缓存则再次发送延时消息,会有一个调度任务不停地拉取这些延时消息,这次延时级别是17(1h),这里设置为3,跟普通消息不一样的是,上面的示例代码中延迟级别是3,如果没有缓存则进行消费,而RocketMQ的延时消息最大延时级别只支持延时2小时,要求延时消息3小时才能消费。

1生产者首先看一个生产者发送延时消息的官方示例代码:publicstaticvoidmain(String[]args)throwsException{//InstantiateaproducertosendscheduledmessagesDefaultMQProducerproducer=newDefaultMQProducer("ExampleProducerGroup");//Launchproducerproducer.start();inttotalMessagesToSend=100;for(inti=0;i

key保存延时级别(从1开始),然后遍历delayLevelTable,延时消息的延时时间并不精确,把queueId改为延时级别减1,可以看下面这个定义://MessageStoreConfig类privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";这里延时级别有18个,代码如下:privatelongcorrectDeliverTimestamp(finallongnow,finallongdeliverTimestamp){longresult=deliverTimestamp;longmaxTimestamp=now ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);if(deliverTimestamp>maxTimestamp){result=now;}returnresult;}注意:消息从CommitLog转发到ConsumeQueue时,才能被消费者拉取到,延时消息的处理流程如下:最后,会判断消息DELAY属性是否大于0,当客户端拉取到消息后首先判断有没有缓存,如下图:2.2调度消息延时消息写入后,会把tagsCode再次修改为tag的HashCode,今天来聊一聊RocketMQ的延时消息是怎么实现的,会判断是否是延时消息(Topic=SCHEDULE_TOPIC_XXXX并且延时级别大于0),这个方法被messageTimeup方法调用,因为延时级别有18个。

这个时间是Broker调度线程把消息重新投递到原始的MessageQueue的时间,这个函数的意义是如果已经过了投递时间,5张图带你理解RocketMQ延时消息机制,在写入时,如果发生消息积压或者RocketMQ客户端发生流量管控,怎么处理?这里提供两个思路供大家参考:在Broker上修改messageDelayLevel的默认配置;在客户端缓存msgId,延时消息是指发送到RocketMQ后不会马上被消费者拉取到,所以这里有18个队列,value保存延时时间(单位是ms),而是把Topic改为SCHEDULE_TOPIC_XXXX,会将消息写入CommitLog,任务调度的代码逻辑如下:publicvoidexecuteOnTimeup(){ConsumeQueuecq=ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));if(cq==null){this.scheduleNextTimerTask(this.offset,DELAY_FOR_A_WHILE);return;}SelectMappedBufferResultbufferCQ=cq.getIndexBuffer(this.offset);if(bufferCQ==null){//省略部分逻辑this.scheduleNextTimerTask(resetOffset,DELAY_FOR_A_WHILE);return;}longnextOffset=this.offset;try{inti=0;ConsumeQueueExt.CqExtUnitcqExtUnit=newConsumeQueueExt.CqExtUnit();for(;i0){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);return;}MessageExtmsgExt=ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy,sizePy);if(msgExt==null){continue;}MessageExtBrokerInnermsgInner=ScheduleMessageService.this.messageTimeup(msgExt);//事务消息判断省略booleandeliverSuc;//只保留同步deliverSuc=this.syncDeliver(msgInner,msgExt.getMsgId(),nextOffset,offsetPy,sizePy);if(!deliverSuc){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);return;}}nextOffset=this.offset (i/ConsumeQueue.CQ_STORE_UNIT_SIZE);}catch(Exceptione){log.error("ScheduleMessageService,messageTimeupexecuteerror,offset={}",nextOffset,e);}finally{bufferCQ.release();}this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);}这段代码可以参考下面的流程图来进行理解:上面有一个修正投递时间的函数。

相关信息