Files
Xray-core/transport/internet/grpc/encoding/hunkconn.go
风扇滑翔翼 a610a4c89a
Some checks failed
Build and Release for Windows 7 / check-assets (push) Has been cancelled
Build and Release / check-assets (push) Has been cancelled
Test / check-assets (push) Has been cancelled
Build and Release for Windows 7 / build (win7-32, 386, windows) (push) Has been cancelled
Build and Release for Windows 7 / build (win7-64, amd64, windows) (push) Has been cancelled
Build and Release / build (386, freebsd, ) (push) Has been cancelled
Build and Release / build (386, linux, ) (push) Has been cancelled
Build and Release / build (386, openbsd, ) (push) Has been cancelled
Build and Release / build (386, windows, ) (push) Has been cancelled
Build and Release / build (amd64, android, android-amd64) (push) Has been cancelled
Build and Release / build (amd64, darwin, ) (push) Has been cancelled
Build and Release / build (amd64, freebsd, ) (push) Has been cancelled
Build and Release / build (amd64, linux, ) (push) Has been cancelled
Build and Release / build (amd64, openbsd, ) (push) Has been cancelled
Build and Release / build (amd64, windows, ) (push) Has been cancelled
Build and Release / build (arm, 5, linux) (push) Has been cancelled
Build and Release / build (arm, 6, linux) (push) Has been cancelled
Build and Release / build (arm, 7, freebsd) (push) Has been cancelled
Build and Release / build (arm, 7, linux) (push) Has been cancelled
Build and Release / build (arm, 7, openbsd) (push) Has been cancelled
Build and Release / build (arm, 7, windows) (push) Has been cancelled
Build and Release / build (arm64, android) (push) Has been cancelled
Build and Release / build (arm64, darwin) (push) Has been cancelled
Build and Release / build (arm64, freebsd) (push) Has been cancelled
Build and Release / build (arm64, linux) (push) Has been cancelled
Build and Release / build (arm64, openbsd) (push) Has been cancelled
Build and Release / build (arm64, windows) (push) Has been cancelled
Build and Release / build (loong64, linux) (push) Has been cancelled
Build and Release / build (mips, linux) (push) Has been cancelled
Build and Release / build (mips64, linux) (push) Has been cancelled
Build and Release / build (mips64le, linux) (push) Has been cancelled
Build and Release / build (mipsle, linux) (push) Has been cancelled
Build and Release / build (ppc64, linux) (push) Has been cancelled
Build and Release / build (ppc64le, linux) (push) Has been cancelled
Build and Release / build (riscv64, linux) (push) Has been cancelled
Build and Release / build (s390x, linux) (push) Has been cancelled
Test / test (macos-latest) (push) Has been cancelled
Test / test (ubuntu-latest) (push) Has been cancelled
Test / test (windows-latest) (push) Has been cancelled
Scheduled assets update / geodat (push) Has been cancelled
Chore: Remove all double gonet import (#5402)
2025-12-10 07:17:29 +00:00

153 lines
2.9 KiB
Go

package encoding
import (
"context"
"io"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/net/cnc"
"github.com/xtls/xray-core/common/signal/done"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
// HunkConn is the stream abstraction this package reads Hunk messages from
// and writes Hunk messages to. The error messages in this file refer to it as
// the "gRPC tunnel".
type HunkConn interface {
	// Context returns the context associated with the stream. NewHunkConn
	// reads peer information and incoming metadata from it.
	Context() context.Context
	// Send transmits one Hunk over the stream.
	Send(*Hunk) error
	// Recv returns the next Hunk received from the stream.
	Recv() (*Hunk, error)
	// SendMsg and RecvMsg are not called anywhere in this file.
	// NOTE(review): presumably required so generated gRPC stream types
	// satisfy this interface — confirm.
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}
// StreamCloser is an optional capability of a HunkConn. When the underlying
// stream implements it, HunkReaderWriter.Close calls CloseSend and returns
// its result.
type StreamCloser interface {
	// CloseSend is invoked by HunkReaderWriter.Close.
	// NOTE(review): presumably closes the sending direction of a
	// client-side stream — confirm against the gRPC stream in use.
	CloseSend() error
}
// HunkReaderWriter adapts a HunkConn to byte-oriented Read, Write and Close
// operations, and additionally offers ReadMultiBuffer.
//
// NewHunkReadWriter initialises this struct with a positional literal, so the
// field order below must not change.
type HunkReaderWriter struct {
	// hc is the underlying hunk stream.
	hc HunkConn
	// cancel, when non-nil, is called by Close.
	cancel context.CancelFunc
	// done is checked at the start of Read, ReadMultiBuffer and Write; Close
	// closes it only when hc does not implement StreamCloser.
	done *done.Instance
	// buf holds the payload of the most recently received hunk.
	buf []byte
	// index is the offset in buf of the next byte Read has not yet returned.
	index int
}
// NewHunkReadWriter wraps hc in a HunkReaderWriter with a fresh done signal
// and an empty receive buffer. cancel may be nil; when it is not, Close
// invokes it.
func NewHunkReadWriter(hc HunkConn, cancel context.CancelFunc) *HunkReaderWriter {
	rw := &HunkReaderWriter{
		hc:     hc,
		cancel: cancel,
		done:   done.New(),
		buf:    nil,
		index:  0,
	}
	return rw
}
// NewHunkConn builds a net.Conn on top of hc.
//
// The remote address is chosen as follows:
//   - the peer address stored in the stream context, if there is one;
//   - otherwise the placeholder TCP address 0.0.0.0:0;
//   - either of the above is replaced by the first "x-real-ip" value of the
//     incoming metadata (with port 0) when that value parses as an IP address.
//
// A single HunkReaderWriter created from hc and cancel serves as the
// connection's input, output and close handler.
func NewHunkConn(hc HunkConn, cancel context.CancelFunc) net.Conn {
	// Start from the placeholder and override it as better information is found.
	remote := net.Addr(&net.TCPAddr{
		IP:   []byte{0, 0, 0, 0},
		Port: 0,
	})
	if p, found := peer.FromContext(hc.Context()); found {
		remote = p.Addr
	}

	if md, found := metadata.FromIncomingContext(hc.Context()); found {
		if values := md.Get("x-real-ip"); len(values) > 0 {
			// Only an IP address is accepted here; other address families
			// leave the previously selected address in place.
			if addr := net.ParseAddress(values[0]); addr.Family().IsIP() {
				remote = &net.TCPAddr{
					IP:   addr.IP(),
					Port: 0,
				}
			}
		}
	}

	rw := NewHunkReadWriter(hc, cancel)
	return cnc.NewConnection(
		cnc.ConnectionInput(rw),
		cnc.ConnectionOutput(rw),
		cnc.ConnectionOnClose(rw),
		cnc.ConnectionRemoteAddr(remote),
	)
}
// forceFetch receives the next hunk from the stream and makes its payload the
// current read buffer, resetting the read offset to zero.
//
// io.EOF from the stream is returned unchanged so callers can recognise the
// end of the stream; any other receive error is wrapped. On error the current
// buffer and offset are left untouched.
func (h *HunkReaderWriter) forceFetch() error {
	hunk, err := h.hc.Recv()
	switch {
	case err == nil:
		h.buf, h.index = hunk.Data, 0
		return nil
	case err == io.EOF:
		return err
	default:
		return errors.New("failed to fetch hunk from gRPC tunnel").Base(err)
	}
}
// Read copies unread bytes of the current hunk into buf and returns how many
// bytes were copied.
//
// It returns io.EOF once the done signal is set. A zero-length buf returns
// (0, nil) immediately. When the current hunk is exhausted, the next one is
// fetched from the stream first, and a fetch error is returned with a zero
// byte count. A single call never spans more than one hunk, so it may return
// fewer bytes than len(buf).
func (h *HunkReaderWriter) Read(buf []byte) (int, error) {
	if h.done.Done() {
		return 0, io.EOF
	}
	// A zero-length read can never deliver data, so do not block on the
	// stream waiting for a hunk that could not be handed out anyway.
	if len(buf) == 0 {
		return 0, nil
	}
	if h.index >= len(h.buf) {
		if err := h.forceFetch(); err != nil {
			return 0, err
		}
	}
	n := copy(buf, h.buf[h.index:])
	h.index += n
	return n, nil
}
// ReadMultiBuffer returns the next chunk of received data as a
// buf.MultiBuffer containing a single buffer.
//
// It returns io.EOF once the done signal is set. When the current hunk is
// exhausted, the next one is fetched from the stream first and a fetch error
// is returned unchanged.
//
// If nothing has been consumed from the current hunk and its backing array
// has at least buf.Size capacity, the hunk's slice is passed to
// buf.NewExisted without copying and the hunk is marked fully consumed.
// Otherwise the unread bytes are copied into a new buffer through Read.
func (h *HunkReaderWriter) ReadMultiBuffer() (buf.MultiBuffer, error) {
	if h.done.Done() {
		return nil, io.EOF
	}
	if h.index >= len(h.buf) {
		if err := h.forceFetch(); err != nil {
			return nil, err
		}
	}

	// The zero-copy path hands out the whole hunk, so it is only valid while
	// no byte of it has been consumed. A preceding Read may have advanced
	// h.index; wrapping h.buf in that state would deliver the already-read
	// bytes a second time, so such hunks take the copy path below instead.
	if h.index == 0 && cap(h.buf) >= buf.Size {
		b := h.buf
		h.index = len(h.buf)
		return buf.MultiBuffer{buf.NewExisted(b)}, nil
	}

	// Copy path: Read honours h.index, so only unread bytes are transferred.
	b := buf.New()
	if _, err := b.ReadFrom(h); err != nil {
		return nil, err
	}
	return buf.MultiBuffer{b}, nil
}
// Write sends buf over the stream as the payload of a single Hunk.
//
// It returns io.ErrClosedPipe once the done signal is set. A send failure is
// wrapped and reported with a zero byte count; on success the full length of
// buf is reported as written.
func (h *HunkReaderWriter) Write(buf []byte) (int, error) {
	if h.done.Done() {
		return 0, io.ErrClosedPipe
	}
	if err := h.hc.Send(&Hunk{Data: buf}); err != nil {
		return 0, errors.New("failed to send data over gRPC tunnel").Base(err)
	}
	return len(buf), nil
}
// Close shuts the reader/writer down.
//
// The cancel function, if one was supplied, is called first. If the
// underlying stream implements StreamCloser, the result of CloseSend is
// returned; otherwise the done signal is closed and its result returned.
//
// NOTE(review): when the stream is a StreamCloser the done signal is never
// closed, so the done checks in Read, ReadMultiBuffer and Write do not trip
// after Close — confirm this is intended.
func (h *HunkReaderWriter) Close() error {
	if cancel := h.cancel; cancel != nil {
		cancel()
	}
	sc, ok := h.hc.(StreamCloser)
	if !ok {
		return h.done.Close()
	}
	return sc.CloseSend()
}