Go io 流管道连接
Go 一系列的 io 操作的接口非常棒,可以将一切东西归为 Read()
、Write()
,以及其他少数几个接口。完美诠释了 Linux 万物皆文件的思想
(对比其他语言,虽然也是同样的思路,但是还暴露的更多的接口,反而增大了使用复杂度)
在一些场景下,需要实现将两个流拼接起来,比如提供一个服务器上的 gRPC 服务,可以通过 K8S 或是 Docker 的接口,连接到容器内执行命令(exec)。Docker 本身提供的是一个 Reader & Writer 的接口,CRI 接口则要求的是 Reader、Writer 流。
而 gRPC 流接口则给出的是被阻塞的 Message。
因此需要将 gRPC 的数据转换为流,目前内置的各种相关功能都无法满足需要:
bytes.Buffer
: 读完缓冲区就会直接结束(这时可能 gRPC 还没开始写入)io.Copy
: 复制到 EOF 就会结束io.Pipe
: 返回的是新的 Reader 和 Writer,无法直接连接 Reader 和 Writer
因此,应该仿照 io.Pipe
实现一个可以将 Write()
和 Read()
函数连接起来,同时可以通过 ReadFrom()
和 WriteTo()
将其连接到其他 Reader 和 Writer 中
使用样例
两个流直接连接
将 stdin 连接到 stdout
pipe.RWPipe(stdin, stdout)
将输入流同步到其他流中
stdin := pipe.New() go stdin.WriteTo(os.Stdout)
将非流数据转换为流
in := pipe.New() go in.WriteTo(os.Stdout) for { msg := getMessage() in.Write(msg) }
将流数据转换为非流
out := pipe.New() go out.ReadFrom(os.Stdin) reader := bufio.NewReader(out) for { b, err := reader.ReadBytes('\n') send(b) }
代码
代码中 WriteTo()
和 ReadFrom()
需要在单独的 goroutine 中运行
package pipe import ( "io" "sync" ) func RWPipe(r io.Reader, w io.Writer) (n int64, err error) { buf := make([]byte, 512) var n1, n2 int for { n1, err = r.Read(buf) if err != nil { break } n2, err = w.Write(buf[:n1]) if err != nil { break } n += int64(n2) } return } type Pipe struct { wrMu sync.Mutex wrCh chan []byte rdCh chan int once sync.Once done chan struct{} } func New() *Pipe { return &Pipe{ wrMu: sync.Mutex{}, wrCh: make(chan []byte), rdCh: make(chan int), once: sync.Once{}, done: make(chan struct{}), } } func (p *Pipe) Read(b []byte) (int, error) { select { case bw := <-p.wrCh: nr := copy(b, bw) p.rdCh <- nr return nr, nil case <-p.done: return 0, io.EOF } } func (p *Pipe) Write(b []byte) (int, error) { select { case <-p.done: return 0, io.EOF default: p.wrMu.Lock() defer p.wrMu.Unlock() } n := 0 for once := true; once || len(b) > 0; once = false { select { case p.wrCh <- b: nw := <-p.rdCh b = b[nw:] n += nw case <-p.done: return n, io.EOF } } return n, nil } func (p *Pipe) ReadFrom(r io.Reader) (n int64, err error) { defer p.Close() return RWPipe(r, p) } func (p *Pipe) WriteTo(w io.Writer) (n int64, err error) { defer p.Close() return RWPipe(p, w) } func (p *Pipe) Close() { p.once.Do(func() { close(p.done) }) }