黄p网站_在线看中文字幕_亚洲电影免费观看_成人激情视频_欧美成视频_中国av在线

EasySwoole RPC

很多傳統(tǒng)的 Phper 并不懂 RPC 是什么,RPC 全稱 Remote Procedure Call,中文譯為 遠(yuǎn)程過(guò)程調(diào)用,其實(shí)你可以把它理解為是一種架構(gòu)性上的設(shè)計(jì),或者是一種解決方案。

例如在某龐大商場(chǎng)系統(tǒng)中,你可以把整個(gè)商場(chǎng)拆分為 N 個(gè)微服務(wù)(理解為 N 個(gè)獨(dú)立的小模塊也行),例如:

  • 訂單系統(tǒng)
  • 用戶管理系統(tǒng)
  • 商品管理系統(tǒng)
  • 等等

那么在這樣的架構(gòu)中,就會(huì)存在一個(gè) API 網(wǎng)關(guān) 的概念,或者是叫 服務(wù)集成者。我的 API 網(wǎng)關(guān) 的職責(zé),就是把一個(gè)請(qǐng)求,拆分成 N 個(gè)小請(qǐng)求,分發(fā)到各個(gè)小服務(wù)里面,再整合各個(gè)小服務(wù)的結(jié)果,返回給用戶。例如在某次下單請(qǐng)求中,那么大概發(fā)送的邏輯如下:

  • API 網(wǎng)關(guān)接受請(qǐng)求
  • API 網(wǎng)關(guān)提取用戶參數(shù),請(qǐng)求用戶管理系統(tǒng),獲取用戶余額等信息,等待結(jié)果
  • API 網(wǎng)關(guān)提取商品參數(shù),請(qǐng)求商品管理系統(tǒng),獲取商品剩余庫(kù)存和價(jià)格等信息,等待結(jié)果
  • API 網(wǎng)關(guān)融合用戶管理系統(tǒng)、商品管理系統(tǒng)的返回結(jié)果,進(jìn)行下一步調(diào)用(假設(shè)滿足購(gòu)買條件)
  • API 網(wǎng)關(guān)調(diào)用用戶管理信息系統(tǒng)進(jìn)行扣款,調(diào)用商品管理系統(tǒng)進(jìn)行庫(kù)存扣減,調(diào)用訂單系統(tǒng)進(jìn)行下單(事務(wù)邏輯和撤回可以用 請(qǐng)求 id 保證,或者自己實(shí)現(xiàn)其他邏輯調(diào)度)
  • API 網(wǎng)關(guān)返回綜合信息給用戶

而在以上發(fā)生的行為,就稱為 遠(yuǎn)程過(guò)程調(diào)用。而調(diào)用過(guò)程實(shí)現(xiàn)的通訊協(xié)議可以有很多,比如常見的 HTTP 協(xié)議。而 EasySwoole RPC 采用自定義短鏈接的 TCP 協(xié)議實(shí)現(xiàn),每個(gè)請(qǐng)求包,都是一個(gè) JSON,從而方便實(shí)現(xiàn)跨平臺(tái)調(diào)用。

全新特性

  • 協(xié)程調(diào)度
  • 服務(wù)自動(dòng)發(fā)現(xiàn)
  • 服務(wù)熔斷
  • 服務(wù)降級(jí)
  • Openssl 加密
  • 跨平臺(tái)、跨語(yǔ)言支持
  • 支持接入第三方注冊(cè)中心

安裝

composer require easyswoole/rpc=4.x

執(zhí)行流程

服務(wù)端:
注冊(cè)RPC服務(wù),創(chuàng)建相應(yīng)的服務(wù)swoole table表(ps:記錄調(diào)用成功和失敗的次數(shù)) 注冊(cè)worker,tick進(jìn)程

woker進(jìn)程監(jiān)聽:
客戶端發(fā)送請(qǐng)求->解包成相對(duì)應(yīng)的格式->執(zhí)行對(duì)應(yīng)的服務(wù)->返回結(jié)果->客戶端

tick進(jìn)程:
注冊(cè)定時(shí)器發(fā)送心跳包到本節(jié)點(diǎn)管理器
啟用廣播:每隔幾秒發(fā)送本節(jié)點(diǎn)各個(gè)服務(wù)信息到其他節(jié)點(diǎn)
啟用監(jiān)聽:監(jiān)聽其他節(jié)點(diǎn)發(fā)送的信息,發(fā)送相對(duì)應(yīng)的命令(心跳|下線)到節(jié)點(diǎn)管理器處理
進(jìn)程關(guān)閉:主動(dòng)刪除本節(jié)點(diǎn)的信息,發(fā)送下線廣播到其他節(jié)點(diǎn)

Rpc-Server

場(chǎng)景

例如在一個(gè)商場(chǎng)系統(tǒng)中,我們將商品庫(kù)和系統(tǒng)公告兩個(gè)服務(wù)切分開到不同的服務(wù)器當(dāng)中。當(dāng)用戶打開商場(chǎng)首頁(yè)的時(shí)候, 我們希望App向某個(gè)網(wǎng)關(guān)發(fā)起請(qǐng)求,該網(wǎng)關(guān)可以自動(dòng)的幫我們請(qǐng)求商品列表和系統(tǒng)公共等數(shù)據(jù),合并返回。

服務(wù)定義

每一個(gè)Rpc服務(wù)其實(shí)就一個(gè)EasySwoole\Rpc\AbstractService類。 如下:

定義商品服務(wù)

namespace App\RpcService;

use EasySwoole\Rpc\AbstractService;

class Goods extends AbstractService
{

    /**
     *  重寫onRequest(比如可以對(duì)方法做ip攔截或其它前置操作)
     *
     * @param string $action
     * @return bool
     * CreateTime: 2020/6/20 下午11:12
     */
    protected function onRequest(?string $action): ?bool
    {
        return true;
    }

    public function serviceName(): string
    {
        return 'goods';
    }

    public function list()
    {
        $this->response()->setResult([
            [
                'goodsId'=>'100001',
                'goodsName'=>'商品1',
                'prices'=>1124
            ],
            [
                'goodsId'=>'100002',
                'goodsName'=>'商品2',
                'prices'=>599
            ]
        ]);
        $this->response()->setMsg('get goods list success');
    }
}

定義公共服務(wù)

namespace App\RpcService;

use EasySwoole\Rpc\AbstractService;

class Common extends AbstractService
{
    public function serviceName(): string
    {
        return 'common';
    }

    public function mailBox()
    {
        $this->response()->setResult([
            [
                'mailId'=>'100001',
                'mailTitle'=>'系統(tǒng)消息1',
            ],
            [
                'mailId'=>'100001',
                'mailTitle'=>'系統(tǒng)消息1',
            ],
        ]);
        $this->response()->setMsg('get mail list success');
    }

    public function serverTime()
    {
        $this->response()->setResult(time());
        $this->response()->setMsg('get server time success');
    }
}

服務(wù)注冊(cè)

Easyswoole全局的Event文件中,進(jìn)行服務(wù)注冊(cè)。至于節(jié)點(diǎn)管理、服務(wù)類定義等具體用法請(qǐng)看對(duì)應(yīng)章節(jié)。

namespace EasySwoole\EasySwoole;

use App\RpcService\Common;
use App\RpcService\Goods;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Redis\Config\RedisConfig;
use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\NodeManager\RedisManager;
use EasySwoole\Rpc\Config as RpcConfig;
use EasySwoole\Rpc\Rpc;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        /*
         * 定義節(jié)點(diǎn)Redis管理器
         */
        $redisPool = new RedisPool(new RedisConfig([
            'host'=>'127.0.0.1'
        ]));
        $manager = new RedisManager($redisPool);
        //配置Rpc實(shí)例
        $config = new RpcConfig();
        //這邊用于指定當(dāng)前服務(wù)節(jié)點(diǎn)ip,如果不指定,則默認(rèn)用UDP廣播得到的地址
        $config->setServerIp('127.0.0.1');
        $config->setNodeManager($manager);
        /*
         * 配置初始化
         */
        Rpc::getInstance($config);
        //添加服務(wù)
        Rpc::getInstance()->add(new Goods());
        Rpc::getInstance()->add(new Common());
        Rpc::getInstance()->attachToServer(ServerManager::getInstance()->getSwooleServer());
    }

    public static function onRequest(Request $request, Response $response): bool
    {
        // TODO: Implement onRequest() method.
        return true;
    }

    public static function afterRequest(Request $request, Response $response): void
    {
        // TODO: Implement afterAction() method.
    }
}

為了方便測(cè)試,我把兩個(gè)服務(wù)放在同一臺(tái)機(jī)器中注冊(cè)。實(shí)際生產(chǎn)場(chǎng)景應(yīng)該是N臺(tái)機(jī)注冊(cè)商品服務(wù),N臺(tái)機(jī)器注冊(cè)公告服務(wù),把服務(wù)分開。

Rpc-Client

控制器聚合調(diào)用

namespace App\HttpController;

use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Rpc\Response;
use EasySwoole\Rpc\Rpc;

class Index extends Controller
{

    function index()
    {
        $ret = [];
        $client = Rpc::getInstance()->client();
        /*
         * 調(diào)用商品列表
         */
        $client->addCall('goods','list',['page'=>1])
            ->setOnSuccess(function (Response $response)use(&$ret){
                $ret['goods'] = $response->toArray();
            })->setOnFail(function (Response $response)use(&$ret){
                $ret['goods'] = $response->toArray();
            });
        /*
         * 調(diào)用信箱公共
         */
        $client->addCall('common','mailBox')
            ->setOnSuccess(function (Response $response)use(&$ret){
                $ret['mailBox'] = $response->toArray();
            })->setOnFail(function (Response $response)use(&$ret){
                $ret['mailBox'] = $response->toArray();
            });
        /*
        * 獲取系統(tǒng)時(shí)間
        */
        $client->addCall('common','serverTime')
            ->setOnSuccess(function (Response $response)use(&$ret){
                $ret['serverTime'] = $response->toArray();
            });

        $client->exec(2.0);

        $this->writeJson(200,$ret);
    }
}

注意,控制器中可以這樣調(diào)用,是因?yàn)榉?wù)端章節(jié)中,在EasySwoole的全局啟動(dòng)事件已經(jīng)對(duì)當(dāng)前的Rpc實(shí)例定義注冊(cè)了節(jié)點(diǎn)管理器。因此在控制器中調(diào)用的時(shí)候 該Rpc實(shí)例可以找到對(duì)應(yīng)的節(jié)點(diǎn)。一般來(lái)說(shuō),在做聚合網(wǎng)關(guān)的節(jié)點(diǎn),是不需要注冊(cè)服務(wù)進(jìn)去的,僅需注冊(cè)節(jié)點(diǎn)管理器即可。

客戶端

當(dāng)rpc服務(wù)和客戶端不在同一服務(wù)中時(shí),并且服務(wù)端客戶端使用的都是es

<?php
require_once 'vendor/autoload.php';

use EasySwoole\Rpc\Config;
use EasySwoole\Rpc\Rpc;
use EasySwoole\Rpc\NodeManager\RedisManager;
use EasySwoole\Rpc\Response;
$redisConfig = new \EasySwoole\Redis\Config\RedisConfig();
$redisConfig->setHost('127.0.0.1'); // 服務(wù)端使用的redis節(jié)點(diǎn)地址
$redisConfig->setPort('6379'); // 服務(wù)端使用的redis節(jié)點(diǎn)端口
$pool=new \EasySwoole\RedisPool\RedisPool($redisConfig);
$config = new Config();
$config->setServerIp('127.0.0.1'); // 指定rpc服務(wù)地址
$config->setListenPort(9502); // 指定rpc服務(wù)端口
$config->setNodeManager(new RedisManager($pool));
$rpc = new Rpc($config);

\Swoole\Coroutine::create(function () use ($rpc) {
    $client = $rpc->client();
    $client->addCall('UserService', 'register', ['arg1', 'arg2'])
        ->setOnFail(function (Response $response) {
            print_r($response->toArray());
        })
        ->setOnSuccess(function (Response $response) {
            print_r($response->toArray());
        });

    $client->exec();
});
swoole_timer_clear_all();

跨平臺(tái)

Rpc的請(qǐng)求響應(yīng)通過(guò)tcp協(xié)議,服務(wù)廣播使用udp協(xié)議,我們只需要實(shí)現(xiàn)網(wǎng)絡(luò)協(xié)議即可。

PHP示例代碼

<?php
/**
 * Created by PhpStorm.
 * User: xcg
 * Date: 2019/6/17
 * Time: 14:30
 */
$data = [
    'command' => 1,//1:請(qǐng)求,2:狀態(tài)rpc 各個(gè)服務(wù)的狀態(tài)
    'request' => [
        'serviceName' => 'UserService',
        'action' => 'register',//行為名稱
        'arg' => [
            'args1' => 'args1',
            'args2' => 'args2'
        ]
    ]
];

//$raw = serialize($data);//注意序列化類型,需要和RPC服務(wù)端約定好協(xié)議 $serializeType

$raw = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

$fp = stream_socket_client('tcp://127.0.0.1:9600');
fwrite($fp, pack('N', strlen($raw)) . $raw);//pack數(shù)據(jù)校驗(yàn)

$data = fread($fp, 65533);
//做長(zhǎng)度頭部校驗(yàn)
$len = unpack('N', $data);
$data = substr($data, '4');
if (strlen($data) != $len[1]) {
    echo 'data error';
} else {
    $data = json_decode($data, true);
//    //這就是服務(wù)端返回的結(jié)果,
    var_dump($data);//默認(rèn)將返回一個(gè)response對(duì)象 通過(guò)$serializeType修改
}
fclose($fp);

Go示例代碼

package main

import (
    "encoding/binary"
    "net"
)

func main() {
    var tcpAddr *net.TCPAddr
    tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:9600")
    conn,_ := net.DialTCP("tcp",nil,tcpAddr)
    defer conn.Close()
    sendEasyswooleMsg(conn)
}

func sendEasyswooleMsg(conn *net.TCPConn) {
    var sendData []byte
    data := `{"command":1,"request":{"serviceName":"UserService","action":"register","arg":{"args1":"args1","args2":"args2"}}}`
    b := []byte(data)
    // 大端字節(jié)序(網(wǎng)絡(luò)字節(jié)序)大端就是將高位字節(jié)放到內(nèi)存的低地址端,低位字節(jié)放到高地址端。
    // 網(wǎng)絡(luò)傳輸中(比如TCP/IP)低地址端(高位字節(jié))放在流的開始,對(duì)于2個(gè)字節(jié)的字符串(AB),傳輸順序?yàn)椋篈(0-7bit)、B(8-15bit)。
    sendData = int32ToBytes8(int32(len(data)))
    // 將數(shù)據(jù)byte拼裝到sendData的后面
    for _, value := range b {
        sendData = append(sendData, value)
    }
    conn.Write(sendData)
}

func int32ToBytes8(n int32) []byte {
    var buf = make([]byte, 4)
    binary.BigEndian.PutUint32(buf, uint32(n))
    return buf
}

Java

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class Main {
    public static void main(String[] args) throws IOException {
        byte[] msg = "{\"command\":1,\"request\":{\"serviceName\":\"UserService\",\"action\":\"register\",\"arg\":{\"args1\":\"args1\",\"args2\":\"args2\"}}}".getBytes();
        byte[] head = Main.toLH(msg.length);
        byte[] data = Main.mergeByteArr(head, msg);

        //創(chuàng)建Socket對(duì)象,連接服務(wù)器
        Socket socket=new Socket("127.0.0.1",9600);
        //通過(guò)客戶端的套接字對(duì)象Socket方法,獲取字節(jié)輸出流,將數(shù)據(jù)寫向服務(wù)器
        OutputStream out=socket.getOutputStream();
        out.write(data);

        //讀取服務(wù)器發(fā)回的數(shù)據(jù),使用socket套接字對(duì)象中的字節(jié)輸入流
        InputStream in=socket.getInputStream();
        byte[] response=new byte[1024];
        int len=in.read(response);
        System.out.println(new String(response,4, len-4));
        socket.close();
    }

    static byte[] toLH(int n) {
        byte[] b = new byte[4];
        b[3] = (byte) (n & 0xff);
        b[2] = (byte) (n >> 8 & 0xff);
        b[1] = (byte) (n >> 16 & 0xff);
        b[0] = (byte) (n >> 24 & 0xff);
        return b;
    }

    static byte[] mergeByteArr(byte[] a, byte[] b) {
        byte[] c= new byte[a.length + b.length];
        System.arraycopy(a, 0, c, 0, a.length);
        System.arraycopy(b, 0, c, a.length, b.length);
        return c;
    }
}

其他語(yǔ)言只需要實(shí)現(xiàn)tcp協(xié)議即可

EasySwoole RPC 自定義注冊(cè)中心

EasySwoole默認(rèn)為通過(guò)UDP廣播+自定義進(jìn)程定時(shí)刷新自身節(jié)點(diǎn)信息的方式來(lái)實(shí)現(xiàn)無(wú)主化/注冊(cè)中心的服務(wù)發(fā)現(xiàn)。在服務(wù)正常關(guān)閉的時(shí)候,自定義定時(shí)進(jìn)程的onShutdown方法會(huì)執(zhí)行deleteServiceNode方法來(lái)實(shí)現(xiàn)節(jié)點(diǎn)下線。在非正常關(guān)閉的時(shí)候,心跳超時(shí)也會(huì)被節(jié)點(diǎn)管理器踢出。

有些情況,不方便用UDP廣播的情況下,那么EasySwoole支持你自定義一個(gè)節(jié)點(diǎn)管理器,來(lái)變更服務(wù)發(fā)現(xiàn)方式。

例如用Redis來(lái)實(shí)現(xiàn)

namespace EasySwoole\Rpc\NodeManager;

use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\ServiceNode;
use EasySwoole\Utility\Random;

class RedisManager implements NodeManagerInterface
{
    protected $redisKey;

    protected $pool;

    function __construct(RedisPool $pool, string $hashKey = 'rpc')
    {
        $this->redisKey = $hashKey;
        $this->pool = $pool;
    }

    function getServiceNodes(string $serviceName, ?string $version = null): array
    {
        $redis = $this->pool->getObj(15);
        try {
            $nodes = $redis->hGetAll("{$this->redisKey}_{$serviceName}");
            $nodes = $nodes ?: [];
            $ret = [];
            foreach ($nodes as $nodeId => $node) {
                $node = new ServiceNode(json_decode($node,true));
                /**
                 * @var  $nodeId
                 * @var  ServiceNode $node
                 */
                if (time() - $node->getLastHeartBeat() > 30) {
                    $this->deleteServiceNode($node);
                }
                if ($version && $version != $node->getServiceVersion()) {
                    continue;
                }
                $ret[$nodeId] = $node;
            }
            return $ret;
        } catch (\Throwable $throwable) {
            //如果該redis斷線則銷毀
            $this->pool->unsetObj($redis);
        } finally {
            $this->pool->recycleObj($redis);
        }
        return [];
    }

    function getServiceNode(string $serviceName, ?string $version = null): ?ServiceNode
    {
        $list = $this->getServiceNodes($serviceName, $version);
        if (empty($list)) {
            return null;
        }
        return Random::arrayRandOne($list);
    }

    function deleteServiceNode(ServiceNode $serviceNode): bool
    {
        $redis = $this->pool->getObj(15);
        try {
            $redis->hDel($this->generateNodeKey($serviceNode), $serviceNode->getNodeId());
            return true;
        } catch (\Throwable $throwable) {
            $this->pool->unsetObj($redis);
        } finally {
            $this->pool->recycleObj($redis);
        }
        return false;
    }

    function serviceNodeHeartBeat(ServiceNode $serviceNode): bool
    {
        if (empty($serviceNode->getLastHeartBeat())) {
            $serviceNode->setLastHeartBeat(time());
        }
        $redis = $this->pool->getObj(15);
        try {
            $redis->hSet($this->generateNodeKey($serviceNode), $serviceNode->getNodeId(), $serviceNode->__toString());
            return true;
        } catch (\Throwable $throwable) {
            $this->pool->unsetObj($redis);
        } finally {
            //這邊需要測(cè)試一個(gè)對(duì)象被unset后是否還能被回收
            $this->pool->recycleObj($redis);
        }
        return false;
    }

    protected function generateNodeKey(ServiceNode $node)
    {
        return "{$this->redisKey}_{$node->getServiceName()}";
    }
}

即使關(guān)閉了UDP定時(shí)廣,EasySwoole Rpctick進(jìn)程依舊會(huì)每3秒執(zhí)行一次serviceNodeHeartBeat用于更新自身的節(jié)點(diǎn)心跳信息。

主站蜘蛛池模板: 久久久久中文字幕 | 久草精品视频 | 国产精品视频不卡 | 精品日韩一区二区三区 | 久久三区| 成人免费在线观看视频 | 中文字幕在线影院 | 男女午夜网站 | 四虎在线视频 | 午夜精品久久久 | 免费国产一区 | 国产亚州av | 日日摸日日碰夜夜爽不卡dvd | 青草视频在线免费观看 | 亚洲人久久 | 日韩精品免费在线观看 | 欧美日本亚洲 | 中文字幕精品一区二区三区精品 | 99精品欧美一区二区三区综合在线 | 久久99国产精品久久99大师 | 亚洲一区二区视频在线观看 | 精品一区二区av | 蜜桃官网 | 操操操av| 夏同学福利网 | 超碰97人人人人人蜜桃 | 久久久亚洲精品中文字幕 | 欧美一级艳片视频免费观看 | 成人a毛片 | 亚洲一区免费观看 | 欧美精品欧美精品系列 | 天堂资源在线 | 男女免费视频 | 日韩一二三区 | 成人av在线播放 | 欧美日韩亚洲视频 | h网站在线观看 | 日韩成人在线网站 | 97精品国产 | 久草精品在线 | 亚洲欧美在线免费 |