POSIX 线程(Pthread) 之前的 TCP 回射服务器通过 fork 子进程来维护多用户的请求, 但是 […]
UDP 套接字编程
UDP 套接字编程 UDP 套接字编程相较于TCP 套接字编程会简单一些, UDP 仅提供无连接的不可靠数据报 […]
TCP回射 服务端程序
TCP回射 服务端程序 本例为多进程的 TCP 回射程序(服务端) [crayon-607f8afaaf400 […]
swoole websocket 起步 入门
WebSocket connection to ‘ws://127.0.0.1:9502/’ failed: Error in connection establishment: net::ERR_CONNECTION_REFUSED
浏览器缓存设置
浏览器缓存设置 一、浏览器级缓存设置 减少服务器请求, 节省流量 设置 cache-control cache […]
URI URL URN 图解
URI URL URN 图解 一、URI 和 URL refer to RFC (一) 定义 Uniform […]
基于 WebSocket 的即时聊天 PHP
WebSocket PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 |
<?php class WebSocket { // 日志存储位置 const LOG_PATH = '/liveChat/tmp/'; const LISTEN_SOCKET_NUM = 9; /** * @var array $sockets 存储所有客户端的 socket 请求 * [ * // 将 $socket 转换成整形, 作为索引 * (int)$socket => [ * 'resource' => $socket, * 'uname' => '', * 'handshake' => false, * 'ip' => $ip, * 'port' => $port, * ] * ] */ private $sockets = []; private $master; public function __construct($host, $port) { error_reporting(E_ALL); set_time_limit(0);// 设置超时时间为无限,防止超时 date_default_timezone_set('Asia/shanghai'); /** * 创建连接, 绑定端口, 监听端口. **/ try { $this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 设置IP和端口重用,在重启服务器后能重新使用此端口; socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1); // 将IP和端口绑定在服务器socket上; socket_bind($this->master, $host, $port); // 创建,绑定之后,它可以监听要连接的客户端. socket_listen($this->master, self::LISTEN_SOCKET_NUM); } catch (\Exception $e) { // 如有异常, 填写日志, 网页里无法输出的. $err_code = socket_last_error(); $err_msg = socket_strerror($err_code); $this->error(['error_init_server', $err_code, $err_msg]); } $this->sockets[0] = ['resource' => $this->master]; // 返回当前当前进程的 id, Windows 不能用 $pid = posix_getpid(); $this->debug(["server: {$this->master} started,pid: {$pid}"]); while (true) { try { $this->doServer(); } catch (\Exception $e) { $this->error(['error_do_server', $e->getCode(), $e->getMessage() ]); } } } private function doServer() { $write = $except = NULL; // 将所有 resource 整合成新的 $sockets 数组 $sockets = array_column($this->sockets, 'resource'); /* //这个函数是同时接受多个连接的关键,我的理解它是为了阻塞程序继续往下执行。 socket_select ($sockets, $write = NULL, $except = NULL, NULL); $sockets可以理解为一个数组,这个数组中存放的是文件描述符。当它有变化(就是有新消息到或者有客户端连接/断开)时,socket_select函数才会返回,继续往下执行。 $write是监听是否有客户端写数据,传入NULL是不关心是否有写变化。 $except是$sockets里面要被排除的元素,传入NULL是”监听”全部。 最后一个参数是超时时间 如果为0:则立即结束 如果为n>1: 则最多在n秒后结束,如遇某一个连接有新动态,则提前返回 如果为null:如遇某一个连接有新动态,则返回 */ $read_num = socket_select($sockets, $write, $except, NULL); // 如果 read_num 没有连接, 跳过这一层循环 if (false === $read_num) { // 发生错误直接跳出, 记录错误, 存储日志. $this->error([ 'error_select', $err_code = socket_last_error(), socket_strerror($err_code)]); return; } foreach ($sockets as $socket) { // 如果可读的是服务器 socket, 则处理连接逻辑, 应该只会使用第一次。 if ($socket == $this->master) { // 创建,绑定,监听后 accept 函数将会接受客户端 socket 要来的连接,一旦有一个连接成功,将会返回一个新的socket资源用以交互,如果是一个多个连接的队列,只会处理第一个,如果没有连接的话,进程将会被阻塞,直到连接上. $client = socket_accept($this->master); if (false === $client) { $this->error([ 'err_accept', $err_code = socket_last_error(), socket_strerror($err_code) ]); continue; } else { // 将 socket 添加到已连接列表,但握手状态留空, 记录日志 self::connect($client); continue; } } else { // 如果可读的是其他已连接的 socket, 则读取其数据, 并处理应答逻辑, 数据传到 $buffer $bytes = @socket_recv($socket, $buffer, 2048, 0); // 接收来的数据过小, 断开连接 if ($bytes < 9) { $recv_msg = $this->disconnect($socket); } else { // 正常情况, 没有握手, 现在去握手 if (!$this->sockets[(int)$socket]['handshake']) { self::handShake($socket, $buffer); continue; } else { 如果已经握手, 解析客户端发送来的数据. $recv_msg = self::parse($buffer); } } array_unshift($recv_msg, 'receive_msg'); $msg = self::dealMsg($socket, $recv_msg); $this->broadcast($msg); } } } /** * 将socket添加到已连接列表,但握手状态留空; * * @param $socket */ public function connect($socket) { socket_getpeername($socket, $ip, $port); $socket_info = [ 'resource' => $socket, 'uname' => '', 'handshake' => false, 'ip' => $ip, 'port' => $port, ]; $this->sockets[(int)$socket] = $socket_info; $this->debug(array_merge(['socket_connect'], $socket_info)); } /** * 客户端关闭连接 * * @param $socket * * @return array */ private function disconnect($socket) { $recv_msg = [ 'type' => 'logout', 'content' => $this->sockets[(int)$socket]['uname'], ]; // 退出链接, 删除掉 $sockets 数组中相应的 socket unset($this->sockets[(int)$socket]); return $recv_msg; } /** * 用公共握手算法握手 * * @param $socket * @param $buffer * * @return bool */ public function handShake($socket, $buffer) { // 获取到客户端的升级密匙 $line_with_key = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18); $key = trim(substr($line_with_key, 0, strpos($line_with_key, "\r\n"))); // 生成升级密匙,并拼接websocket升级头 $upgrade_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));// 升级key的算法 $upgrade_message = "HTTP/1.1 101 Switching Protocols\r\n"; $upgrade_message .= "Upgrade: websocket\r\n"; $upgrade_message .= "Sec-WebSocket-Version: 13\r\n"; $upgrade_message .= "Connection: Upgrade\r\n"; $upgrade_message .= "Sec-WebSocket-Accept:" . $upgrade_key . "\r\n\r\n"; socket_write($socket, $upgrade_message, strlen($upgrade_message));// 向socket里写入升级信息 $this->sockets[(int)$socket]['handshake'] = true; // 获取客户端的 $ip 和 $port, 方便填写日志 socket_getpeername($socket, $ip, $port); $this->debug([ 'hand_shake', $socket, $ip, $port ]); // 向客户端发送握手成功消息, 以触发客户端发送用户名动作; $msg = [ 'type' => 'handshake', 'content' => 'done', ]; $msg = $this->build(json_encode($msg)); socket_write($socket, $msg, strlen($msg)); return true; } /** * 解析数据 * * @param $buffer * * @return bool|string */ private function parse($buffer) { $decoded = ''; // 返回首个字符的 ASCII 值/ 取正值 $len = ord($buffer[1]) & 127; if ($len === 126) { $masks = substr($buffer, 4, 4); $data = substr($buffer, 8); } else if ($len === 127) { $masks = substr($buffer, 10, 4); $data = substr($buffer, 14); } else { $masks = substr($buffer, 2, 4); $data = substr($buffer, 6); } for ($index = 0; $index < strlen($data); $index++) { $decoded .= $data[$index] ^ $masks[$index % 4]; } return json_decode($decoded, true); } /** * 将普通信息组装成websocket数据帧 * * @param $msg * * @return string */ private function build($msg) { $frame = []; $frame[0] = '81'; $len = strlen($msg); if ($len < 126) { $frame[1] = $len < 16 ? '0' . dechex($len) : dechex($len); } else if ($len < 65025) { $s = dechex($len); $frame[1] = '7e' . str_repeat('0', 4 - strlen($s)) . $s; } else { $s = dechex($len); $frame[1] = '7f' . str_repeat('0', 16 - strlen($s)) . $s; } $data = ''; $l = strlen($msg); for ($i = 0; $i < $l; $i++) { $data .= dechex(ord($msg{$i})); } $frame[2] = $data; $data = implode('', $frame); return pack("H*", $data); } /** * 拼装信息 * * @param $socket * @param $recv_msg * [ * 'type'=>user/login * 'content'=>content * ] * * @return string */ private function dealMsg($socket, $recv_msg) { $msg_type = $recv_msg['type']; $msg_content = $recv_msg['content']; $response = []; switch ($msg_type) { case 'login': $this->sockets[(int)$socket]['uname'] = $msg_content; // 取得最新的名字记录 $user_list = array_column($this->sockets, 'uname'); $response['type'] = 'login'; $response['content'] = $msg_content; $response['user_list'] = $user_list; break; case 'logout': $user_list = array_column($this->sockets, 'uname'); $response['type'] = 'logout'; $response['content'] = $msg_content; $response['user_list'] = $user_list; break; case 'user': $uname = $this->sockets[(int)$socket]['uname']; $response['type'] = 'user'; $response['from'] = $uname; $response['content'] = $msg_content; break; } return $this->build(json_encode($response)); } /** * 广播消息 * * @param $data */ private function broadcast($data) { // 给每一个客户端都发送数据 foreach ($this->sockets as $socket) { if ($socket['resource'] == $this->master) { continue; } socket_write($socket['resource'], $data, strlen($data)); } } /** * 记录debug信息 * * @param array $info */ private function debug(array $info) { $time = date('Y-m-d H:i:s'); array_unshift($info, $time); $info = array_map('json_encode', $info); file_put_contents(self::LOG_PATH . 'websocket_debug.log', implode(' | ', $info) . "\r\n", FILE_APPEND); } /** * 记录错误信息, 写进日志 * * @param array $info 所有错误 */ private function error(array $info) { $time = date('Y-m-d H:i:s'); array_unshift($info, $time); $info = array_map('json_encode', $info); file_put_contents(self::LOG_PATH . 'websocket_error.log', implode(' | ', $info) . "\r\n", FILE_APPEND); } } $ws = new WebSocket("127.0.0.1", "8080"); |
本文基本参照于 […]
What are WebSockets? — todo
What are WebSockets? An introduction WebSockets represe […]
自适应协议的链接,双斜杠 // 开始的 URL 连接
链接协议自适应 SSL 潮流的未来时代, 如网站设置过 SSL, 欲安全访问, 需要使用 https:// 的 […]
本地域名解析-本地解析域名
解析本地域名