开发者

如何解析golang中Context在HTTP服务中的角色

目录
  • 问题背景
  • 错误追踪
    • Context
  • 总结

    问题背景

    在go语言的http服务中,我们常javascript常会使用到Context来取消一个请求,或者取消数据的读取。偶然的一次尝试,让我对Context有了一定的兴趣。

    接下来本文围绕下面的例子,分析http如何利用Context来控制请求的取消和影响数据读取。

    例子

    我们开启一个http服务,发送大量数据给每个请求,代码如下:

    srv.go:http服务

    package main
    
    import (
    	"fmt"
    	"net/http"
    )
    
    func hello(w http.ResponseWriter, r *http.Request) {
    	for i := 0; i < 100*10000; i++ {
    		w.Write([]byte("hello world"))
    	}
    }
    
    func main() {
    	fmt.Println("listening 8888:")
    	http.HandleFunc("/hello", hello)
    	_ = http.ListenAndServe(":8888", nil)
    }
    
    

    client.go: 发送请求的客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"net/http"
    	"time"
    )
    
    func main() {
    
    	client := http.Client{}
    	request, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:8888/hello", nil)
    	ctx, cancelFunc := context.WithCancel(request.Context())
    	request = request.WithContext(ctx)
    	if err != nil {
    		return
    	}
    	response, err := client.Do(request)
    	if err != nil {
    		log.Fatal(err)
    	}
    	cache := make([]byte, 128)
    	timer := time.NewTimer(time.Millisecond)
    	go func() {
    		select {
    		case <-timer.C:
    			cancelFunc()
    		}
    	}()
    	for {
    		read, err := response.Body.Read(cache)
    		if err == nil {
    			fmt.Println(string(cache[:read]))
    			continue
    		}
    		if err == io.EOF {
    			fmt.Println(string(cache[:read]))
    			break
    		}
    		log.Fatal(err)
    	}
    
    }
    
    

    代码很简单,就不做注释啦。分别启动服务和client,我们将得到如下结果:

    如何解析golang中Context在HTTP服务中的角色

    我们看到这句话Process finished with the exit code 1,程序非正常退出,那么首先是追踪这个错误,下面我们追踪这个错误。

    错误追踪

    首先清楚这个“context canceled” 是客户端打印出来的:

    log.Fatal(err)
    // 这个错误来源于读取Response中的数据时得到错误,而且这个错误非io.EOF错误
    

    断点入口:

    read, err := response.Body.Read(cache)

    我们会进入transport.go文件中:

    func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { // 这里表明我们读取的body是bodyEOFSignal类型
    	es.mu.Lock()
    	closed, rerr := es.closed, es.rerr
    	es.mu.Unlock()
    	if closed {
    		return 0, errReadOnClosedResBody
    	}
    	if rerr != nil {
    		return 0, rerr
    	}
    
    	n, err = es.body.Read(p)// 我们在这里读到了错误,这里是什么错误,在后面将会介绍
    	if err != nil {
    		es.mu.Lock()
    		defer es.mu.Unlock()
    		if es.rerr == nil {
    			es.rerr = err
    		}
    		err = es.condfn(err) // 通过这个方法对错误进行判别,得到上层传下来的错误信息
    	}
    	return
    }
    

    然后我们继续进入到bodyEOFSignal的condfn(error)函数中:

    func (es *bodyEOFSignal) condfn(err error) error {
    	if es.fn == nil {
    		return err //1
    	}
    	err = es.fn(err) // 如果fn不为空,这里会继续到bodyEOFSignal去得到上层的错误信息;fn为空,显然错误和上层就没有关系,就在上面1处就返回了。除此,因为client从这个body读的数据,这里的错误是通过fn从上层获取。
    	es.fn = nil
    	return err
    }
    

    那我们继续到es.fn(err)中一探究竟:

    body := &bodyEOFSignal{
    			body: resp.Body,
    			earlyCloseFn: func() error {
    				waitForBodyRead <- false
    				<-eofc // will be closed by deferred call at the end of the function
    				return nil
    
    			},
    			fn: func(err error) error {// 就到了这里,这一段代码源自transport.go中的封装内部类persistConn的方法readLoop,顾名思义:循环读取
    			// 这里会简单的android皮判断错误是不是io.EOF,然后作进一步处理
    				iseoF := err == io.EOF
    				waitForBodyRead <- isEOF
    				if isEOF {
    					<-eofc // see comment above eofc declaration
    				} else if err != nil {
    					if cerr := pc.canceled(); cerr != nil {// 继续调试我们就到了这里,显然不是io.EOF错误
    						return cerr // 返回的是pc.canceled()
    					}
    				}
    				return err
    			},
    		}
    

    继续到pc.canceled()中:

    func (pc *persistConn) canceled() error {
    	pc.mu.Lock()
    	defer pc.mu.Unlock()
    	return pc.canceledErr // 返回的这个错误,那么下一步便需要知道这个canceledErr是什么?如何被赋值?
    }
    

    1. 是什么?

    canceledErr          error // set non-nil if conn is canceled 
    //是一种错误,且如果非空,则连接被取消,那么这个错误是一个连接状态的标志或者连接断开的原因
    

    2. 如何被赋值?

    根据canceledErr,我们找被赋值的函数如下:

    func (pc *persistConn) cancelRequest(err error) {
    	pc.mu.Lock()
    	defer pc.mu.Unlock()
    	pc.canceledErr = err // 在这里被赋值
    	pc.closeLocked(errRequestCanceled)
    }
    

    错误追踪先到这里。接下来我们换一个角度,我们从Context的角度来看。

    Context

    这里就不讲context了,有兴趣的伙伴去官网获取吧!!!回到客户端代码,给request传入了一个WithCancel context,看看这个函数做了什么:

    func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    	if parent == nil {
    		panic("cannot create context from nil parent")
    	}
    	c := newCancelCtx(parent) // 包装父类Context
    	propagateCancel(parent, &c)
    	return &c, func() { 
    		c.cancel(true, Canceled) // 返回一个取消函数
    	}
    }
    

    进入到c.cancel(),会发现Canceled作为一个错误类型,定义如下:

    // Canceled is the error returned by Context.Err when the context is canceled.
    var Canceled = errors.New("context canceled")// 这个不是客户端打印的吗?是不是很激动,找到了错误信息的祖宗
    ...
    //而cancel函数定义如下:
    // cancel closes c.done, cancels each of c's children, and, if
    // removeFromParent is true, removes c from its parent's children.
    func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    	...
    	c.err = err //这里做了一个赋值,即把这个错误传给cancelCtx了,它是Context的一个内部类
    	...
    	// 做一些子context的通知以及错误的传递,说取消了,不用干了
    }
    

    context先到这里,在context里找到了错误信息的来源,接下来看看错误是如何传给前面我们谈到的canceledErr。

    似乎还有一个入口没有看,就是http.client.Do的方法:

    我们打断点进入到RoundTrip方法的调用入口,看看下面是如何感知context被取消:

    resp, err = rt.RoundTrip(req) //这个在send()方法内部调用
    
    ...
    
    // send issues an HTTP request.
    // Caller should close resp.Body when done reading from it.
    func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    	...
    	resp, err = rt.RoundTrip(req) 
    	...
    }
    

    然后跟着RoundTrip(…), 进入到:

    func (t *Transport)js roundTrip(req *Request) (*Response, error) {
    	...
    	var resp *Response
    		if pconn.alt != nil {
    			// HTTP/2 path.
    			t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
    			resp, err = pconn.alt.RoundTrip(req)
    		} else {
    			resp, err = pconn.roundTrip(treq) // 继续可到这里,我们看看这个pconn,刚好就是前面提到的persistConn,它里面包含了canceledErr,似乎我们离真相更近了
    		}
    }
    

    进入到persistConn的实现方法roundTrip(),我们看看这个for循环:

    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
    ctxDoneChan := req.Context().Done() //这个request是setRequestCancel(req *Request, rt RoundTripper, deadline time.Time)中重新定义的request,里实现了超时取消的机制,这里的监听便是超时的监听,并不是我们取消的监听
    pcClosed := pc.closech
    canceled := false
    for {
    		testHookWaitResLoop()
    		select { // select开启对channel的轮询
    		case err := <-writeErrCh:
    			if debugRoundTrip {
    				req.logf("writeErrCh resv: %T/%#v", err, err)
    			}
    			if err != nil {
    				pc.close(fmt.Errorf("write error: %v", err))
    				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
    			}
    			if d := pc.t.ResponseHeaderTimeout; d > 0 {
    				if debugRoundTrip {
    					req.logf("starting timer for %v", d)
    				}
    				timer := time.NewTimer(d)
    				defer timer.Stop() // prevent leaks
    				respHeaderTimer = timer.C
    			}
    		case <-pcClosed:
    			pcClosed = nil
    			if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
    				if debugRoundTrip {
    					req.logf("closech recv: %T %#v", pc.closed, pc.closed)
    				}
    				return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
    			}
    		case <-respHeaderTimer:
    			if debugRoundTrip {
    				req.logf("timeout waiting for response headers.")
    			}
    			pc.close(errTimeout)
    			return nil, errTimeout
    		case re := <-resc:
    			if (re.res == nil) == (re.err == nil) {
    				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
    			}
    			if debugRoundTrip {
    				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
    			}
    			if re.err != nil {
    				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
    			}
    			return re.res, nil
    		case <-cancelChan:
    			canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
    			cancelChan = nil
    		case <-ctxDoneChan:
    			canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
    			cancelChan = nil
    			ctxDoneChan = nil
    		}
    	}
    

    因而这里的监听不是在客户端取消的context的监听,根据客户端的输出显示,表明请求已经发送到服务端,请求并未超时,response也返回了,那么这里的函数监听是与我们读取数据没有联系。

    小编最开始也以为是在这里监听返回,然而这里打断点,怎么进不来。

    在前面提到,连接是类型为persistConn,其次是读取数据过程中,context的取消会产生影响,那么表明错误发生在tcp连接中的读取数据。

    接下来,根据连接建立过程,看看http做了什么?其次是真正的数据读取来自哪里?

    pconn, err := t.getConn(treq, cm)
    ...
    func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
    	req := treq.Request
    	trace := treq.trace
    	ctx := req.Context() //这里去了request的context
    	w := &wantConn{
    			cm:         cm,
    			key:        cm.key(),
    			ctx:        ctx, //传给w
    			ready:      make(chan struct{}, 1),
    			beforeDial: testHookPrePendingDial,
    			afterDial:  testHookPostPendingDial,
    		}
    	...
    	
    	select{
    	case <-w.ready:
    		if w.err != nil {
    				// If the request has been canceled, that's probably
    				// what caused w.err; if so, prefer to return the
    				// cancellation error (see golang.org/issue/16049).
    				//如果建立连接前,请求被取消,这里会监听到取消的err
    				select {
    				case <-req.Cancel:
    					return nil, errRequestCanceledConn
    				case <-req.Context().Done():
    					return nil, req.Context().Err()
    				case err := <-cancelc:
    					if err == errRequestCanceled {
    						err = errRequestCanceledConn
    					}
    					return nil, err
    				default:
    					// return below
    				}
    			}
    	return w.pc, w.err//这里返回的是persistConn
    		...	
    

    通过这个w建立连接,进入到dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)。 在这里面开启了一个协程pconn.readLoop(),读取连接里面的数据。

    (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    	...
    	go pconn.readLoop()
    }
    

    因为错误与数据读取有直接联系,至少错误发生readloop中的某一个地方:

    for alive {
    		...
    
    		var resp *Response
    		if err == nil {
    			resp, err = pc.readResponse(rc, trace) // 得到response
    		} else {
    			err = transportReadFromServerError{err}
    			closeErr = err
    		}
    		...
    
    		waitForBodyRead := make(chan bool, 2)
    		body := &bodyEOFSignal{ //对上面读取的resp.Body进行封装,这里封装主要是传递请求取消的错误
    			body: resp.Body,
    			earlyCloseFn: func() error {
    				waitForBodyRead <- false
    				<-eofc // will be closed by deferred call at the end of the function
    				return nil
    
    			},
    			fn: func(err error) error {// 
    				isEOF := err == io.EOF
    				waitForBodyRead <- isEOF
    				if isEOF {
    					<-eofc // see comment above eofc declaration
    				} else if err != nil {
    					if cerr := pc.canceled(); cerr != nil {
    						return cerr
    					}
    				}
    				return err
    			},
    		}
    
    		resp.Body = body
    		...
    
    		// Before looping back to the top of this function and peeking on
    		// the bufio.Reader, wait for the caller goroutine to finish
    		// reading the response body. (or for cancellation or death)
    		// 这里有开启监听,显然是监听读的过程中发生的取消和超时等
    		select {
    		case bodyEOF := <-waitForBodyRead:
    			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
    			alive = alive &&
    				bodyEOF &&
    				!pc.sawEOF &&
    				pc.wroteRequest() &&
    				replaced && tryPutIdleConn(trace)
    			if bodyEOF {
    				eofc <- struct{}{}
    			}
    		case <-rc.req.Cancel:
    			alive = false
    			pc.t.CancelRequest(rc.req)
    		case <-rc.req.Context().Done(): //这里便监听了客户顿context的取消
    			alive = false //结束循环
    			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())//传递err
    		case <-pc.closech:
    			alive = false
    		}
    
    		testHookReadLoopBeforeNextRead()
    	}
    

    熟悉context的便知道,当我们调用context的cancel方法时,在前面的context的cancel()方法中有如下代码:

    	d, _ := c.done.Load().(chan struct{}) // 拿到Done方法的返回值channel
    	if d == nil {
    		c.done.Store(closedchan)
    	} else {
    		close(d)// 关闭channel,而关闭时会向channel写入值
    	}
    

    再回到:

    ccase <-rc.req.Context().Done():// 当contex取消,便进入这个代码块
    			alive = false
    			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
    

    进入到cancelRequest(…)的rc.req.Context().Err()

    func (c *cancelCtx) Err() error {
    	c.mu.Lock()
    	err := c.err//这里似曾相识,前面我们说到context调用取消函数时,会给c.err赋值为cancelErr
    	c.mu.Unlock()
    	return err
    }
    

    因而传入canc编程elRequest的err便是cancelErr,我们进入cancelRequest:

    func (t *Transport) cancelRequest(key cancelKey, err error) bool {
    	// This function must not return until the cancel func has completed.
    	// See: https://golang.org/issue/34658
    	t.reqMu.Lock()
    	defer t.reqMu.Unlock()
    	cancel := t.reqCanceler[key]// 这里的key正是我们传入的请求的cancelkey,拿到reqCanceler中的func(error)
    	delete(t.reqCanceler, key)
    	if cancel != nil {
    		cancel(err) // 进入cancel
    	}
    
    	return cancel != nil
    }
    

    进入cancel(err):

    func (pc *persistConn) cancelRequest(err error) {//这个函数不正是我们前面追踪错误所看见的,这也表明我们追踪是正确的
    	pc.mu.Lock()
    	defer pc.mu.Unlock()
    	pc.canceledErr = err 
    	pc.closeLocked(errRequestCanceled)
    }
    

    到这里我们的err就传给了body bodyEOFSignal,整个错误传递流程便走通了。

    还剩最后一个问题,bodyEOFSignal的read函数中n, err = es.body.Read§ 所遇到的错误是什么?

    n, err = es.body.Read(p)// 调试发现是网络连接关闭错误,这里表明我们执行完err的传递根本原因在于连接被关闭
    	if err != nil {
    		es.mu.Lock()
    		defer es.mu.Unlock()
    		if es.rerr == nil {
    			es.rerr = err
    		}
    		err = es.condfn(err)
    	}
    	return
    

    那么关闭连接又是在哪里呢?

    我们回到cancelRequest函数:

    pc.closeLocked(errRequestCanceled) //这里便关闭了连接

    这样err整个传递逻辑和原因便都走同通了!

    总结

    经过上面的分析,将整个Context取消过程总结如下:

    1.当创建一个带有取消的Context,会把Context的内部类中的err变量赋值为CancelErr;

    2.客户端的调用cancelFunc,会向context的Done所绑定的channel写入值;

    3.当channel写入值后,transport.go中的readLoop方法会监听这个channel的写入,从而把context取消的err传给persistConn,并关闭连接;

    4.关闭连接后,数据读取便会遇到连接关闭的网络错误错误,当遇到这个错误,在bodySignal中进行错误处理,这里并不感知连接的关闭,只利用fn分别错误类型,当错误为io.EOF,直接将这个错误置为nil,若不是,便通过bodySignal获取到连接中的错误,再返回这个错误;

    5.最后通过body.read()方法将错误打印出来。

    6.这里复杂在于,每个角色只做自己的工作,遇到错误不是直接返回,而是等待其他角色来读取错误;具体表现为:context负责生成错误消息、传递取消指令给persistConn;persistConn基于bodySignal建立读取数据和连接的关联,响应Context的取消并关闭连接,拿到context的错误信息;client读取数据和错误;bodySignal:分析错误,并传递数据和persiYLKGWUkJTwstConn的错误消息给client。 

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜