分析之前请大家务必了解消息队列的实现;tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink;本章是围绕redis来做分析。
前言分析之前请大家务必了解消息队列的实现 tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink 存储key:
执行命令work和listen的区别在下面会解释
行为标签
命令参数
模式区别1: 执行原理不同 listen: 父进程 + 子进程 的处理模式; 2: 退出时机不同 开发者可以选择捕获该异常,让父进程继续执行; 3: 性能不同 listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本; 因此 work 模式的性能会比listen模式高。 4: 超时控制能力 可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束; expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程 5: 使用场景不同 work 适用场景是: listen 适用场景是: 01: 任务数量较少 公有操作由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php 在redis.php类的构造方法中的操作: 发布过程发布参数
立即执行push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test'); 一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName [ 'job' => $job, // 要执行任务的类 'data' => $data, // 任务数据 'id'=>'xxxxx' //任务id ] 写入 redis并且返回队列id 延迟发布later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test'); 跟上面的差不多 执行过程执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解; //守护进程开启 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //内存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重启守护进程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任务开始执行之前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任务延迟执行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任务执行失败 'queue_failed' => [ \app\index\behavior\QueueFailed::class ] public function run(Output $output) { $output->write('<info>任务执行失败</info>', true); } 控制台执行 守护进程开启 任务延迟执行 失败的处理 如果有任务执行失败或者执行次数达到最大值 在 最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架 在其他项目中推任务php版本<?php class Index { private $redis = null; public function __construct() { $this->redis = new Redis(); $this->redis->connect('127.0.0.1', 6379); $this->redis->select(10); } public function push($job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->rPush('queues:' . $queue, $payload); } public function later($delay, $job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload); } private function createPayload($job, $data) { $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32)); return $this->setMeta($payload, 'attempts', 1); } private function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } private function random(int $length = 16): string { $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; $randomString = ''; for ($i = 0; $i < $length; $i++) { $randomString .= $str[rand(0, strlen($str) - 1)]; } return $randomString; } } (new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test'); go版本package main import ( "encoding/json" "github.com/garyburd/redigo/redis" "math/rand" "time" ) type Payload struct { Id string `json:"id"` Job string `json:"job"` Data interface{} `json:"data"` Attempts int `json:"attempts"` } var RedisClient *redis.Pool func init() { RedisClient = &redis.Pool{ MaxIdle: 20, MaxActive: 500, IdleTimeout: time.Second * 100, Dial: func() (conn redis.Conn, e error) { c, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } _, _ = c.Do("SELECT", 10) return c, nil }, } } func main() { var data = make(map[string]interface{}) data["id"] = "1" later(10, "app\\index\\jobs\\Test", data, "test") } func push(job string, data interface{}, queue string) { payload := createPayload(job, data) queueName := "queues:" + queue _, _ = RedisClient.Get().Do("rPush", queueName, payload) } func later(delay int, job string, data interface{}, queue string) { m, _ := time.ParseDuration("+1s") currentTime := time.Now() op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix() createPayload(job, data) payload := createPayload(job, data) queueName := "queues:" + queue + ":delayed" _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) } // 创建指定格式的数据 func createPayload(job string, data interface{}) (payload string) { payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1} jsonStr, _ := json.Marshal(payload1) return string(jsonStr) } // 创建随机字符串 func random(n int) string { var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, n) for i := range b { b[i] = str[rand.Intn(len(str))] } return string(b) } 更多thinkphp技术知识,请访问thinkphp教程栏目! 以上就是解析think-queue(围绕redis做分析)的详细内容,更多请关注模板之家(www.mb5.com.cn)其它相关文章! |