豌豆Ai站群搜索引擎系统 V.25.10.25 网址:www.wd.chat

🗣 Wd提问: php分页消息队列

🤖 Ai回答:
我来为您介绍PHP分页消息队列的实现方案。

1、基于Redis的消息队列分页

消息队列基础类

php
<?php
class RedisMessageQueue {
private $redis;
private $queueName;

public function __construct($queueName, $redisConfig = []) {
$this-queueName = $queueName;
$this-redis = new Redis();
$this-redis-connect($redisConfig['host'] ?? '127.0.0.1',
$redisConfig['port'] ?? 6379);
}

// 入队
public function push($message) {
$data = [
'id' = uniqid(),
'message' = $message,
'created_at' = time(),
'status' = 'pending'
];
return $this-redis-lpush($this-queueName, json_encode($data));
}

// 出队
public function pop() {
$data = $this-redis-rpop($this-queueName);
return $data ? json_decode($data, true) : null;
}

// 获取队列长度
public function getLength() {
return $this-redis-llen($this-queueName);
}
}

分页消息队列类

php
<?php
class PaginatedMessageQueue extends RedisMessageQueue {

// 获取分页消息
public function getMessages($page = 1, $limit = 10) {
$start = ($page 1) * $limit;
$end = $start + $limit 1;

// 获取指定范围的消息
$messages = $this-redis-lrange($this-queueName, $start, $end);
$total = $this-getLength();

$result = [];
foreach ($messages as $message) {
$result[] = json_decode($message, true);
}

return [
'data' = array_reverse($result), // 反转以显示最新消息在前
'total' = $total,
'page' = $page,
'limit' = $limit,
'pages' = ceil($total / $limit)
];
}

// 根据状态筛选消息
public function getMessagesByStatus($status, $page = 1, $limit = 10) {
$allMessages = $this-redis-lrange($this-queueName, 0, -1);
$filteredMessages = [];

foreach ($allMessages as $message) {
$data = json_decode($message, true);
if ($data['status'] === $status) {
$filteredMessages[] = $data;
}
}

$total = count($filteredMessages);
$start = ($page 1) * $limit;
$pagedMessages = array_slice(array_reverse($filteredMessages), $start, $limit);

return [
'data' = $pagedMessages,
'total' = $total,
'page' = $page,
'limit' = $limit,
'pages' = ceil($total / $limit)
];
}

// 更新消息状态
public function updateMessageStatus($messageId, $status) {
$allMessages = $this-redis-lrange($this-queueName, 0, -1);
$this-redis-del($this-queueName);

foreach ($allMessages as $message) {
$data = json_decode($message, true);
if ($data['id'] === $messageId) {
$data['status'] = $status;
$data['updated_at'] = time();
}
$this-redis-rpush($this-queueName, json_encode($data));
}
}
}

2、基于数据库的消息队列分页

数据库表结构

sql
CREATE TABLE message_queue (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(32) NOT NULL UNIQUE,
message TEXT NOT NULL,
status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
priority TINYINT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status (status),
INDEX idx_priority_created (priority, created_at)
);

数据库消息队列类

php
<?php
class DatabaseMessageQueue {
private $pdo;
private $tableName;

public function __construct($pdo, $tableName = 'message_queue') {
$this-pdo = $pdo;
$this-tableName = $tableName;
}

// 入队
public function push($message, $priority = 0) {
$sql = INSERT INTO {$this-tableName} (message_id, message, priority) VALUES (?, ?, ?);
$stmt = $this-pdo-prepare($sql);
return $stmt-execute([uniqid(), json_encode($message), $priority]);
}

// 出队
public function pop() {
$this-pdo-beginTransaction();
try {
// 获取优先级最高的待处理消息
$sql = SELECT * FROM {$this-tableName} WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE;
$stmt = $this-pdo-query($sql);
$message = $stmt-fetch(PDO::FETCH_ASSOC);

if ($message) {
// 更新状态为处理中
$updateSql = UPDATE {$this-tableName} SET status = 'processing', updated_at = NOW() WHERE id = ?;
$updateStmt = $this-pdo-prepare($updateSql);
$updateStmt-execute([$message['id']]);

$this-pdo-commit();
return $message;
}

$this-pdo-commit();
return null;
} catch (Exception $e) {
$this-pdo-rollback();
throw $e;
}
}

// 分页获取消息
public function getMessages($page = 1, $limit = 10, $filters = []) {
$offset = ($page 1) * $limit;

$where = 1=1;
$params = [];

// 状态筛选
if (!empty($filters['status'])) {
$where .= AND status = ?;
$params[] = $filters['status'];
}

// 优先级筛选
if (isset($filters['priority'])) {
$where .= AND priority = ?;
$params[] = $filters['priority'];
}

// 时间范围筛选
if (!empty($filters['start_time'])) {
$where .= AND created_at = ?;
$params[] = $filters['start_time'];
}

if (!empty($filters['end_time'])) {
$where .= AND created_at <= ?;
$params[] = $filters['end_time'];
}

// 获取总数
$countSql = SELECT COUNT(*) FROM {$this-tableName} WHERE {$where};
$countStmt = $this-pdo-prepare($countSql);
$countStmt-execute($params);
$total = $countStmt-fetchColumn();

// 获取分页数据
$order = $filters['order'] ?? 'created_at DESC';
$sql = SELECT * FROM {$this-tableName} WHERE {$where} ORDER BY {$order} LIMIT ? OFFSET ?;
$params[] = $limit;
$params[] = $offset;

$stmt = $this-pdo-prepare($sql);
$stmt-execute($params);
$messages = $stmt-fetchAll(PDO::FETCH_ASSOC);

return [
'data' = $messages,
'total' = (int)$total,
'page' = (int)$page,
'limit' = (int)$limit,
'pages' = ceil($total / $limit)
];
}

// 更新消息状态
public function updateStatus($messageId, $status) {
$sql = UPDATE {$this-tableName} SET status = ?, updated_at = NOW() WHERE message_id = ?;
$stmt = $this-pdo-prepare($sql);
return $stmt-execute([$status, $messageId]);
}

// 批量更新状态
public function batchUpdateStatus($messageIds, $status) {
$placeholders = str_repeat('?,', count($messageIds) 1) 、'?';
$sql = UPDATE {$this-tableName} SET status = ?, updated_at = NOW() WHERE message_id IN ({$placeholders});
$params = array_merge([$status], $messageIds);

$stmt = $this-pdo-prepare($sql);
return $stmt-execute($params);
}
}

3、使用示例

Redis分页队列使用

php
<?php
// 初始化队列
$queue = new PaginatedMessageQueue('user_notifications', [
'host' = '127.0.0.1',
'port' = 6379
]);

// 添加消息
$queue-push(['user_id' = 1, 'content' =

Ai作答

📣 商家广告


高防vps

姓名

贵阳用友软件

广告招商

公司


0

IP地址: 138.38.29.33

搜索次数: 0

提问时间: 2025-11-06 00:37:20

🛒 域名购买
jjxs.cn
xuanhuo.com.cn
txqg.cn
021.hk
396.net.cn
8.tj.cn
961.net.cn
c-a.com.cn
e-j.com.cn
jc9.com.cn

❓️ 热门提问
怎么搭建云vps
r7625服务器
love域名是哪个国家的
.co.jp域名
rhub服务器
专业网店网店托管
电脑主机什么牌子出名
vue开发门户网站
debian ip
cn域名续费是多少
豌豆Ai站群搜索引擎系统

🌐 域名评估
evtol.fund
yy.nm.cn
wk.gs
oo.hk.cn
wt.cx
1688.com
163.com
longcat.org.cn
jf.cx
rh.cx

⛏ 最新挖掘
凯里酸汤鱼
贵阳烤脑花
都匀毛尖茶
花溪牛肉粉
贵阳烤生蚝
贵阳烤豆腐
贵阳纸包鱼
贵阳特色菜

🖌 热门作画

🤝 关于我们:
豌豆Ai 域名 建站 站群 留痕 推广 评估 源码
开发Ai 工具 日记 价格 加盟 广告 流量 留言 联系

🗨 加入群聊
群

🔗 友情链接
广州月饼厂家  좋아요 순위  ai提问

🧰 站长工具
Ai工具  whois查询  搜索

📢 温馨提示:本站所有问答由Ai自动创作,内容仅供参考,若有误差请用“联系”里面信息通知我们人工修改或删除。

👉 技术支持:本站由豌豆Ai提供技术支持,使用的最新版:《豌豆Ai站群搜索引擎系统 V.25.10.25》搭建本站。

上一篇 65907 65908 65909 下一篇