本篇文章给大家带来了关于PHP的相关知识,其中主要介绍了关于消息队列RabbitMQ入门还有一些实战详解,消息队列是应用间的通信方式,下面一起来看一下,希望对大家有帮助。
|
本篇文章给大家带来了关于PHP的相关知识,其中主要介绍了关于消息队列RabbitMQ入门还有一些实战详解,消息队列是应用间的通信方式,下面一起来看一下,希望对大家有帮助。
推荐学习:《PHP视频教程》 消息队列介绍以及消息队列应用场景RabbitMQ说明 为什么使用消息中间件? 异步处理 应用解耦 流量削峰 Rabbitmq特性 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 RabbitMQ的工作原理 Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。 Virtual host: 类似于mysql的数据库,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。 Connection: publisher/consumer和broker之间的TCP连接。 Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。 rabbitmq安装启动RabbitMQ官方地址:http://www.rabbitmq.com 第一步:erlang 安装
# 系统 centos 7# 下载erlang包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 验证安装是否成功 erl
第二步:安装rabbitmq
# 系统 centos 7# 下载rabbitmq包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 启动 systemctl start rabbitmq-server # 关闭 systemctl stop rabbitmq-server # 查看默认端口服务是否启动 netstat -tunlp
php消息队列rabbitmq各种模式使用rabbitmq管理界面及命令行使用4369:epmd(Erlang Port Mapper Daemon),erlang服务端口 5672 :client端通信口 15672:HTTP API客户端,管理UI(仅在启用了管理插件的情况下)不一定会启动 25672:用于节点间通信(Erlang分发服务器端口) rabbitmq 管理命令 # 启动rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management # 查看所有插件 rabbitmq-plugins list 测试访问UI界面:(此时非localhost地址是无法登录) rabbitmq 配置管理界面 # 新增一个用户 rabbitmqctl add_user 【用户名Username】 【密码Password】 rabbitmqctl add_user root root # 删除一个用户 rabbitmqctl delete_user Username # 修改用户的密码 rabbitmqctl change_password Username Newpassword # 查看当前用户列表 rabbitmqctl list_users # 设置用户角色的命令为: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。 命令行创建vhost以及php扩展安装 1)查看不同用户的vhost 创建vhost,以及分配权限 # 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #为vhost添加用户 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后边三个.*分别代表:配置权限、写权限、读权限
2)为php安装rabbitmq扩展安装 修改阿里云镜像 composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/ 开始下载–这里有时候会下载成2.8低版本的,需要指定版本
# 升级composer composer self-update #php.ini 打开 sockets 扩展 #下载指定版本 composer require php-amqplib/php-amqplib=^3.0 simple模式生产者消息推送到消息队列 简单的生产者与消息者 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//生产者
//Connection: publisher/consumer和broker之间的TCP连接
//Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明队列名为:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);
//生产数据
$data = 'this is messge';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
//关闭连接
$channel->close();
$connection->close();运行生产者脚本: simple模式消费者接受消息http://localhost/rabbitmq/simple/con.php <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明队列名为:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
};
//开启消费
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();worker模式生产消费消息rabbitmq Work Queues <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//生产者
//Connection: publisher/consumer和broker之间的TCP连接
//Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
for ($i = 0; $i < 10; $i++) {
//生产数据
$data = 'this is messge' . $i;
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
}
//关闭连接
$channel->close();
$connection->close();消费者worker1 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
};
//开启消费
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();消费者worker2,代码和worker1一样,同时运行开启后会一起消费 消费者消费消息ack确认 用以确认不会丢失消息 消费消息 消费端代码 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//确认消息已被消费,从生产队列中移除
$msg->ack();
};
//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);
//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();fanout模式生产者推送到交换器发布/订阅模式 文档:rabbitmq Publish/Subscribe rabbitmq Exchange类型 交换器、路由键、绑定
Exchange:交换器。发送消息的AMQP实体。交换器拿到一个消息之后将它路由给一个或几个队列。它使用哪种路由算法是由交换机类型和被称作绑定(Binding)的规则所决定的。RabbitMQ有四种类型。
RoutingKey:路由键。生产者将消息发送给交换器。一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终失效。
Binding:绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。
# 四种模式
Direct 定向 消息与一个特定的路由键完全匹配
Topic 通配符 路由键和某模式进行匹配
Fanout 广播 发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列
Headers 不处理路由键,而是根据发送的消息内容中的headers属性进行匹配exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。
<?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);
//声明数据
$data = 'this is fanout message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
$channel->basic_publish($msg, $exc_name);
//关闭连接
$channel->close();
$connection->close();
fanout模式消费者消费消息 当消费端运行时才会显示该队列 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);
//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//将队列名与交换器名进行绑定
$channel->queue_bind($queue_name,$exc_name);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//确认消息已被消费,从生产队列中移除
$msg->ack();
};
//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);
//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();direct模式消息队列使用文档: 用来指定不同的交换机和指定routing_key,在消费端进行消费 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';
//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'direct', false, false, false);
//声明数据
$data = 'this is ' . $routing_key . ' message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();消费者代码 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';
$channel->exchange_declare($exc_name, 'direct', false, false, false);
//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//确认消息已被消费,从生产队列中移除
$msg->ack();
};
//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);
//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();topic模式消息队列使用通配符的匹配模式 如消费端中routing_key = ‘user.*’; 生产者: <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'topic_log';
//指定routing_key
$routing_key = 'user.top';
//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'topic', false, false, false);
//声明数据
$data = 'this is ' . $routing_key . ' message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();消费者 <?php
require_once "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
//建立connction
$connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order');
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'user.*';
$channel->exchange_declare($exc_name, 'topic', false, false, false);
//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function ($msg) {
echo 'received = ', $msg->body . "\n";
//确认消息已被消费,从生产队列中移除
$msg->ack();
};
//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);
//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//不断的循环进行消费
while ($channel->is_open()) {
$channel->wait();
}
//关闭连接
$channel->close();
$connection->close();推荐学习:《PHP视频教程》 以上就是消息队列RabbitMQ入门与PHP实例详解的详细内容,更多请关注模板之家(www.mb5.com.cn)其它相关文章! |
