安裝好 tp5 的 rabbitmq 擴展后,在項目根目錄文件添加文件 rabbitmq.php 引導啟動 rabbitmq。
<?php define('APP_PATH', __DIR__ . '/application/'); define('BIND_MODULE','rabbitmq/Client'); // 加載框架引導文件 require __DIR__ . '/thinkphp/start.php';
生成者
??private?function?queueEvent($message) ????{ //????????error_log("n******"?.?date("His")?.?"********n"?.?print_r($message,?1)?.?"n*************n",?3,? 'messag_event.log'); ????????dump($message); ????????//設置你的連接 ????????$conn_args?=?array('host'?=>?'ip',?'port'?=>?'5672',?'login'?=>?'ymq',?'password'?=>?'123456', ????????'vhost'=>'/'); ? ? ????????$content?=?$message; //創建連接和channel ????????$conn?=?new?AMQPConnection($conn_args); ????????if?(!$conn->connect())?{ ????????????die("Cannot?connect?to?the?broker!n"); ????????} ????????$channel?=?new?AMQPChannel($conn); ? //創建交換機 ????????$e_name?=?'MQTT_device_event';?//交換機名 ????????$ex?=?new?AMQPExchange($channel); ????????$ex->setName($e_name); //????????$ex->setType(AMQP_EX_TYPE_TOPIC);?//direct類型 ????????$ex->setType(AMQP_EX_TYPE_DIRECT);?//direct類型 ????????$ex->setFlags(AMQP_DURABLE);?//持久化 ????????$ex->declareExchange(); ????}
相關推薦:《ThinkPHP教程》
立即學習“PHP免費學習筆記(深入)”;
運行 php 目錄運行生產者
消費者
?public?function?index() ????{ ????????//連接RabbitMQ ????????$conn_args?=?array('host'?=>?'ip',?'port'?=>?'5672',?'login'?=>?'ymq',?'password'?=>?'123456',?'vhost'? ????????=>?'/'); ? ????????$e_name?=?'MQTT_device_event';?//交換機名 ????????$q_name?=?'q_event';?//隊列名 ????????$k_route?=?'key_event';?//路由key? //創建連接和channel ????????$conn?=?new?AMQPConnection($conn_args); ????????if?(!$conn->connect())?{ ????????????die("Cannot?connect?to?the?broker!n"); ????????} ????????$channel?=?new?AMQPChannel($conn);? //創建交換機 ????????$ex?=?new?AMQPExchange($channel); ????????$ex->setName($e_name); ????????$ex->setType(AMQP_EX_TYPE_DIRECT);?//direct類型 ????????$ex->setFlags(AMQP_DURABLE);?//持久化 ????????$ex->declareExchange(); //創建隊列 ????????$q?=?new?AMQPQueue($channel); ????????$q->setName($q_name); ????????$q->setFlags(AMQP_DURABLE);?//持久化 ????????$q->declareQueue();?????//最好隊列object在這里declare()下,否則如果是新的queue會報錯? //綁定交換機與隊列,并指定路由鍵,可以多個路由鍵 ????????$q->bind($e_name,?$k_route); //$q->bind($e_name,?'key_33');?? //阻塞模式接收消息 ????????echo?"Message:n"; ????????while(True){ ????????????$q->consume(function($envelope,?$queue)?{ ????????????????$msg?=?$envelope->getBody(); ????????????????//處理數據 ????????????????echo?$msg?.?PHP_EOL;?//處理消息 ????????????????$queue->ack($envelope->getDeliveryTag());?//手動發送ACK應答 ????????????}); ????????????//$q->consume('processMessage',?AMQP_AUTOACK);?//自動ACK應答 ????????} ? ????????$conn->disconnect();? ????}
執行下命令 php rabbitmq
啟動即可
查看隊列是否被消費
登錄 http://127.0.0.115672/#/queues? 地址
? 版權聲明
文章版權歸作者所有,未經允許請勿轉載。
THE END