国产三级农村妇女在线,国产精品毛片a∨一区二区三区,国产乱子伦视频大全,国产精品色拉拉,国产欧美日韩一区二区三区,

首頁(yè) > 技術(shù) > IOS開發(fā)

如何用RabbitMQ和Swoole實(shí)現(xiàn)一個(gè)異步任務(wù)系統(tǒng)

IOS開發(fā) 2023-02-16 22:19:00
目錄
  • 系統(tǒng)介紹
  • 事件生產(chǎn)者
  • 任務(wù)調(diào)度器
  • 消費(fèi)者
    • 正常任務(wù)
    • 延遲任務(wù)
  • 自定義調(diào)度器

    系統(tǒng)介紹

    如何用RabbitMQ和Swoole實(shí)現(xiàn)一個(gè)異步任務(wù)系統(tǒng)(圖1)

    從圖中可以看到,我們這個(gè)系統(tǒng)是一個(gè)基于事件的異步任務(wù)系統(tǒng)。就是說(shuō)當(dāng)一個(gè)事件產(chǎn)生時(shí),生產(chǎn)者將事件拋給調(diào)度器,調(diào)度器負(fù)責(zé)查詢事件下有哪些任務(wù),然后將這些任務(wù)丟到相應(yīng)的隊(duì)列中,最后由消費(fèi)者消費(fèi)任務(wù)隊(duì)列中的任務(wù)。

    在整個(gè)系統(tǒng)中主要分為三大部分

    1.事件生產(chǎn)者,即產(chǎn)生消息事件的一方。

    2.任務(wù)調(diào)度器(Scheduler),負(fù)責(zé)注冊(cè)事件并調(diào)度任務(wù)。

    3.消費(fèi)者(Worker),負(fù)責(zé)消費(fèi)任務(wù)隊(duì)列中的任務(wù)。

    事件生產(chǎn)者

    事件生產(chǎn)者很簡(jiǎn)單,在業(yè)務(wù)系統(tǒng)中直接調(diào)用即可,代碼如下。

    <?php
     
    require_once DIR.'/../autoload.php';
     
    use AsynclibEbatsEvent;
     
    try{
     
        $event = new Event('order_paied');  //定義事件
     
        $event->setOptions(['order_id' => 'FB138020392193312']); //事件產(chǎn)生的參數(shù)
     
        $event->publish();
     
    }catch (Exception $exc){
     
        echo $exc->getMessage();
     
    }

    任務(wù)調(diào)度器

    調(diào)度器主要做兩件事,一是注冊(cè)事件,另一個(gè)是調(diào)度任務(wù)。

    注冊(cè)事件代碼如下:

    //注冊(cè)事件
     
    EventManager::register('order_create', 'closeOrder', 'demo', 10);//關(guān)閉未付款訂單(延遲任務(wù))
     
    EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動(dòng)發(fā)貨

    這樣就注冊(cè)了兩個(gè)事件,事件下各有一個(gè)任務(wù)。

    具體調(diào)度部分代碼很簡(jiǎn)單,就不多贅述,有興趣的可以去看代碼。

    消費(fèi)者

    重頭戲來(lái)了,一個(gè)異步任務(wù)系統(tǒng)最重要的就是消費(fèi)端了,現(xiàn)在讓我們來(lái)看下Worker的流程圖。

    如何用RabbitMQ和Swoole實(shí)現(xiàn)一個(gè)異步任務(wù)系統(tǒng)(圖2)

    可以看到,在這里我們采用了兩個(gè)交換器和兩個(gè)隊(duì)列,一個(gè)負(fù)責(zé)處理正常的任務(wù)即ntask,另一個(gè)負(fù)責(zé)處理需要延遲執(zhí)行的任務(wù)即dtask。簡(jiǎn)單描述下一個(gè)任務(wù)的生命周期。

    正常任務(wù)

    1、task產(chǎn)生,進(jìn)入正常任務(wù)的交換器Exchange[ebats_core_ntask]

    2、交換器根據(jù)topic將任務(wù)分發(fā)到對(duì)應(yīng)的隊(duì)列中

    3、子進(jìn)程ntask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)

    4、執(zhí)行失敗,需要重試時(shí)拋出RetryException,不需要重試時(shí)拋出TaskException

    5、子進(jìn)程ntask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]

    6、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

    延遲任務(wù)

    1、子進(jìn)程dtask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
    2、執(zhí)行失敗,需要重試時(shí)拋出RetryException,不需要重試時(shí)拋出TaskException
    3、子進(jìn)程dtask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
    4、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

    消費(fèi)者代碼如下:

    require_once DIR.'/../autoload.php';
     
    require_once DIR.'/task/TaskDemoModel.php';
     
    use AsynclibEbatsWorker;
     
      
     
    //執(zhí)行結(jié)果回調(diào)函數(shù)
     
    $callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
     
      
     
    };
     
    $worker = new Worker($callback);  //支持多進(jìn)程消費(fèi)默認(rèn)為1
     
    $worker->setQueue('demo');  //隊(duì)列名和事件的topic一一對(duì)應(yīng)
     
    $worker->run();

    自定義調(diào)度器

    一般來(lái)說(shuō)這是一個(gè)基于事件的任務(wù)系統(tǒng),那么能不能直接產(chǎn)生任務(wù)呢。答案是肯定的。

    只需要?jiǎng)?chuàng)建一個(gè)自定義調(diào)度器,由您自行實(shí)現(xiàn)調(diào)度邏輯,最終生成一個(gè)任務(wù)即可。代碼如下:

    <?php
     
    require_once DIR.'/../autoload.php';
     
    use AsynclibEbatsTask;
     
    use AsynclibCoreConsumer;
     
    use AsynclibAmqExchangeTypes;
     
    use AsynclibExceptionExceptionInterface;
     
      
     
    /**
     
     * 本示例演示了如何創(chuàng)建一個(gè)自定義調(diào)度器,開發(fā)者可以根據(jù)自身需求開發(fā)自己的任務(wù)調(diào)度器
     
     */
     
    try{
     
        $worker = new Consumer();
     
        $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
     
        $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
     
        $worker->run(function($key, $msg){
     
            $order_data = json_encode($msg);
     
            echo " [$key] $order_data n";
     
            Task::create('demo', 'orderAsync', $msg);//創(chuàng)建任務(wù),之后消息將作為參數(shù)由任務(wù)接管處理
     
        });
     
    }catch (ExceptionInterface $exc){
     
        echo $exc->getMessage();
     
    }

    這樣,當(dāng)接收到消息時(shí)就會(huì)產(chǎn)生一個(gè)orderAsync的任務(wù),您只需要啟動(dòng)一個(gè)用來(lái)消費(fèi)這個(gè)Topic的Worker即可。

    也許你會(huì)覺(jué)得這里直接寫業(yè)務(wù)邏輯的代碼就可以了,實(shí)際上也確實(shí)可以。當(dāng)你可以忍受一個(gè)進(jìn)程慢慢消費(fèi)的時(shí)候是可以這樣做的。但大多數(shù)情況下我們還是希望它能夠盡快的消費(fèi)掉,所以建議這里只負(fù)責(zé)創(chuàng)建任務(wù),具體任務(wù)的業(yè)務(wù)邏輯由worker去執(zhí)行。

    以上就是如何用RabbitMQ和Swoole實(shí)現(xiàn)一個(gè)異步任務(wù)系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于用RabbitMQ和Swoole實(shí)現(xiàn)一個(gè)異步任務(wù)系統(tǒng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

    TAg

    加載中~

    本網(wǎng)站LOGO受版權(quán)及商標(biāo)保護(hù),版權(quán)登記號(hào):國(guó)作登字-2022-F-10126915,未經(jīng)湖南木星科技官方許可,嚴(yán)禁使用。
    Copyright ? 2012-2022 湖南木星科技有限公司(木星網(wǎng))版權(quán)所有
    轉(zhuǎn)載內(nèi)容版權(quán)歸作者及來(lái)源網(wǎng)站所有,本站原創(chuàng)內(nèi)容轉(zhuǎn)載請(qǐng)注明來(lái)源,商業(yè)媒體及紙媒請(qǐng)先聯(lián)系:aishangyiwan@126.com

    工信部備案號(hào):湘ICP備19012813號(hào)-5