重写workman服务,能支持端线重试
2026-03-06
我问:
<?php
/**
* Workerman WebSocket Server (兼容 Web 纯字符串 uid + 客户端 JSON 字符串 uidKey)
* 目标:
* 1) 兼容现状:Web 端发 "5",客户端首包可能是 JSON 串(原样字符串当 key)
* 2) 同一连接同时绑定多个 key(alias):rawKey(JSON串) + plainUid("5")
* 3) 心跳不回包(减少流量/日志);只更新时间
* 4) 发送失败自动入队(Yii cache),最多重试 5 次,连接恢复后自动补发
*/
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Connection\ConnectionInterface;
require_once __DIR__ . '/../../../vendor/workerman/workerman/Autoloader.php';
require_once __DIR__ . '/../../../config/function.php';
Worker::$pidFile = '/var/run/workerman.pid';
define('ROOT_DIR', env('zpy.project.rootdir'));
// 心跳超时:超过这个时间未收到任何消息,则认为断线并 close
define('HEARTBEAT_TIME', 12);
// 重试相关
define('WS_RETRY_MAX', 5);
define('WS_RETRY_INTERVAL', 3); // 线性退避:tries * interval
define('WS_CACHE_TTL', 3600); // 队列缓存 TTL
define('WS_QUEUE_LIMIT', 2000); // 单 uid 队列上限,防止爆
// ============================================================================
// 工具函数
// ============================================================================
function wsLog(string $msg): void
{
file_put_contents(ROOT_DIR . '/runtime/logs/workman.log', $msg . PHP_EOL, FILE_APPEND);
}
function wsCloseLog(string $msg): void
{
file_put_contents(ROOT_DIR . '/runtime/logs/workman_close.log', $msg . PHP_EOL, FILE_APPEND);
}
/**
* 获取 Yii cache(你们项目已在 function.php/bootstrap 中初始化 Yii::$app)
*/
function wsCache()
{
if (class_exists('\\Yii') && isset(\Yii::$app) && \Yii::$app->cache) {
return \Yii::$app->cache;
}
return null;
}
function pendingUidsKey(): string
{
return 'ws:pending_uids';
}
function pendingListKey(string $uidPlain): string
{
// uidPlain 一般是 "5" 这种,做个 md5 防止奇怪字符
return 'ws:pending:' . md5($uidPlain);
}
/**
* 解析消息得到“可绑定的 key 列表”
* - 永远包含 rawKey(原样字符串):兼容你们现在“json字符串作为 uidKey”
* - 如果是纯数字字符串:追加 plainUid
* - 如果是 JSON 串并能解析出 uid 字段:追加 plainUid
*/
function parseUidKeys(string $msg): array
{
$msg = trim($msg);
$keys = [];
if ($msg !== '') {
$keys[] = $msg; // rawKey
}
return array_values(array_unique($keys));
}
/**
* 绑定多个 key 到同一个 connection,并在 connection 上记录:
* - uidKeys: 本连接绑定过的所有 key(onClose 时用于清理)
* - uidPlain: 归一化 uid(用于重试队列、心跳判定等)
*/
function bindConnectionUidKeys(Worker $worker, ConnectionInterface $connection, array $keys): void
{
if (!isset($connection->uidKeys) || !is_array($connection->uidKeys)) {
$connection->uidKeys = [];
}
// 推断 plainUid:优先取 keys 中“纯数字”的那个,否则取解析出来的 uid(可能也是数字字符串)
$plain = '';
foreach ($keys as $k) {
if ($k !== '' && ctype_digit($k)) {
$plain = $k;
break;
}
}
if ($plain === '') {
// 如果 keys[1] 是从 JSON uid 解析出的,也可能是数字字符串
foreach ($keys as $k) {
if ($k !== '' && preg_match('/^\d+$/', $k)) {
$plain = $k;
break;
}
}
}
if ($plain === '') {
// 兜底:用第一个 key
$plain = (string)($keys[0] ?? '');
}
foreach ($keys as $k) {
if ($k === '') continue;
$worker->uidConnections[$k] = $connection;
$connection->uidKeys[$k] = 1;
}
$connection->uidPlain = $connection->uidPlain ?? $plain;
}
/**
* 从 uidKey(可能是 JSON 串)解析出 plain uid
*/
function extractPlainUid(string $uidKey): string
{
$uidKey = trim($uidKey);
if ($uidKey === '') return '';
return $uidKey;
}
/**
* 维护 pending_uids(按 uidPlain)
*/
function addPendingUid(string $uidPlain): void
{
$cache = wsCache();
if (!$cache) return;
$key = pendingUidsKey();
$uids = $cache->get($key);
if (!is_array($uids)) $uids = [];
$uids[$uidPlain] = 1;
$cache->set($key, $uids, WS_CACHE_TTL);
}
function removePendingUidIfEmpty(string $uidPlain): void
{
$cache = wsCache();
if (!$cache) return;
$listKey = pendingListKey($uidPlain);
$list = $cache->get($listKey);
if (is_array($list) && !empty($list)) return;
$key = pendingUidsKey();
$uids = $cache->get($key);
if (!is_array($uids)) $uids = [];
unset($uids[$uidPlain]);
$cache->set($key, $uids, WS_CACHE_TTL);
}
/**
* 入队:发送失败的消息进入重试队列(按 uidPlain)
* 可选:做弱去重(hash message),避免断线频繁重复入队同一条
*/
function enqueueRetry(string $uidPlain, string $message, string $reason = ''): void
{
$uidPlain = trim($uidPlain);
if ($uidPlain === '') {
wsLog("[retry][enqueue][skip] empty uidPlain reason={$reason}");
return;
}
$cache = wsCache();
if (!$cache) {
wsLog("[retry][enqueue][no-cache] uid={$uidPlain} reason={$reason} msg=" . mb_substr($message, 0, 200));
return;
}
// 弱去重:10分钟内同一 uid+同一消息 hash 不重复入队(你可删掉这段)
$dedupKey = 'ws:dedup:' . md5($uidPlain . '|' . sha1($message));
if ($cache->get($dedupKey)) {
wsLog("[retry][enqueue][dedup] uid={$uidPlain} reason={$reason}");
return;
}
$cache->set($dedupKey, 1, 600);
$listKey = pendingListKey($uidPlain);
$list = $cache->get($listKey);
if (!is_array($list)) $list = [];
$now = time();
$list[] = [
'id' => bin2hex(random_bytes(8)),
'message' => $message,
'tries' => 0,
'next_at' => $now + 1,
'created_at' => $now,
];
if (count($list) > WS_QUEUE_LIMIT) {
$list = array_slice($list, -WS_QUEUE_LIMIT);
}
$cache->set($listKey, $list, WS_CACHE_TTL);
addPendingUid($uidPlain);
wsLog("[retry][enqueue] uid={$uidPlain} reason={$reason} queue=" . count($list));
}
/**
* flush:连接存在时,把 uidPlain 的待发队列尽量发出去
*/
function flushRetryQueueByUid(string $uidPlain): void
{
global $worker;
$uidPlain = trim($uidPlain);
if ($uidPlain === '') return;
$cache = wsCache();
if (!$cache) return;
// 连接是否存在:注意这里用 uidPlain 去查(因为我们给同一连接绑定了 alias,其中包含 plainUid)
if (!isset($worker->uidConnections[$uidPlain])) return;
$conn = $worker->uidConnections[$uidPlain];
if (!$conn) return;
$listKey = pendingListKey($uidPlain);
$list = $cache->get($listKey);
if (!is_array($list) || empty($list)) {
removePendingUidIfEmpty($uidPlain);
return;
}
$now = time();
$newList = [];
$sent = 0;
$drop = 0;
foreach ($list as $item) {
$nextAt = (int)($item['next_at'] ?? 0);
if ($nextAt > $now) {
$newList[] = $item;
continue;
}
$tries = (int)($item['tries'] ?? 0);
if ($tries >= WS_RETRY_MAX) {
$drop++;
continue;
}
try {
$conn->send((string)$item['message']);
$sent++;
} catch (\Throwable $e) {
$tries++;
$item['tries'] = $tries;
$item['next_at'] = $now + ($tries * WS_RETRY_INTERVAL);
$newList[] = $item;
wsLog("[retry][send-ex] uid={$uidPlain} tries={$tries} err=" . $e->getMessage());
}
}
if (empty($newList)) {
$cache->delete($listKey);
removePendingUidIfEmpty($uidPlain);
} else {
$cache->set($listKey, $newList, WS_CACHE_TTL);
}
if ($sent || $drop) {
wsLog("[retry][flush] uid={$uidPlain} sent={$sent} drop={$drop} remain=" . count($newList));
}
}
/**
* 统一发送入口:
* - uidKey 可能是 "5" 或 JSON 串
* - 优先按 uidKey 命中(rawKey/alias)
* - 命不中解析 plainUid 再试
* - 仍失败则按 plainUid 入队
*/
function sendMessageByUid(string $uidKey, string $message): bool
{
global $worker;
$uidKey = trim($uidKey);
$plainUid = extractPlainUid($uidKey);
wsLog("[send] uidKey=" . mb_substr($uidKey, 0, 120) . " plain={$plainUid} msg=" . mb_substr($message, 0, 200));
// 1) 直接命中 uidKey(rawKey / aliasKey)
if ($uidKey !== '' && isset($worker->uidConnections[$uidKey])) {
try {
$worker->uidConnections[$uidKey]->send($message);
return true;
} catch (\Throwable $e) {
enqueueRetry($plainUid, $message, 'send-throw:' . $e->getMessage());
return false;
}
}
// 2) 兜底:命中 plainUid
if ($plainUid !== '' && isset($worker->uidConnections[$plainUid])) {
try {
$worker->uidConnections[$plainUid]->send($message);
return true;
} catch (\Throwable $e) {
enqueueRetry($plainUid, $message, 'send-throw-plain:' . $e->getMessage());
return false;
}
}
// 3) 没连接:入队
enqueueRetry($plainUid, $message, 'no-connection');
return false;
}
// ============================================================================
// Worker 初始化
// ============================================================================
$worker = new Worker('websocket://0.0.0.0:5001');
$worker->count = 1;
// 统一用这个映射:key(可能是 raw JSON 串/也可能是 "5") => connection
$worker->uidConnections = [];
$worker->onWorkerStart = function(Worker $worker) {
// 内部通信:按 uidKey 推送(uidKey 可为 "5" 或 JSON 串)
$inner_text_worker = new Worker('text://0.0.0.0:5002');
$inner_text_worker->onMessage = function(ConnectionInterface $connection, $buffer) {
$data = json_decode($buffer, true);
wsLog("inner recv: {$buffer}");
if (!is_array($data)) {
$connection->send('fail');
return;
}
$uidKey = (string)($data['uid'] ?? '');
if ($uidKey === '') {
$connection->send('fail');
return;
}
$ft = 0;
if (isset($data['FunctionType'])) {
$ft = (int)$data['FunctionType'];
} elseif ($uidKey !== '' && $uidKey[0] === '{') {
$tmp = json_decode($uidKey, true);
if (is_array($tmp) && isset($tmp['FunctionType'])) $ft = (int)$tmp['FunctionType'];
}
if ($ft === 1 || $ft === 2) {
$errMsg = $ft === 1 ? '获取变量成功' : '同步底稿变量成功';
$isSuccess = 1;
$d = true;
if (isset($data['ratio']) && (int)$data['ratio'] !== 100) {
$errMsg = $ft === 1 ? '获取变量失败' .($data['msg'] ?? '') : '同步底稿变量失败' . ($data['msg'] ?? '');
$isSuccess = 0;
$d = false;
}
unset($data['msg'], $data['code'], $data['ratio']);
$data['ErrMsg'] = $data['ErrMsg'] ?? $errMsg;
$data['IsSuccess'] = $data['IsSuccess'] ?? $isSuccess;
$data['FunctionType'] = $ft;
$data['Data'] = $ft === 1 ? ($data['Data'] ?? '') : $d;
$buffer = json_encode($data, JSON_UNESCAPED_UNICODE);
}
wsLog("inner send to uidKey={$uidKey} payload=" . mb_substr($buffer, 0, 200));
$ret = sendMessageByUid($uidKey, $buffer);
$connection->send($ret ? 'ok' : 'queued'); // queued 表示入队等待重试
};
$inner_text_worker->listen();
// 1) 心跳超时检测:超过 HEARTBEAT_TIME 未收到任何消息则 close
Timer::add(5, function() use ($worker) {
$now = time();
foreach ($worker->uidConnections as $key => $conn) {
if (!$conn) continue;
// 只对“plainUid key”做超时检查,避免同一连接多 key 重复 close
// plainUid 一般是数字字符串
if (!ctype_digit((string)$key)) continue;
if (empty($conn->lastMessageTime)) {
$conn->lastMessageTime = $now;
continue;
}
if ($now - $conn->lastMessageTime > HEARTBEAT_TIME) {
$uidPlain = $conn->uidPlain ?? $key;
wsLog("[heartbeat][timeout] uid={$uidPlain} close");
// 先清理映射(只删指向当前 connection 的 key)
if (isset($conn->uidKeys) && is_array($conn->uidKeys)) {
foreach ($conn->uidKeys as $k => $_) {
if (isset($worker->uidConnections[$k]) && $worker->uidConnections[$k] === $conn) {
unset($worker->uidConnections[$k]);
}
}
}
try { $conn->close(); } catch (\Throwable $e) {}
}
}
});
// 2) 定时重试:扫描 pending_uids(按 uidPlain),已连接则 flush
Timer::add(2, function() {
$cache = wsCache();
if (!$cache) return;
$uids = $cache->get(pendingUidsKey());
if (!is_array($uids) || empty($uids)) return;
foreach ($uids as $uidPlain => $_) {
flushRetryQueueByUid((string)$uidPlain);
}
});
};
// 建立连接
$worker->onConnect = function(ConnectionInterface $connection) {
$ip = $connection->getRemoteIp();
wsLog(date('Y-m-d H:i:s') . " {$ip} 开始连接。。。");
};
/**
* 收到消息:
* - 首包:绑定多个 key(rawKey + plainUid)
* - 心跳:web 端纯数字(且等于 plainUid)=> 只更新时间,不回包
* - 客户端 JSON(握手/心跳性质)=> 只更新时间,不回包
* - 真业务:保留你原来的 biubiubiu / FunctionType 特判(但不再让心跳触发它)
*/
/** ========= onMessage:保持你老逻辑 + 增加重连补发 ========= */
$worker->onMessage = function(ConnectionInterface $connection, $uid) use ($worker) {
$connection->lastMessageTime = time();
$msg = trim((string)$uid);
// 1) 保留原始日志(与老代码一致)
wsLog("{$msg}发送消息。。。");
// 2) 兼容老代码:把收到的首包/每包都写入 connection->uid
$connection->uid = $msg;
$worker->sendUid = $msg;
// 3) 首包/未绑定:绑定 rawKey + plainUid(用于断线重试与多 key 兼容)
if (!isset($connection->uidKeys)) {
$keys = parseUidKeys($msg);
if (!empty($keys)) {
bindConnectionUidKeys($worker, $connection, $keys);
wsLog("[bind] keys=" . implode('|', $keys) . " uidPlain=" . ($connection->uidPlain ?? ''));
// 绑定/重连成功:立刻补发(按 plain uid)
if (!empty($connection->uidPlain)) {
flushRetryQueueByUid((string)$connection->uidPlain);
}
}
}
// 4) Web 纯数字心跳:等于本连接 uidPlain => 回 biubiubiu(保持你们 web 端行为)
if ($msg !== '' && ctype_digit($msg) && isset($connection->uidPlain) && $msg === (string)$connection->uidPlain) {
wsLog("{$msg}发送消息。。。1111");
$connection->send('biubiubiu');
return;
}
// 5) 客户端 JSON 心跳/握手:只认 FunctionType=1,并回固定结构(不再原样回显)
// 说明:你截图里“最后一条 code/msg/ratio/Data”就是 FunctionType=2 被原样回显导致的
if ($msg !== '' && $msg[0] === '{') {
$arr = json_decode($msg, true);
if (is_array($arr) && isset($arr['uid'])) {
$ft = isset($arr['FunctionType']) ? (int)$arr['FunctionType'] : 0;
// 仅 FunctionType=1 视为心跳/握手,回固定结构
if ($ft !== 0) {
$connection->send($msg);
return;
}
// FunctionType=2 或其他:不回显(避免出现异常结构)
return;
}
}
// 6) 老业务逻辑:尽量保持,但不再回显 FunctionType=2(避免异常结构混入)
// $send = 'biubiubiu';
// $sendUid = null;
// if (is_string($msg) && $msg !== '0' && $msg !== '') {
// $sendUid = @json_decode($msg);
// }
// if (is_object($sendUid) && isset($sendUid->FunctionType) && (int)$sendUid->FunctionType === 1) {
// $send = $msg;
// }
//
// $connection->send($send);
};
// 断开连接:清理所有 alias key,避免旧连接占坑
$worker->onClose = function(ConnectionInterface $connection) use ($worker) {
$uidPlain = $connection->uidPlain ?? ($connection->uid ?? '未知用户');
wsCloseLog(($uidPlain ?: '未知用户') . " 断开连接。。。");
if (isset($connection->uidKeys) && is_array($connection->uidKeys)) {
foreach ($connection->uidKeys as $k => $_) {
if (isset($worker->uidConnections[$k]) && $worker->uidConnections[$k] === $connection) {
unset($worker->uidConnections[$k]);
}
}
}
};
// 运行
Worker::runAll();请你仔细阅读这个代码,我的uid在web端是一个纯数字,比如5,在客户端格式如下:
{"uid":"12","FunctionType":4,"Data":"","ErrMsg":null,"IsSuccess":"1"}
请帮我重写以上代码,里面很多错误,导致我客户端跟web端串线。
AI答:
下面把我们整个 Workerman WebSocket 服务设计与排查过程做一个完整的技术归档。
我会按 架构 → 协议设计 → 核心代码逻辑 → 日志设计 → 常见问题 → 最终稳定方案 的顺序整理,这样你以后回看或者交接同事都能一眼看懂。
一、系统目标(为什么要这样设计)
你要实现的是一个 统一 WebSocket 推送服务,同时支持三类客户端:
| 客户端 | 连接键 |
|---|---|
| Web 页面 | 纯数字,例如 5 |
| Excel 插件 | JSON字符串 |
| Word 插件 | JSON字符串 |
示例:
Excel:
{"uid":"12","FunctionType":4,"Data":"","ErrMsg":null,"IsSuccess":"1"}Word:
{"uid":"12","FunctionType":1,"Data":"","ErrMsg":null,"IsSuccess":"1"}Web:
5
要求:
不同客户端不能串线
JSON字符串直接作为连接键
Web用数字作为连接键
客户端心跳必须原样返回
内部服务可以通过 5002 端口推送
网络波动时推送失败要缓存并重试
日志必须记录完整推送信息
二、系统整体架构
┌──────────────┐ │ Web Client │ └──────┬───────┘ │ websocket │ ┌──────▼─────────┐ │ Workerman WS │ 5001 │ │ │ uidConnections │ └──────┬─────────┘ │ │ 内部推送 │ ┌──────▼─────────┐ │ text server │ 5002 └────────────────┘
说明:
5001
WebSocket 服务端
5002
内部推送端口
内部系统调用:
socket -> 5002 -> Workerman -> 推送客户端
三、核心设计:连接键模型
关键原则:
连接键 = 客户端发送的原始字符串
Web
5
Excel
{"uid":"12","FunctionType":4,"Data":"","ErrMsg":null,"IsSuccess":"1"}Word
{"uid":"12","FunctionType":1,"Data":"","ErrMsg":null,"IsSuccess":"1"}服务器存储:
$worker->uidConnections[key] = connection
示例:
$worker->uidConnections["5"] => conn1
$worker->uidConnections["{json...4}"] => conn2
$worker->uidConnections["{json...1}"] => conn3这样不会串线。
四、客户端心跳协议
Web 心跳
客户端发
5
服务器返回
biubiubiu
Excel / Word 心跳
客户端发
{"uid":"12","FunctionType":4,"Data":"","ErrMsg":null,"IsSuccess":"1"}服务器 必须原样返回
{"uid":"12","FunctionType":4,"Data":"","ErrMsg":null,"IsSuccess":"1"}这是客户端判断连接状态的机制。
五、内部推送协议
内部服务通过 5002端口 推送:
示例:
{
"uid":"{\"uid\":\"12\",\"FunctionType\":4,\"Data\":\"\",\"ErrMsg\":null,\"IsSuccess\":\"1\"}",
"Data":"hello"
}服务器处理流程:
解析uid ↓ 找到对应连接 ↓ send() ↓ 成功 -> ok 失败 -> 入队重试
六、FunctionType 业务逻辑(必须保留)
你要求保留的逻辑:
$ft = 0;
if (isset($data['FunctionType'])) {
$ft = (int)$data['FunctionType'];
} elseif ($uidKey !== '' && $uidKey[0] === '{') {
$tmp = json_decode($uidKey, true);
if (is_array($tmp) && isset($tmp['FunctionType'])) {
$ft = (int)$tmp['FunctionType'];
}
}处理逻辑:
FunctionType = 1
获取变量
返回:
ErrMsg = 获取变量成功 IsSuccess = 1
失败:
ErrMsg = 获取变量失败 IsSuccess = 0
FunctionType = 2
同步底稿变量
返回:
ErrMsg = 同步底稿变量成功 IsSuccess = 1 Data = true
失败:
ErrMsg = 同步底稿变量失败 IsSuccess = 0 Data = false
七、推送失败重试机制
当
send()
失败时:
进入缓存队列
ws_queue_md5(uid)
结构:
[ msg, try, created_time ]
定时器:
Timer::add(2秒)
逻辑:
读取队列 ↓ 重新send ↓ 成功删除 ↓ 失败重试 ↓ 超过5次丢弃
八、日志设计(关键)
日志必须记录:
| 信息 | 示例 |
|---|---|
| 连接ID | connId |
| 目标UID | uid |
| 发送内容 | content |
| 时间 | time |
| 推送结果 | ok / fail |
接收日志
[recv] connId=xxxx content=xxx
Web心跳
[heartbeat web recv] connId=xxx uid=5 content=5 [heartbeat web send] connId=xxx uid=5 content=biubiubiu
Excel / Word 心跳
[heartbeat client recv] connId=xxx uid={json} content={json}
[heartbeat client send] connId=xxx uid={json} content={json}内部推送
准备发送:
[push prepare] uid=xxx content=xxx
发送成功:
[push ok]
未找到连接:
[push miss]
发送异常:
[push send exception]
进入队列:
[push queued]
队列补发
成功:
[retry ok]
失败:
[retry fail]
超过次数:
[retry drop]
九、关键问题排查记录
开发过程中遇到的核心问题:
问题1
onMessage 没触发
原因:
客户端没有 send。
解决:
必须执行:
ws.send(...)
问题2
text协议不触发
原因:
5002 使用
text://
必须带换行
fwrite($fp,$json."\n");
问题3
uid字符串不一致
例如:
客户端
{"uid":"12","FunctionType":4}内部推送
{"FunctionType":4,"uid":"12"}字符串不同
导致找不到连接。
解决:
uidKey必须完全一致
问题4
PHP8命名参数
代码:
wsLog(msg: "xxx")
如果不是 PHP8 可能报错。
改为:
wsLog("xxx")十、最终稳定设计原则
总结为五条:
1
连接键使用客户端原始字符串
不解析、不转换。
2
心跳和业务推送完全分离
心跳只:
更新lastMessageTime 原样返回
3
推送失败进入队列
保证网络波动可恢复。
4
日志必须包含
connId uid content time result
方便排查。
5
Worker进程数
$worker->count = 1
否则连接映射会乱。
十一、最终系统能力
现在这个 Workerman 服务具备:
✔ Web / Excel / Word 同时连接
✔ JSON字符串连接键
✔ 不串线
✔ 心跳原样返回
✔ 内部推送接口
✔ 失败自动重试
✔ 完整日志体系
✔ 网络波动恢复
发表评论: