`
May 18, 2018 本文阅读量

使用golang 实现JSON-RPC2.0

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。


什么是RPC?

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

远程过程调用是一个分布式计算的客户端-服务器(Client/Server)的例子,它简单而又广受欢迎。远程过程调用总是由客户端对服务器发出一个执行若干过程请求,并用客户端提供的参数。执行结果将返回给客户端。由于存在各式各样的变体和细节差异,对应地派生了各式远程过程调用协议,而且它们并不互相兼容。——————源自维基百科


什么又是JSON-RPC?

JSON-RPC,是一个无状态且轻量级的远程过程调用(RPC)传送协议,其传递内容通过 JSON 为主。相较于一般的 REST 通过网址(如 GET /user)调用远程服务器,JSON-RPC 直接在内容中定义了欲调用的函数名称(如 {“method”: “getUser”}),这也令开发者不会陷于该使用 PUT 或者 PATCH 的问题之中。 更多JSON-RPC约定参见:https://zh.wikipedia.org/wiki/JSON-RPC

问题

服务端注册及调用

约定如**net/rpc**:

  • the method’s type is exported.
  • the method is exported.
  • the method has two arguments, both exported (or builtin) types.
  • the method’s second argument is a pointer.
  • the method has return type error.
// 这就是约定
func (t *T) MethodName(argType T1, replyType *T2) error

那么问题来了:

	问题1: Server怎么来注册`t.Methods`?
	问题2: JSON-RPC请求参数里面的Params给到args?

server端类型定义:

type methodType struct {
	method     reflect.Method // 用于调用
	ArgType    reflect.Type
	ReplyType  reflect.Type
}

type service struct {
	name   string                 // 服务的名字, 一般为`T`
	rcvr   reflect.Value          // 方法的接受者, 即约定中的 `t`
	typ    reflect.Type           // 注册的类型, 即约定中的`T`
	method map[string]*methodType // 注册的方法, 即约定中的`MethodName`的集合
}

// Server represents an RPC Server.
type Server struct {
	serviceMap sync.Map   // map[string]*service
}

解决问题1,参考了net/rpc中的注册调用。主要使用reflect这个包。代码如下:

// 解析传入的类型及相应的可导出方法,将rcvr的type,Methods的相关信息存放到Server.m中。
// 如果type是不可导出的,则会报错
func (s *Server) Register(rcvr interface{}) error {
	_service := new(service)
	_service.typ = reflect.TypeOf(rcvr)
	_service.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(_service.rcvr).Type().Name()

	if sname == "" {
		err_s := "rpc.Register: no service name for type " + _service.typ.String()
		log.Print(err_s)
		return errors.New(err_s)
	}

	if !isExported(sname) {
		err_s := "rpc.Register: type " + sname + " is not exported"
		log.Print(err_s)
		return errors.New(err_s)
	}
	_service.name = sname
	_service.method = suitableMethods(_service.typ, true)

	// sync.Map.LoadOrStore
	if _, dup := s.m.LoadOrStore(sname, _service); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}

// 关于suitableMethods,也是使用reflect,
// 来获取_service.typ中的所有可导出方法及方法的相关参数,保存成*methodType

suitableMethods代码由此去:[https: //github.com/yeqown/rpc/blob/master/server.go#L105](https: //github.com/yeqown/rpc/blob/master/server.go#L105)

解决问题2,要解决问题2,且先看如何调用Method,代码如下:

// 约定:    func (t *T) MethodName(argType T1, replyType *T2) error
// s.rcvr: 即约定中的 t
// argv:   即约定中的 argType
// replyv: 即约定中的 replyType
func (s *service) call(mtype *methodType, req *Request, argv, replyv reflect.Value) *Response {
	function := mtype.method.Func
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	errIter := returnValues[0].Interface()

	errmsg := ""
	if errIter != nil {
		errmsg = errIter.(error).Error()
		return NewResponse(req.ID, nil, NewJsonrpcErr(InternalErr, errmsg, nil))
	}

	return NewResponse(req.ID, replyv.Interface(), nil)
}

看了如何调用,再加上JSON-RPC的约定,知道了传给服务端的是一个JSON,而且其中的Params是一个json格式的数据。那就变成了:interface{} - req.Params 到reflect.Value - argv。那么怎么转换呢?看代码:

func (s *Server) call(req *Request) *Response {
	// ....
	// 根据req.Method来查询method
	// req.Method 格式如:"ServiceName.MethodName"
	// mtype *methodType
	mtype := svc.method[methodName]

	// 根据注册时候的mtype.ArgType来生成一个reflect.Value
	argIsValue := false // if true, need to indirect before calling.
	var argv reflect.Value
	if mtype.ArgType.Kind() == reflect.Ptr {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
		argIsValue = true
	}

	if argIsValue {
		argv = argv.Elem()
	}

	// 为argv参数生成了一个reflect.Value,但是argv目前为止都还是是0值。
	// 那么怎么把req.Params 复制给argv ?
	// 我尝试过,argv = reflect.Value(req.Params),但是在调用的时候 会提示说:“map[string]interface{} as main.*Args”,
	// 这也就是说,并没有将参数的值正确的赋值给argv。
	// 后面才又了这个convert函数:
	// bs, _ := json.Marshal(req.Params)
	// json.Unmarshal(bs, argv.Interface())
	// 因此有一些限制~,就不多说了 
	convert(req.Params, argv.Interface())

	// Note: 约定中ReplyType是一个指针类型,方便赋值。
	// 根据注册时候的mtype.ReplyType来生成一个reflect.Value
	replyv := reflect.New(mtype.ReplyType.Elem())
	switch mtype.ReplyType.Elem().Kind() {
	case reflect.Map:
		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
	case reflect.Slice:
		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
	}

	return svc.call(mtype, req, argv, replyv)
}

支持HTTP调用

已经完成了上述的部分,再来谈支持HTTP就非常简单了。实现http.Handler接口就行啦~。如下:

// 支持之POST & json
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	var resp *Response

	w.Header().Set("Content-Type", "application/json")

	if r.Method != http.MethodPost {
		resp = NewResponse("", nil, NewJsonrpcErr(
			http.StatusMethodNotAllowed, "HTTP request method must be POST", nil),
		)
		response(w, resp)
		return
	}
	// 解析请求参数到[]*rpc.Request
	reqs, err := getRequestFromBody(r)
	if err != nil {
		resp = NewResponse("", nil, NewJsonrpcErr(InternalErr, err.Error(), nil))
		response(w, resp)
		return
	}

	// 处理请求,包括批量请求
	resps := s.handleWithRequests(reqs)

	if len(resps) > 1 {
		response(w, resps)
	} else {
		response(w, resps[0])
	}
	return
}

使用示例

服务端使用

// test_server.go
package main

import (
	// "fmt"
	"net/http"
	"github.com/yeqown/rpc"
)

type Int int

type Args struct {
	A int `json:"a"`
	B int `json:"b"`
}

func (i *Int) Sum(args *Args, reply *int) error {
	*reply = args.A + args.B
	return nil
}

type MultyArgs struct {
	A *Args `json:"aa"`
	B *Args `json:"bb"`
}

type MultyReply struct {
	A int `json:"aa"`
	B int `json:"bb"`
}

func (i *Int) Multy(args *MultyArgs, reply *MultyReply) error {
	reply.A = (args.A.A * args.A.B)
	reply.B = (args.B.A * args.B.B)
	return nil
}

func main() {
	s := rpc.NewServer()
	mine_int := new(Int)
	s.Register(mine_int)
	go s.HandleTCP("127.0.0.1:9999")

	// 开启http
	http.ListenAndServe(":9998", s)
}

客户端使用

// test_client.go
package main

import (
	"github.com/yeqown/rpc"
)

type Args struct {
	A int `json:"a"`
	B int `json:"b"`
}

type MultyArgs struct {
	A *Args `json:"aa"`
	B *Args `json:"bb"`
}

type MultyReply struct {
	A int `json:"aa"`
	B int `json:"bb"`
}

func main() {
	c := rpc.NewClient()
	c.DialTCP("127.0.0.1:9999")

	var sum int
	c.Call("1", "Int.Sum", &Args{A: 1, B: 2}, &sum)
	println(sum)

	c.DialTCP("127.0.0.1:9999")
	var reply MultyReply
	c.Call("2", "Int.Multy", &MultyArgs{A: &Args{1, 2}, B: &Args{3, 4}}, &reply)
	println(reply.A, reply.B)
}

运行截图

server client http-support


实现

上面只挑了我觉得比较重要的部分,讲了实现,更多如客户端的支持,JSON-RPC的请求响应定义,可以在项目中里查阅。目前基于TCP和HTTP实现了JSON-RPC,项目地址:github.com/yeqown/rpc

缺陷

只支持JSON-RPC, 且还没有完全实现JSON-RPC的约定。譬如批量调用中:

若批量调用的 RPC 操作本身非一个有效 JSON 或一个至少包含一个值的数组,则服务端返回的将单单是一个响应对象而非数组。若批量调用没有需要返回的响应对象,则服务端不需要返回任何结果且必须不能返回一个空数组给客户端。

阅读参考中的两个RPC,发现两者都是使用的codec的方式来提供扩展。因此以后可以考虑使用这种方式来扩展。


参考