Apache Pulsar延遲消息投遞解析,apache log4j2Apache Pulsar延遲消息投遞解析導(dǎo)語(yǔ)Apache Pulsar是一個(gè)多租戶、高性能的服務(wù)間消息傳輸解決方案,支持多租戶、低延時(shí)、讀寫分離、跨地域復(fù)制、快速擴(kuò)容、靈活容錯(cuò)等特性。騰訊數(shù)據(jù)平臺(tái)部MQ團(tuán)隊(duì)對(duì)Pulsar做了深入調(diào)研以及大量的性能和......
導(dǎo)語(yǔ)Apache Pulsar是一個(gè)多租戶、高性能的服務(wù)間消息傳輸解決方案,支持多租戶、低延時(shí)、讀寫分離、跨地域復(fù)制、快速擴(kuò)容、靈活容錯(cuò)等特性。騰訊數(shù)據(jù)平臺(tái)部MQ團(tuán)隊(duì)對(duì)Pulsar做了深入調(diào)研以及大量的性能和穩(wěn)定性方面優(yōu)化,目前已經(jīng)在騰訊云消息隊(duì)列TDMQ落地上線。本文主要介紹Pulsar延遲消息投遞的實(shí)現(xiàn),希望與大家一同交流。
一、什么是延遲消息投遞
延遲消息投遞在MQ應(yīng)用場(chǎng)景中十分普遍,它是指消息在發(fā)國(guó)際快遞MQ服務(wù)端后并不會(huì)立馬投遞,而是根據(jù)消息中的屬性延遲固定時(shí)間后才投遞給消費(fèi)者,一般分為定時(shí)消息和延遲消息兩種:
·定時(shí)消息:Producer將消息發(fā)國(guó)際快遞MQ服務(wù)端,但并不期望這條消息立馬投遞,而是推遲到在當(dāng)前時(shí)間點(diǎn)之后的某一個(gè)時(shí)間投遞到Consumer進(jìn)行消費(fèi)。
·延遲消息:Producer將消息發(fā)國(guó)際快遞MQ服務(wù)端,但并不期望這條消息立馬投遞,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi)。
目前在業(yè)界,騰訊云的CMQ和阿里云的RocketMQ也都支持延遲消息投遞:
·CMQ:將消息延遲期間定義為”飛行狀態(tài)“,可通過設(shè)置DelaySeconds配置延遲范圍,取值范圍為03600秒,即消息最長(zhǎng)不可見時(shí)長(zhǎng)為1小時(shí)。
·RocketMQ:開源版本延遲消息臨時(shí)存儲(chǔ)在一個(gè)內(nèi)部主題中,支持特定的level,例如定時(shí)5s,10s,1m等,商業(yè)版本支持任意時(shí)間精度。
開源的NSQ、RabbitMQ、ActiveMQ和Pulsar也都內(nèi)置了延遲消息的處理能力。雖然每個(gè)MQ項(xiàng)目的使用和實(shí)現(xiàn)方式不同,但核心實(shí)現(xiàn)思路都一樣:Producer將一個(gè)延遲消息發(fā)國(guó)際快遞某個(gè)Topic中,Broker將延遲消息放到臨時(shí)存儲(chǔ)進(jìn)行暫存,延遲跟蹤服務(wù)(Delayed Tracker Service)會(huì)檢查消息是否到期,將到期的消息進(jìn)行投遞。
二、延遲消息投遞的使用場(chǎng)景
延遲消息投遞是要暫緩對(duì)當(dāng)前消息的處理,在未來(lái)的某個(gè)時(shí)間點(diǎn)再觸發(fā)投遞,實(shí)際的應(yīng)用場(chǎng)景非常多,比如異常檢測(cè)重試、訂單超時(shí)取消、預(yù)約提醒等。
·服務(wù)請(qǐng)求異常,需要將異常請(qǐng)求放到單獨(dú)的隊(duì)列,隔5分鐘后進(jìn)行重試;
·用戶購(gòu)買商品,但一直處于未支付狀態(tài),需要定期提醒用戶支付,超時(shí)則關(guān)閉訂單;
·面試或者會(huì)議預(yù)約,在面試或者會(huì)議開始前半小時(shí),發(fā)快遞通知再次提醒;
TDMQ最近就有個(gè)使用Pulsar延遲消息的Case:業(yè)務(wù)要對(duì)兩套系統(tǒng)的日志消息進(jìn)行關(guān)聯(lián),其中一套系統(tǒng)由于查詢Hbase可能會(huì)超時(shí)或失敗,需要將失敗的關(guān)聯(lián)任務(wù)在集群空閑的時(shí)候再次調(diào)度。
三、如何使用Pulsar延遲消息投遞
Pulsar最早是在2.4.0引入了延遲消息投遞的特性,在Pulsar中使用延遲消息,可以精確指定延遲投遞的時(shí)間,有deliverAfter和deliverAt兩種方式。其中deliverAt可以指定具體的時(shí)間戳;deliverAfter可以指定在當(dāng)前多長(zhǎng)時(shí)間后執(zhí)行。兩種方式的本質(zhì)是一樣的,Client會(huì)計(jì)算出時(shí)間戳國(guó)際快遞Broker。
1.deliverAfter發(fā)快遞
producer.newMessage()
.deliverAfter(long time, TimeUnit unit)
.send();
2.deliverAt發(fā)快遞
producer.newMessage()
.deliverAt(long timestamp)
.send();
在Pulsar中,可以支持跨度很大的延時(shí)消息,比方說(shuō)一個(gè)月、半年;同時(shí)在一個(gè)Topic里,既支持延時(shí)消息,也支持非延時(shí)消息。下圖展示了Pulsar中延遲消息的具體過程:
producer發(fā)快遞的m1/m3/m4/m5有不同的延遲時(shí)間,m2是不需要延遲投遞的正常消息,consumer消費(fèi)時(shí)會(huì)根據(jù)不同的延遲時(shí)間進(jìn)行ack。
四、Pulsar延遲消息投遞實(shí)現(xiàn)原理
從上面的使用方式可以看出,Pulsar支持的是秒級(jí)精度的延遲消息投遞,不同于開源RocketMQ支持固定時(shí)間level的延遲。
Pulsar實(shí)現(xiàn)延遲消息投遞的方式比較簡(jiǎn)單,所有延遲投遞的消息會(huì)被Delayed Message Tracker記錄對(duì)應(yīng)的index。index是由timestampLedgerIDEntryID三部分組成,其中LedgerIDEntryID用于定位該消息,timestamp除了記錄需要投遞的時(shí)間,還用于delayed index優(yōu)先級(jí)隊(duì)列排序。
Delayed Message Tracker在堆外內(nèi)存維護(hù)著一個(gè)delayed index優(yōu)先級(jí)隊(duì)列,根據(jù)延遲時(shí)間進(jìn)行堆排序,延遲時(shí)間最短的會(huì)放在頭上,時(shí)間越長(zhǎng)越靠后。consumer在消費(fèi)時(shí),會(huì)先去Delayed Message Tracker檢查,是否有到期需要投遞的消息,如果有到期的消息,則從Tracker中拿出對(duì)應(yīng)的index,找到對(duì)應(yīng)的消息進(jìn)行消費(fèi);如果沒有到期的消息,則直接消費(fèi)正常的消息。
如果集群出現(xiàn)Broker宕機(jī)或者topic的ownership轉(zhuǎn)移,Pulsar會(huì)重建delayed index隊(duì)列,來(lái)保證延遲投遞的消息能夠正常工作。
五、Pulsar延遲消息投遞面臨的挑戰(zhàn)
從Pulsar的延遲消息投遞實(shí)現(xiàn)原理可以看出,該方法簡(jiǎn)單高效,對(duì)Pulsar內(nèi)核侵入性較小,可以支持到任意時(shí)間的延遲消息。但同時(shí)發(fā)現(xiàn),Pulsar的實(shí)現(xiàn)方案無(wú)法支持大規(guī)模使用延遲消息,主要有以下兩個(gè)原因:
1.delayed index隊(duì)列受到內(nèi)存限制
一條延遲消息的delayed index由三個(gè)long組成,對(duì)于小規(guī)模的延遲消息來(lái)說(shuō),內(nèi)存開銷并不大。但由于index隊(duì)列是subscription級(jí)別,對(duì)于topic的同一個(gè)partition來(lái)說(shuō),有多少個(gè)subscription就需要維護(hù)多少個(gè)index隊(duì)列;同時(shí),由于延遲消息越多、延遲的時(shí)間越長(zhǎng),index隊(duì)列內(nèi)存占用也會(huì)更多。
2.delayed index隊(duì)列重建時(shí)間開銷
上面有提到,如果集群出現(xiàn)Broker宕機(jī)或者topic的ownership轉(zhuǎn)移,Pulsar會(huì)重建delayed index隊(duì)列。對(duì)于跨度時(shí)間長(zhǎng)的大規(guī)模延遲消息,重建時(shí)間可能會(huì)到小時(shí)級(jí)別。為了減小delayed index隊(duì)列重建時(shí)間,雖然可以給topic分更多的partition提高重建的并發(fā)度,但沒有徹底解決重建時(shí)間開銷問題。
六、Pulsar延遲消息投遞未來(lái)工作
Pulsar目前的延遲消息投遞方案簡(jiǎn)單高效,但處理大規(guī)模延遲消息時(shí)仍然存在風(fēng)險(xiǎn)。關(guān)于延遲消息投遞,社區(qū)和數(shù)據(jù)平臺(tái)部MQ團(tuán)隊(duì)下一步將聚焦在支持大規(guī)模延遲消息。目前討論的方案是在delayed index隊(duì)列加入時(shí)間分區(qū),Broker只加載當(dāng)前較近的時(shí)間片delayed index到內(nèi)存,其余時(shí)間片分區(qū)持久化磁盤,示例圖如下圖所示:
上圖中,我們按5分鐘的間隔對(duì)delayed index隊(duì)列進(jìn)行分區(qū),m5和m1放在了time partition 1,由于延遲時(shí)間最近,放在了內(nèi)存;m4和m3在time partition 2,延遲時(shí)間比較靠后,index存儲(chǔ)在了磁盤。該方案不僅可以減少delayed index隊(duì)列重建時(shí)間開銷,還可以降低對(duì)內(nèi)存的依賴。
結(jié)語(yǔ)
本文為大家介紹了延遲消息投遞的相關(guān)概念和使用場(chǎng)景,并詳細(xì)拓展了Apache Pulsar的實(shí)現(xiàn)原理。Pulsar目前方案簡(jiǎn)單高效,支持秒級(jí)精度的延遲消息投遞,但在處理大規(guī)模延遲消息時(shí)還有一些局限。
目前騰訊云消息隊(duì)列TDMQ上已上線了對(duì)Pulsar延遲消息投遞的支持,Pulsar社區(qū)和數(shù)據(jù)平臺(tái)部MQ團(tuán)隊(duì)下一步也將聚焦在支持大規(guī)模延遲消息上。
特別聲明:以上文章內(nèi)容僅代表作者本人觀點(diǎn),不代表ESG跨境電商觀點(diǎn)或立場(chǎng)。如有關(guān)于作品內(nèi)容、版權(quán)或其它問題請(qǐng)于作品發(fā)表后的30日內(nèi)與ESG跨境電商聯(lián)系。
二維碼加載中...
使用微信掃一掃登錄
使用賬號(hào)密碼登錄
平臺(tái)顧問
微信掃一掃
馬上聯(lián)系在線顧問
小程序
ESG跨境小程序
手機(jī)入駐更便捷
返回頂部