RPC简介
RPC(remote process call),即远程过程调用,意思就是两台物理位置不同的服务器,其中一台服务器的应用想调用另一台服务器上某个应用的函数或者方法,由于不在同一个内存空间不能直接调用,因此需要通过网络来表达语义以及传入的参数,RPC是跨操作系统,跨编程语言的网络通信方式。
RPC启动
我们可以通过执行以下命令来启动RPC:
代码语言:javascript复制geth --networkid 666 --datadir /home/ubuntu/Private_eth/eth1 --identity "node1" --rpc --rpcport "8545" --rpcaddr "192.168.174.212" --nodiscover --rpcapi "eth,net,web3,txpool,debug,miner"
之后我们可以通过以下脚本进行RPC测试:
代码语言:javascript复制#!/usr/bin/env python3
import requests
URL = "http://192.168.174.212:8545/"
data = {
"jsonrpc": "2.0",
"method": "eth_getBalance",
"params":["0x578efd53cf8342f4f5acfb6ee0ce9c7b3cfe2252", "latest"],
"id":0
}
response = requests.post(url=URL,json=data)
print(response.json())
源码分析
以太坊有四种RPC:HTTP RPC、Inproc RPC、IPC RPC、WS RPC,它们主要的实现逻辑都在rpc/server.go和rpc/client.go,各自根据自己的实现方式派生自己的client实例,建立各自的net.conn通道,由于HTTP RPC是基于短链接请求,实现方式和其他的不太一样,这里仅对RPC服务的启动以及HTTP RPC请求、HTTP RPC和非HTTP请求类的请求和响应做一个简单的介绍分析~
服务启动
RPC服务的启动与否是我们在通过geth来启动链节点时有参数--rpc来决定的,在geth函数中会调用startNode来启动一个node:
代码语言:javascript复制// filedir:go-ethereum-1.10.2cmdgethmain.go L308
// geth is the main entry point into the system if no special subcommand is ran.
// It creates a default node based on the command line arguments and runs it in
// blocking mode, waiting for it to be shut down.
func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
prepare(ctx)
stack, backend := makeFullNode(ctx)
defer stack.Close()
startNode(ctx, stack, backend)
stack.Wait()
return nil
}
startNode进而转去调用utils的StartNode函数,此处的utils为github.com/ethereum/go-ethereum/cmd/utils
代码语言:javascript复制// filedir:go-ethereum-1.10.2cmdgethmain.go L325
// startNode boots up the system node and all registered protocols, after which
// it unlocks any requested accounts, and starts the RPC/IPC interfaces and the
// miner.
func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
debug.Memsize.Add("node", stack)
// Start up the node itself
utils.StartNode(ctx, stack)
......
之后再go-ethereum-1.10.2cmdutilscmd.go的startNode函数中转而调用Node的start函数来启动服务,之后开启监听:
代码语言:javascript复制func StartNode(ctx *cli.Context, stack *node.Node) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigc)
minFreeDiskSpace := ethconfig.Defaults.TrieDirtyCache
if ctx.GlobalIsSet(MinFreeDiskSpaceFlag.Name) {
minFreeDiskSpace = ctx.GlobalInt(MinFreeDiskSpaceFlag.Name)
} else if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
minFreeDiskSpace = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
}
if minFreeDiskSpace > 0 {
go monitorFreeDiskSpace(sigc, stack.InstanceDir(), uint64(minFreeDiskSpace)*1024*1024)
}
<-sigc
log.Info("Got interrupt, shutting down...")
go stack.Close()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
debug.Exit() // ensure trace and CPU profile data is flushed.
debug.LoudPanic("boom")
}()
}
Node.start函数进而去调用openEndpoints去开启RPC端点:
代码语言:javascript复制// filedir: go-ethereum-1.10.2nodenode.go
// Start starts all registered lifecycles, RPC services and p2p networking.
// Node can only be started once.
func (n *Node) Start() error {
n.startStopLock.Lock()
defer n.startStopLock.Unlock()
n.lock.Lock()
switch n.state {
case runningState:
n.lock.Unlock()
return ErrNodeRunning
case closedState:
n.lock.Unlock()
return ErrNodeStopped
}
n.state = runningState
// open networking and RPC endpoints
err := n.openEndpoints()
lifecycles := make([]Lifecycle, len(n.lifecycles))
copy(lifecycles, n.lifecycles)
n.lock.Unlock()
// Check if endpoint startup failed.
if err != nil {
n.doClose(nil)
return err
}
// Start all registered lifecycles.
var started []Lifecycle
for _, lifecycle := range lifecycles {
if err = lifecycle.Start(); err != nil {
break
}
started = append(started, lifecycle)
}
// Check if any lifecycle failed to start.
if err != nil {
n.stopServices(started)
n.doClose(nil)
}
return err
}
之后的openEndpoints调用startRPC来启动RPC服务:
代码语言:javascript复制// filedir:go-ethereum-1.10.2nodenode.go L260
// openEndpoints starts all network and RPC endpoints.
func (n *Node) openEndpoints() error {
// start networking endpoints
n.log.Info("Starting peer-to-peer node", "instance", n.server.Name)
if err := n.server.Start(); err != nil {
return convertFileLockError(err)
}
// start RPC endpoints
err := n.startRPC()
if err != nil {
n.stopRPC()
n.server.Stop()
}
return err
}
startRPC具体实现如下:
代码语言:javascript复制// filedir: go-ethereum-1.10.2nodenode.go
// configureRPC is a helper method to configure all the various RPC endpoints during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (n *Node) startRPC() error {
if err := n.startInProc(); err != nil {
return err
}
// Configure IPC.
if n.ipc.endpoint != "" {
if err := n.ipc.start(n.rpcAPIs); err != nil {
return err
}
}
// Configure HTTP.
if n.config.HTTPHost != "" {
config := httpConfig{
CorsAllowedOrigins: n.config.HTTPCors,
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
}
if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil {
return err
}
if err := n.http.enableRPC(n.rpcAPIs, config); err != nil {
return err
}
}
// Configure WebSocket.
if n.config.WSHost != "" {
server := n.wsServerForPort(n.config.WSPort)
config := wsConfig{
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
}
if err := server.setListenAddr(n.config.WSHost, n.config.WSPort); err != nil {
return err
}
if err := server.enableWS(n.rpcAPIs, config); err != nil {
return err
}
}
if err := n.http.start(); err != nil {
return err
}
return n.ws.start()
}
在这里会调用startInProc来注册所有的RPC API接口信息:
代码语言:javascript复制// startInProc registers all RPC APIs on the inproc server.
func (n *Node) startInProc() error {
for _, api := range n.rpcAPIs {
if err := n.inprocHandler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
}
return nil
}
调用n.ipc.start(n.rpcAPIs)启动IPC
代码语言:javascript复制// filedir: go-ethereum-1.10.2noderpcstack.go
// Start starts the httpServer's http.Server
func (is *ipcServer) start(apis []rpc.API) error {
is.mu.Lock()
defer is.mu.Unlock()
if is.listener != nil {
return nil // already running
}
listener, srv, err := rpc.StartIPCEndpoint(is.endpoint, apis)
if err != nil {
is.log.Warn("IPC opening failed", "url", is.endpoint, "error", err)
return err
}
is.log.Info("IPC endpoint opened", "url", is.endpoint)
is.listener, is.srv = listener, srv
return nil
}
调用n.http.enableRPC启动RPC服务并注册Handler:
代码语言:javascript复制// filedir:go-ethereum-1.10.2noderpcstack.go L272
// enableRPC turns on JSON-RPC over HTTP on the server.
func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {
h.mu.Lock()
defer h.mu.Unlock()
if h.rpcAllowed() {
return fmt.Errorf("JSON-RPC over HTTP is already enabled")
}
// Create RPC server and handler.
srv := rpc.NewServer()
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err
}
h.httpConfig = config
h.httpHandler.Store(&rpcHandler{
Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts),
server: srv,
})
return nil
}
Handler注册跟踪如下(下面的ServeHTTP其实已经到了处理请求的逻辑了,这里不再深入,后面再做探究):
代码语言:javascript复制// filedir: go-ethereum-1.10.2noderpcstack.go
// NewHTTPHandlerStack returns wrapped http-related handlers
func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string) http.Handler {
// Wrap the CORS-handler within a host-handler
handler := newCorsHandler(srv, cors)
handler = newVHostHandler(vhosts, handler)
return newGzipHandler(handler)
}
// filedir:go-ethereum-1.10.2noderpcstack.go
func newGzipHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
next.ServeHTTP(w, r)
return
}
w.Header().Set("Content-Encoding", "gzip")
gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)
gz.Reset(w)
defer gz.Close()
next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
})
}
// filedir: go-ethereum-1.10.2rpchttp.go
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Permit dumb empty requests for remote health-checks (AWS)
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
w.WriteHeader(http.StatusOK)
return
}
if code, err := validateRequest(r); err != nil {
http.Error(w, err.Error(), code)
return
}
// All checks passed, create a codec that reads directly from the request body
// until EOF, writes the response to w, and orders the server to process a
// single request.
ctx := r.Context()
ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
ctx = context.WithValue(ctx, "scheme", r.Proto)
ctx = context.WithValue(ctx, "local", r.Host)
if ua := r.Header.Get("User-Agent"); ua != "" {
ctx = context.WithValue(ctx, "User-Agent", ua)
}
if origin := r.Header.Get("Origin"); origin != "" {
ctx = context.WithValue(ctx, "Origin", origin)
}
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
s.serveSingleRequest(ctx, codec)
}
调用server.enableWS启动Websocket:
代码语言:javascript复制// filedir:go-ethereum-1.10.2noderpcstack.go
// enableWS turns on JSON-RPC over WebSocket on the server.
func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
h.mu.Lock()
defer h.mu.Unlock()
if h.wsAllowed() {
return fmt.Errorf("JSON-RPC over WebSocket is already enabled")
}
// Create RPC server and handler.
srv := rpc.NewServer()
if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
return err
}
h.wsConfig = config
h.wsHandler.Store(&rpcHandler{
Handler: srv.WebsocketHandler(config.Origins),
server: srv,
})
return nil
}
Handler注册:
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpcwebsocket.go L45
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
// To allow connections with any origin, pass "*".
func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
var upgrader = websocket.Upgrader{
ReadBufferSize: wsReadBuffer,
WriteBufferSize: wsWriteBuffer,
WriteBufferPool: wsBufferPool,
CheckOrigin: wsHandshakeValidator(allowedOrigins),
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Debug("WebSocket upgrade failed", "err", err)
return
}
codec := newWebsocketCodec(conn)
s.ServeCodec(codec, 0)
})
}
// filedir: go-ethereum-1.10.2rpcwebsocket.go L241
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(wsMessageSizeLimit)
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
conn: conn,
pingReset: make(chan struct{}, 1),
}
wc.wg.Add(1)
go wc.pingLoop()
return wc
}
// filedir: go-ethereum-1.10.2rpcserver.go
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
//
// Note that codec options are no longer supported.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer codec.close()
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
}
// Add the codec to the set so it can be closed by Stop.
s.codecs.Add(codec)
defer s.codecs.Remove(codec)
c := initClient(codec, s.idgen, &s.services)
<-codec.closed()
c.Close()
}
最后调用start启动HTTP Server
代码语言:javascript复制// start starts the HTTP server if it is enabled and not already running.
func (h *httpServer) start() error {
h.mu.Lock()
defer h.mu.Unlock()
if h.endpoint == "" || h.listener != nil {
return nil // already running or not configured
}
// Initialize the server.
h.server = &http.Server{Handler: h}
if h.timeouts != (rpc.HTTPTimeouts{}) {
CheckTimeouts(&h.timeouts)
h.server.ReadTimeout = h.timeouts.ReadTimeout
h.server.WriteTimeout = h.timeouts.WriteTimeout
h.server.IdleTimeout = h.timeouts.IdleTimeout
}
// Start the server.
listener, err := net.Listen("tcp", h.endpoint)
if err != nil {
// If the server fails to start, we need to clear out the RPC and WS
// configuration so they can be configured another time.
h.disableRPC()
h.disableWS()
return err
}
h.listener = listener
go h.server.Serve(listener)
if h.wsAllowed() {
url := fmt.Sprintf("ws://%v", listener.Addr())
if h.wsConfig.prefix != "" {
url = h.wsConfig.prefix
}
h.log.Info("WebSocket enabled", "url", url)
}
// if server is websocket only, return after logging
if !h.rpcAllowed() {
return nil
}
// Log http endpoint.
h.log.Info("HTTP server started",
"endpoint", listener.Addr(),
"prefix", h.httpConfig.prefix,
"cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","),
"vhosts", strings.Join(h.httpConfig.Vhosts, ","),
)
// Log all handlers mounted on server.
var paths []string
for path := range h.handlerNames {
paths = append(paths, path)
}
sort.Strings(paths)
logged := make(map[string]bool, len(paths))
for _, path := range paths {
name := h.handlerNames[path]
if !logged[name] {
log.Info(name " enabled", "url", "http://" listener.Addr().String() path)
logged[name] = true
}
}
return nil
}
至此,启动完成~
请求发起
这里我们以RPC请求接口eth_getBalance为例进行分析,首先我们在全局搜索"eth_getBalance"关键字,确定其引用位置——ethclient.go L354
eth_getBalance的具体接口为——BalanceAt
代码语言:javascript复制// filedir:go-ethereum-1.10.2ethclientethclient.go L351
// BalanceAt returns the wei balance of the given account.
// The block number can be nil, in which case the balance is taken from the latest known block.
func (ec *Client) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
var result hexutil.Big
err := ec.c.CallContext(ctx, &result, "eth_getBalance", account, toBlockNumArg(blockNumber))
return (*big.Int)(&result), err
}
之后跟进这里的关键操作函数ec.c.CallContext逻辑代码如下:
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpcclient.go L286
// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx, c); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
上面的newMessage用于处理请求数据:
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpcclient.go
func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
if paramsIn != nil { // prevent sending "params":null
var err error
if msg.Params, err = json.Marshal(paramsIn); err != nil {
return nil, err
}
}
return msg, nil
}
这里的&jsonrpcMessage的数据格式和我们python脚本构建的请求相差无几:
代码语言:javascript复制#!/usr/bin/env python3
import requests
URL = "http://192.168.174.212:8545/"
data = {
"jsonrpc": "2.0",
"method": "eth_getBalance",
"params":["0x578efd53cf8342f4f5acfb6ee0ce9c7b3cfe2252", "latest"],
"id":0
}
response = requests.post(url=URL,json=data)
print(response.json())
再这里我们的请求为http请求,所以会进入到sendHTTP方法中:
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpchttp.go
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msg)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
if respBody != nil {
buf := new(bytes.Buffer)
if _, err2 := buf.ReadFrom(respBody); err2 == nil {
return fmt.Errorf("%v: %v", err, buf.String())
}
}
return err
}
var respmsg jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
return err
}
op.resp <- &respmsg
return nil
}
在这里首先调用doRequest来读取请求并记录请求数据的长度,之后设置请求头信息,然后调用http的do请求,获取到请求的返回值resp
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpchttp.go L175
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
body, err := json.Marshal(msg)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, "POST", hc.url, ioutil.NopCloser(bytes.NewReader(body)))
if err != nil {
return nil, err
}
req.ContentLength = int64(len(body))
// set headers
hc.mu.Lock()
req.Header = hc.headers.Clone()
hc.mu.Unlock()
// do request
resp, err := hc.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return resp.Body, errors.New(resp.Status)
}
return resp.Body, nil
}
之后传进管道op.resp,向上回溯到CallContext()里面的op.wait(ctx)方法:
代码语言:javascript复制func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
......
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx, c); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
wait的具体实现如下所示:
代码语言:javascript复制func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP {
select {
case c.reqTimeout <- op:
case <-c.closing:
}
}
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
之后resp recieve到op.resp管道的数据,然后对resp数据进行json序列化并返回~
请求处理
这里我们直接回溯到之前服务启动部分的HTTP请求处理模块(当时我们只是分析启动没有深入,这里我们深入追踪分析一波),在这里会首先通过调用validateRequest来验证请求的合法性,之后调用newHTTPServerConn建立一个HTTP请求连接,最后调用serveSingleRequest来处理请求:
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpchttp.go L225
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Permit dumb empty requests for remote health-checks (AWS)
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
w.WriteHeader(http.StatusOK)
return
}
if code, err := validateRequest(r); err != nil {
http.Error(w, err.Error(), code)
return
}
// All checks passed, create a codec that reads directly from the request body
// until EOF, writes the response to w, and orders the server to process a
// single request.
ctx := r.Context()
ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
ctx = context.WithValue(ctx, "scheme", r.Proto)
ctx = context.WithValue(ctx, "local", r.Host)
if ua := r.Header.Get("User-Agent"); ua != "" {
ctx = context.WithValue(ctx, "User-Agent", ua)
}
if origin := r.Header.Get("Origin"); origin != "" {
ctx = context.WithValue(ctx, "Origin", origin)
}
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
s.serveSingleRequest(ctx, codec)
}
validateRequest实现代码如下所示,这里会校验请求的方法(不允许PUT、DELETE)、请求数据包大小(缺失可能会带来安全风险)、Content-Type等:
代码语言:javascript复制// validateRequest returns a non-zero response code and error message if the
// request is invalid.
func validateRequest(r *http.Request) (int, error) {
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
return http.StatusMethodNotAllowed, errors.New("method not allowed")
}
if r.ContentLength > maxRequestContentLength {
err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength)
return http.StatusRequestEntityTooLarge, err
}
// Allow OPTIONS (regardless of content-type)
if r.Method == http.MethodOptions {
return 0, nil
}
// Check content-type
if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil {
for _, accepted := range acceptedContentTypes {
if accepted == mt {
return 0, nil
}
}
}
// Invalid content-type
err := fmt.Errorf("invalid content type, only %s is supported", contentType)
return http.StatusUnsupportedMediaType, err
}
serveSingleRequest实现逻辑如下所示,在这里会调用newHandler来获取用于处理请求的Handler:
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpcserver.go L94
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
}
h := newHandler(ctx, codec, s.idgen, &s.services)
h.allowSubscribe = false
defer h.close(io.EOF, nil)
reqs, batch, err := codec.readBatch()
if err != nil {
if err != io.EOF {
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
}
return
}
if batch {
h.handleBatch(reqs)
} else {
h.handleMsg(reqs[0])
}
}
newHandler实现代码如下所示,这里会调用newCallBack回调函数:
代码语言:javascript复制func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
}
newCallback函数实现如下所示:
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpcservice.go
// newCallback turns fn (a function) into a callback object. It returns nil if the function
// is unsuitable as an RPC callback.
func newCallback(receiver, fn reflect.Value) *callback {
fntype := fn.Type()
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
// Determine parameter types. They must all be exported or builtin types.
c.makeArgTypes()
// Verify return types. The function must return at most one error
// and/or one other non-error value.
outs := make([]reflect.Type, fntype.NumOut())
for i := 0; i < fntype.NumOut(); i {
outs[i] = fntype.Out(i)
}
if len(outs) > 2 {
return nil
}
// If an error is returned, it must be the last returned value.
switch {
case len(outs) == 1 && isErrorType(outs[0]):
c.errPos = 0
case len(outs) == 2:
if isErrorType(outs[0]) || !isErrorType(outs[1]) {
return nil
}
c.errPos = 1
}
return c
}
之后调用startCallProc在一个新的goroutine中执行function并跟踪:
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpchandler.go L219
// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
func (h *handler) startCallProc(fn func(*callProc)) {
h.callWG.Add(1)
go func() {
ctx, cancel := context.WithCancel(h.rootCtx)
defer h.callWG.Done()
defer cancel()
fn(&callProc{ctx: ctx})
}()
}
这里的handleCallMsg用于处理请求并返回执行结果:
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpchandler.go L290
// handleCallMsg executes a call message and returns the answer.
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
start := time.Now()
switch {
case msg.isNotification():
h.handleCall(ctx, msg)
h.log.Debug("Served " msg.Method, "t", time.Since(start))
return nil
case msg.isCall():
resp := h.handleCall(ctx, msg)
var ctx []interface{}
ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
if resp.Error != nil {
ctx = append(ctx, "err", resp.Error.Message)
if resp.Error.Data != nil {
ctx = append(ctx, "errdata", resp.Error.Data)
}
h.log.Warn("Served " msg.Method, ctx...)
} else {
h.log.Debug("Served " msg.Method, ctx...)
}
return resp
case msg.hasValidID():
return msg.errorResponse(&invalidRequestError{"invalid request"})
default:
return errorMessage(&invalidRequestError{"invalid request"})
}
}
这里的HandleCall用于处理方法调用
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpchandler.go L318
// handleCall processes method calls.
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
if msg.isSubscribe() {
return h.handleSubscribe(cp, msg)
}
var callb *callback
if msg.isUnsubscribe() {
callb = h.unsubscribeCb
} else {
callb = h.reg.callback(msg.Method)
}
if callb == nil {
return msg.errorResponse(&methodNotFoundError{method: msg.Method})
}
args, err := parsePositionalArguments(msg.Params, callb.argTypes)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
start := time.Now()
answer := h.runMethod(cp.ctx, msg, callb, args)
// Collect the statistics for RPC calls if metrics is enabled.
// We only care about pure rpc call. Filter out subscription.
if callb != h.unsubscribeCb {
rpcRequestGauge.Inc(1)
if answer.Error != nil {
failedReqeustGauge.Inc(1)
} else {
successfulRequestGauge.Inc(1)
}
rpcServingTimer.UpdateSince(start)
newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
}
return answer
}
之后调用runMethod方法来处理请求:
代码语言:javascript复制// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
result, err := callb.call(ctx, msg.Method, args)
if err != nil {
return msg.errorResponse(err)
}
return msg.response(result)
}
紧接着调用Call方法执行最后的调用并返回执行结果:
代码语言:javascript复制// call invokes the callback.
func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) {
// Create the argument slice.
fullargs := make([]reflect.Value, 0, 2 len(args))
if c.rcvr.IsValid() {
fullargs = append(fullargs, c.rcvr)
}
if c.hasCtx {
fullargs = append(fullargs, reflect.ValueOf(ctx))
}
fullargs = append(fullargs, args...)
// Catch panic while running the callback.
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error("RPC method " method " crashed: " fmt.Sprintf("%vn%s", err, buf))
errRes = errors.New("method handler crashed")
}
}()
// Run the callback.
results := c.fn.Call(fullargs)
if len(results) == 0 {
return nil, nil
}
if c.errPos >= 0 && !results[c.errPos].IsNil() {
// Method has returned non-nil error value.
err := results[c.errPos].Interface().(error)
return reflect.Value{}, err
}
return results[0].Interface(), nil
}
非 HTTP
当请求不是HTTP请求时会转而走向c.send(ctx, op, msg)方法,这里之所以这样设计是因为http是一个短连接,每次请求都是同步的,直接返回请求结果,而IPC、InProc、 websocket请求都是长连接,每次请求都是异步的,需要在网络线程外监听请求返回的结果
代码语言:javascript复制// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx, c); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
send方法的具体实现代码如下所示,在这里请求会被select阻塞直到c.reqInit接收到Op,或者receive到ctx.Done():
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpcclient.go L478
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg, false)
c.reqSent <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.closing:
return ErrClientQuit
}
}
当c.reqInit接收到Op后会把请求的内容写入到conn通道中去
代码语言:javascript复制func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
// The previous write failed. Try to establish a new connection.
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
err := c.writeConn.writeJSON(ctx, msg)
if err != nil {
c.writeConn = nil
if !retry {
return c.write(ctx, msg, true)
}
}
return err
}
之后向上回溯到op.wait(ctx, c),在该函数中调用ctx.Done添加到请求队列中去:
代码语言:javascript复制// filedir :go-ethereum-1.10.2rpcclient.go
func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP {
select {
case c.reqTimeout <- op:
case <-c.closing:
}
}
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
之后client的dispactch方法会收到这个结果
代码语言:javascript复制// filedir: go-ethereum-1.10.2rpcclient.go L538
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(codec ServerCodec) {
var (
lastOp *requestOp // tracks last send operation
reqInitLock = c.reqInit // nil while the send lock is held
conn = c.newClientConn(codec)
reading = true
)
defer func() {
close(c.closing)
if reading {
conn.close(ErrClientQuit, nil)
c.drainRead()
}
close(c.didClose)
}()
// Spawn the initial read loop.
go c.read(codec)
for {
select {
case <-c.close:
return
// Read path:
case op := <-c.readOp:
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
}
case err := <-c.readErr:
conn.handler.log.Debug("RPC connection read error", "err", err)
conn.close(err, lastOp)
reading = false
// Reconnect:
case newcodec := <-c.reconnected:
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.remoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case which
// happens if this loop isn't notified in time after the connection breaks.
// In those cases the caller will notice first and reconnect. Closing the
// handler terminates all waiting requests (closing op.resp) except for
// lastOp, which will be transferred to the new handler.
conn.close(errClientReconnected, lastOp)
c.drainRead()
}
go c.read(newcodec)
reading = true
conn = c.newClientConn(newcodec)
// Re-register the in-flight request on the new handler
// because that's where it will be sent.
conn.handler.addRequestOp(lastOp)
// Send path:
case op := <-reqInitLock:
// Stop listening for further requests until the current one has been sent.
reqInitLock = nil
lastOp = op
conn.handler.addRequestOp(op)
case err := <-c.reqSent:
if err != nil {
// Remove response handlers for the last send. When the read loop
// goes down, it will signal all other current operations.
conn.handler.removeRequestOp(lastOp)
}
// Let the next request in.
reqInitLock = c.reqInit
lastOp = nil
case op := <-c.reqTimeout:
conn.handler.removeRequestOp(op)
}
}
}
之后通过c.read(codec)读取server通过conn返回的数据:
代码语言:javascript复制// filedir:go-ethereum-1.10.2rpcclient.go L630
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.readBatch()
if _, ok := err.(*json.SyntaxError); ok {
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
return
}
c.readOp <- readOp{msgs, batch}
}
}
之后将server返回数据send到c.readOp,之后调用handler(handleBatchhandleMsg)处理请求,后续逻辑和HTTP请求处理一致,这里不再赘述~
RPC使用
以太坊JSON RPC接口文档可以访问以下链接进行查看:
http://cw.hubwiz.com/card/c/ethereum-json-rpc-api/
这里主要分为以下几个模块:
- web3:web3.js相关操作
- net:与网络相关的操作
- eth:以太坊关键RPC交互
- db:数据库交互
- shh:whisper相关操作
在以太坊中我们还可以通过JSON RPC来管理API,具体示例可以参考以下连接:
http://cw.hubwiz.com/card/c/geth-rpc-api/
其Geth模块又可以划分为以下几个模块:
- admin:Geth节点管理
- debug:Geth节点调试
- miner:矿工和DAG管理
- personal:帐户管理
- txpool:事务池检查
文末小结
这里关于RPC的调用不再展开进行介绍了,有兴趣的读者可以结合前面的章节自我搭建以太坊测试链之后开启RPC调用支持后结合上面的说明文件进行测试~