賊好用,這款精準定時任務和延時隊列框架
項目簡述
Mykit體系中提供的簡單、穩定、可擴展的延遲消息隊列框架,提供精準的定時任務和延遲隊列處理功能
項目模塊說明
mykit-delay-common: mykit-delay 延遲消息隊列框架通用工具模塊,提供全局通用的工具類
mykit-delay-config: mykit-delay 延遲消息隊列框架通用配置模塊,提供全局配置
mykit-delay-queue: mykit-delay 延遲消息隊列框架核心實現模塊,目前所有主要的功能都在此模塊實現
mykit-delay-controller: mykit-delay 延遲消息隊列框架Restful接口實現模塊,對外提供Restful接口訪問,兼容各種語言調用
mykit-delay-core: mykit-delay 延遲消息隊列框架的入口,整個框架的啟動程序在此模塊實現
mykit-delay-rpc:mykit-delay延時消息隊列的RPC模塊,支持Dubbo、brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等主流RPC的實現
mykit-delay-test: mykit-delay 延遲消息隊列框架通用測試模塊,主要提供Junit單元測試用例
需求背景
用戶下訂單后未支付,30分鐘后支付超時
在某個時間點通知用戶參加系統活動
業務執行失敗之后隔10分鐘重試一次
類似的場景比較多 簡單的處理方式就是使用定時任務 假如數據比較多的時候 有的數據可能延遲比較嚴重,而且越來越多的定時業務導致任務調度很繁瑣不好管理。
隊列設計
整體架構設計如下圖所示。
開發前需要考慮的問題
及時性 消費端能按時收到
同一時間消息的消費權重
可靠性 消息不能出現沒有被消費掉的情況
可恢復 假如有其他情況 導致消息系統不可用了 至少能保證數據可以恢復
可撤回 因為是延遲消息 沒有到執行時間的消息支持可以取消消費
高可用 多實例 這里指HA/主備模式并不是多實例同時一起工作
消費端如何消費
當然初步選用Redis作為數據緩存的主要原因是因為redis自身支持zset的數據結構(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間維度去判定執行的順序 同時也支持Map list數據結構。
簡單定義一個消息數據結構
private String topic;/***topic**/private String id;/***自動生成 全局惟一 snowflake**/private String bizKey;private long delay;/***延時毫秒數**/private int priority;//優先級private long ttl;/**消費端消費的ttl**/private String body;/***消息體**/private long createTime=System.currentTimeMillis();private int status= Status.WaitPut.ordinal();
運行原理
用Map來存儲元數據。id作為key,整個消息結構序列化(JSON/…)之后作為value,放入元消息池中。
將id放入其中(有N個)一個zset有序列表中,以createTime delay priority作為score。修改狀態為正在延遲中
使用timer實時監控zset有序列表中top 10的數據 。 如果數據score<=當前時間毫秒就取出來,根據topic重新放入一個新的可消費列表(list)中,在zset中刪除已經取出來的數據,并修改狀態為待消費
客戶端獲取數據只需要從可消費隊列中獲取就可以了。并且狀態必須為待消費 運行時間需要<=當前時間的 如果不滿足 重新放入zset列表中,修改狀態為正在延遲。如果滿足修改狀態為已消費。或者直接刪除元數據。
客戶端
因為涉及到不同程序語言的問題,所以當前默認支持http訪問方式。
添加延時消息添加成功之后返回消費唯一ID POST /push {……消息體}
刪除延時消息 需要傳遞消息ID GET /delete?id=
恢復延時消息 GET /reStore?expire=true|false expire是否恢復已過期未執行的消息。
恢復單個延時消息 需要傳遞消息ID GET /reStore/id
獲取消息 需要長連接 GET /get/topic
用Nginx暴露服務,配置為輪詢 在添加延遲消息的時候就可以流量平均分配。
目前系統中客戶端并沒有采用HTTP長連接的方式來消費消息,而是采用MQ的方式來消費數據這樣客戶端就可以不用關心延遲消息隊列。只需要在發送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統處理。
消息可恢復
實現恢復的原理 正常情況下一般都是記錄日志,比如mysql的binlog等。
這里我們直接采用mysql數據庫作為記錄日志。
目前創建以下2張表:
消息表 字段包括整個消息體
消息流轉表 字段包括消息ID、變更狀態、變更時間、zset掃描線程Name、host/ip
定義zset掃描線程Name是為了更清楚的看到消息被分發到具體哪個zset中。前提是zset的key和監控zset的線程名稱要有點關系 這里也可以是zset key。
支持消息恢復
假如redis服務器宕機了,重啟之后發現數據也沒有了。所以這個恢復是很有必要的,只需要從表1也就是消息表中把消息狀態不等于已消費的數據全部重新分發到延遲隊列中去,然后同步一下狀態就可以了。
當然恢復單個任務也可以這么干。
數據表設計
這里,我就直接給出創建數據表的SQL語句。
DROP TABLE IF EXISTS `mykit_delay_queue_job`;CREATE TABLE `mykit_delay_queue_job` ( `id` varchar(128) NOT NULL, `bizkey` varchar(128) DEFAULT NULL, `topic` varchar(128) DEFAULT NULL, `subtopic` varchar(250) DEFAULT NULL, `delay` bigint(20) DEFAULT NULL, `create_time` bigint(20) DEFAULT NULL, `body` text, `status` int(11) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `update_time` datetime(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`), KEY `mykit_delay_queue_job_STATUS` (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for mykit_delay_queue_job_log-- ----------------------------DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;CREATE TABLE `mykit_delay_queue_job_log` ( `id` varchar(128) NOT NULL, `status` int(11) DEFAULT NULL, `thread` varchar(60) DEFAULT NULL, `update_time` datetime(3) DEFAULT NULL, `host` varchar(128) DEFAULT NULL, KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
關于高可用
分布式協調還是選用zookeeper。
如果有多個實例最多同時只能有1個實例工作 這樣就避免了分布式競爭鎖帶來的壞處,當然如果業務需要多個實例同時工作也是支持的,也就是一個消息最多只能有1個實例處理,可以選用zookeeper或者redis就能實現分布式鎖了。
最終做了一下測試多實例同時運行,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。所以比較推薦基于docker的主備部署模式。
運行模式
支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk集群地址列表
支持 cluster 會涉及到分布式鎖競爭 效果不是很明顯 分布式鎖采用redis的 setNx實現
StandAlone
目前,經過測試,推薦使用master slave的模式,并且,在升級版本中,進一步增強了Master Slave模式。后期會優化Cluster模式。
如何接入
為了提供一個統一的精準定時任務和延時隊列框架,mykit-delay提供了HTTP Rest接口和RPC方式供其他業務系統調用,接口使用簡單方便,只需要簡單的調用接口,傳遞相應的參數即可。
RPC方式調用,后續支持的方式有:
Dubbo(已實現)
brpc(預留支持)
grpc(預留支持)
Motan(預留支持)
Sofa(預留支持)
SpringCloud(預留支持)
SpringCloud Alibaba(預留支持)
HTTP方式接入
消息體
以JSON數據格式參數 目前提供了http 協議。
body 業務消息體
delay 延時毫秒 距createTime的間隔毫秒數
id 任務ID 系統自動生成 任務創建成功返回
status 狀態 默認不填寫
topic 標題
subtopic 保留字段
ttl 保留字段
createTime 創建任務時間 非必填 系統默認
啟動HTTP Rest服務
首先,從gitHub Clone項目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進入mykit-delay框架目錄。
cd mykit-delay
執行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進入 mykit-delay-core 的 target 目錄下,運行如下命令。
java -jar mykit-delay-core-xxx.jar
其中,xxx是版本號,以實際下載的版本號為準。
接下來,就可以調用HTTP Restful接口來使用mykit-delay框架了。
添加任務
/push POST application/json{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}
刪除任務
刪除任務 需要記錄一個JobId
/delete?jobId=xxx GET
恢復單個任務
用于任務錯亂 腦裂情況 根據日志恢復任務
/reStore?expire=true GET
參數expire 表示是否需要恢復已過期還未執行的數據
清空隊列數據
根據日志中未完成的數據清空隊列中全部數據。清空之后 會刪除緩存中的所有任務
/clearAll GET
Dubbo方式接入
消息體
以JSON數據格式參數 目前提供了http 協議。
body 業務消息體
delay 延時毫秒 距createTime的間隔毫秒數
id 任務ID 系統自動生成 任務創建成功返回
status 狀態 默認不填寫
topic 標題
subtopic 保留字段
ttl 保留字段
createTime 創建任務時間 非必填 系統默認
啟動Dubbo服務
首先,從GitHub Clone項目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進入mykit-delay框架目錄。
cd mykit-delay
執行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進入 mykit-rpc-dubbo模塊下的 mykit-rpc-dubbo-server服務 的 target 目錄下,運行如下命令。
mykit-rpc-dubbo-server-xxx.jar
其中,xxx是版本號,以實際下載的版本號為準。
引入mykit-delay依賴
以Dubbo方式接入mykit-delay,需要引入mykit-delay的依賴,如下所示。
<dependency> <groupId>io.mykit.delay</groupId> <artifactId>mykit-rpc-dubbo-common</artifactId> <version>1.0-SNAPSHOT</version></dependency>
然后,在需要調用Dubbo服務的類中以如下方式注入MykitDelayDubboInterface。
@DubboReference(version = "1.0.0")private MykitDelayDubboInterface mykitDelayDubboInterface;
其中,MykitDelayDubboInterface接口的定義如下所示。
/** * @author binghe * @version 1.0.0 * @description 發布的Dubbo接口 */public interface MykitDelayDubboInterface { /** * 推送消息 */ ResponseMessage push(JobWrapp jobMsg); /** * 刪除任務 */ ResponseMessage delete(String jobId); /** * 完成任務 */ ResponseMessage finish(String jobId); /** * 恢復單個任務 */ ResponseMessage reStoreJob(String jobId); /** * 提供一個方法 假設緩存中間件出現異常 以及數據錯亂的情況 提供恢復功能 * @param expire 過期的數據是否需要重發 true需要, false不需要 默認為true */ ResponseMessage reStore(Boolean expire); /** * 清除所有的任務 */ ResponseMessage clearAll();}
接下來,就可以以Dubbo方式接入mykit-delay框架了。
注意:無論是以HTTP方式,還是以RPC方式啟動mykit-delay服務,都需要通過如下方式加載基本配置信息。
StartGetReady.ready(ConsumeQueueProvider.class.getName());
客戶端獲取隊列方式
目前默認實現了RocketMQ與ActiveMQ的推送方式。依賴MQ的方式來實現延時框架與具體業務系統的解耦。同時,框架已SPI的形式加載相應的MQ,也就是說,集成MQ的方式是可擴展的。
消息體中消息與RocketMQ和 ActiveMQ 消息字段對應關系
mykit-delay RocketMQ ActiveMQ 備注
topic topic topic 點對點發送隊列名稱或者主題名稱
subtopic subtopic subtopic 點對點發送隊列子名稱或者主題子名稱
body 消息內容 消息內容 消息內容
關于系統配置
延遲框架與具體執行業務系統的交互方式通過延遲框架配置實現,具體配置文件位置為mykit-delay-config項目下的resources/properties/starter.properties文件中。
測試
需要配置好數據庫地址和Redis的地址 如果不是單機模式 也需要配置好Zookeeper
運行mykit-delay-test模塊下的測試類io.mykit.delay.test.PushTest添加任務到隊列中
啟動mykit-delay-test模塊下的io.mykit.delay.TestDelayQueue消費前面添加數據 為了方便查詢效果 默認的消費方式是consoleCQ 控制臺輸出
擴展
支持zset隊列個數可配置,避免大數據帶來高延遲的問題。進一步增強框架的高可用。
近期規劃
brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等RPC擴展
支持RabbitMQ、Kafka等消息中間件
分區(buck)支持動態設置
redis與數據庫數據一致性的問題 (重要)
實現自己的推拉機制
支持可切換實現方式,目前只是依賴Redis實現,后續待優化,支持更多的可配置選項
支持Web控制臺管理隊列
實現消息消費TTL機制
增加對框架和定時任務的監控