Compare commits

..

5 Commits

Author SHA1 Message Date
yuhan6665
7f48a2a3d0 Add back pipe for reverse proxy portal
Some checks failed
Build and Release / build (amd64, android, android-amd64) (push) Has been cancelled
Build and Release / build (amd64, darwin, ) (push) Has been cancelled
Build and Release / build (amd64, freebsd, ) (push) Has been cancelled
Build and Release / build (amd64, linux, ) (push) Has been cancelled
Build and Release / build (amd64, openbsd, ) (push) Has been cancelled
Build and Release / build (amd64, windows, ) (push) Has been cancelled
Build and Release / build (arm, 5, linux) (push) Has been cancelled
Build and Release / build (arm, 6, linux) (push) Has been cancelled
Build and Release / build (arm, 7, freebsd) (push) Has been cancelled
Build and Release / build (arm, 7, linux) (push) Has been cancelled
Build and Release / build (arm, 7, openbsd) (push) Has been cancelled
Build and Release / build (arm, 7, windows) (push) Has been cancelled
Build and Release / build (arm64, android) (push) Has been cancelled
Build and Release / build (arm64, darwin) (push) Has been cancelled
Build and Release / build (arm64, freebsd) (push) Has been cancelled
Build and Release / build (arm64, linux) (push) Has been cancelled
Build and Release / build (arm64, openbsd) (push) Has been cancelled
Build and Release / build (arm64, windows) (push) Has been cancelled
Build and Release / build (loong64, linux) (push) Has been cancelled
Build and Release / build (mips, linux) (push) Has been cancelled
Build and Release / build (mips64, linux) (push) Has been cancelled
Build and Release / build (mips64le, linux) (push) Has been cancelled
Build and Release / build (mipsle, linux) (push) Has been cancelled
Build and Release / build (ppc64, linux) (push) Has been cancelled
Build and Release / build (ppc64le, linux) (push) Has been cancelled
Build and Release / build (riscv64, linux) (push) Has been cancelled
Build and Release / build (s390x, linux) (push) Has been cancelled
Test / test (macos-latest) (push) Has been cancelled
Test / test (ubuntu-latest) (push) Has been cancelled
Test / test (windows-latest) (push) Has been cancelled
2025-09-21 20:07:38 -04:00
yuhan6665
b285ccc180 Add back pipe for Mux
Some checks failed
Build and Release / build (amd64, darwin, ) (push) Has been cancelled
Build and Release / build (amd64, freebsd, ) (push) Has been cancelled
Build and Release / build (amd64, linux, ) (push) Has been cancelled
Build and Release / build (amd64, openbsd, ) (push) Has been cancelled
Build and Release / build (amd64, windows, ) (push) Has been cancelled
Build and Release / build (arm, 5, linux) (push) Has been cancelled
Build and Release / build (arm, 6, linux) (push) Has been cancelled
Build and Release / build (arm, 7, freebsd) (push) Has been cancelled
Build and Release / build (arm, 7, linux) (push) Has been cancelled
Build and Release / build (arm, 7, openbsd) (push) Has been cancelled
Build and Release / build (arm, 7, windows) (push) Has been cancelled
Build and Release / build (arm64, android) (push) Has been cancelled
Build and Release / build (arm64, darwin) (push) Has been cancelled
Build and Release / build (arm64, freebsd) (push) Has been cancelled
Build and Release / build (arm64, linux) (push) Has been cancelled
Build and Release / build (arm64, openbsd) (push) Has been cancelled
Build and Release / build (arm64, windows) (push) Has been cancelled
Build and Release / build (loong64, linux) (push) Has been cancelled
Build and Release / build (mips, linux) (push) Has been cancelled
Build and Release / build (mips64, linux) (push) Has been cancelled
Build and Release / build (mips64le, linux) (push) Has been cancelled
Build and Release / build (mipsle, linux) (push) Has been cancelled
Build and Release / build (ppc64, linux) (push) Has been cancelled
Build and Release / build (ppc64le, linux) (push) Has been cancelled
Build and Release / build (riscv64, linux) (push) Has been cancelled
Build and Release / build (s390x, linux) (push) Has been cancelled
Test / check-assets (push) Has been cancelled
Test / test (macos-latest) (push) Has been cancelled
Test / test (ubuntu-latest) (push) Has been cancelled
Test / test (windows-latest) (push) Has been cancelled
For Mux, we need to use pipe to guard against multiple sub-connections writing back responses at the same time
2025-09-21 10:52:05 -04:00
yuhan6665
97f977003d Add Vless Mux and Enc tests 2025-09-21 10:52:05 -04:00
yuhan6665
1b8eba1338 Inbound remove NoTerminationSignal 2025-09-20 18:31:11 -04:00
yuhan6665
b14d5407e5 Refactor code to use DispatchLink() in vmess inbound
- Always apply NoTerminationSignal
2025-09-20 18:31:11 -04:00
25 changed files with 387 additions and 360 deletions

View File

@@ -45,7 +45,6 @@
- [wulabing/xray_docker](https://github.com/wulabing/xray_docker)
- Web Panel - **WARNING: Please DO NOT USE plain HTTP panels like 3X-UI**, as they are believed to be bribed by Iran GFW for supporting plain HTTP by default and refused to change (https://github.com/XTLS/Xray-core/pull/3884#issuecomment-2439595331), which has already put many users' data security in danger in the past few years. **If you are already using 3X-UI, please switch to the following panels, which are verified to support HTTPS and SSH port forwarding only:**
- [X-Panel](https://github.com/xeefei/X-Panel)
- [PasarGuard](https://github.com/PasarGuard/panel)
- [Remnawave](https://github.com/remnawave/panel)
- [Marzban](https://github.com/Gozargah/Marzban)
- [Xray-UI](https://github.com/qist/xray-ui)

View File

@@ -108,7 +108,7 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
}
h.proxyConfig = proxyConfig
ctx = session.ContextWithFullHandler(ctx, h)
ctx = session.ContextWithHandler(ctx, h)
rawProxyHandler, err := common.CreateObject(ctx, proxyConfig)
if err != nil {

View File

@@ -198,11 +198,9 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
if !isInternalDomain(dest) {
if session.InboundFromContext(ctx) == nil {
ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.Tag,
})
}
ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.Tag,
})
return w.Dispatcher.Dispatch(ctx, dest)
}
@@ -223,11 +221,9 @@ func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*tra
func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error {
if !isInternalDomain(dest) {
if session.InboundFromContext(ctx) == nil {
ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.Tag,
})
}
ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.Tag,
})
return w.Dispatcher.DispatchLink(ctx, dest, link)
}

View File

@@ -72,7 +72,14 @@ func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) err
}
if isDomain(ob.Target, p.domain) {
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
opts := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...)
muxClient, err := mux.NewClientWorker(transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
}, mux.ClientStrategy{})
if err != nil {
return errors.New("failed to create mux client worker").Base(err).AtWarning()
}
@@ -84,11 +91,24 @@ func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) err
p.picker.AddWorker(worker)
if _, ok := link.Reader.(*pipe.Reader); !ok {
select {
case <-ctx.Done():
case <-muxClient.WaitClosed():
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
requestDone := func() error {
if err := buf.Copy(link.Reader, inboundLink.Writer); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
}
responseDone := func() error {
if err := buf.Copy(inboundLink.Reader, link.Writer); err != nil {
return err
}
return nil
}
requestDonePost := task.OnSuccess(requestDone, task.Close(inboundLink.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(inboundLink.Reader)
common.Interrupt(inboundLink.Writer)
return errors.New("connection ends").Base(err)
}
return nil
}

View File

@@ -264,11 +264,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
transferType = protocol.TransferTypePacket
}
s.transferType = transferType
var inbound *session.Inbound
if session.IsReverseMuxFromContext(ctx) {
inbound = session.InboundFromContext(ctx)
}
writer := NewWriter(s.ID, ob.Target, output, transferType, xudp.GetGlobalID(ctx), inbound)
writer := NewWriter(s.ID, ob.Target, output, transferType, xudp.GetGlobalID(ctx))
defer s.Close(false)
defer writer.Close()
@@ -388,7 +384,7 @@ func (m *ClientWorker) fetchOutput() {
var meta FrameMetadata
for {
err := meta.Unmarshal(reader, false)
err := meta.Unmarshal(reader)
if err != nil {
if errors.Cause(err) != io.EOF {
errors.LogInfoInner(context.Background(), err, "failed to read metadata")

View File

@@ -11,7 +11,6 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/session"
)
type SessionStatus byte
@@ -61,7 +60,6 @@ type FrameMetadata struct {
Option bitmask.Byte
SessionStatus SessionStatus
GlobalID [8]byte
Inbound *session.Inbound
}
func (f FrameMetadata) WriteTo(b *buf.Buffer) error {
@@ -81,23 +79,11 @@ func (f FrameMetadata) WriteTo(b *buf.Buffer) error {
case net.Network_UDP:
common.Must(b.WriteByte(byte(TargetNetworkUDP)))
}
if err := addrParser.WriteAddressPort(b, f.Target.Address, f.Target.Port); err != nil {
return err
}
if f.Inbound != nil {
if f.Inbound.Source.Network == net.Network_TCP || f.Inbound.Source.Network == net.Network_UDP {
common.Must(b.WriteByte(byte(f.Inbound.Source.Network - 1)))
if err := addrParser.WriteAddressPort(b, f.Inbound.Source.Address, f.Inbound.Source.Port); err != nil {
return err
}
if f.Inbound.Local.Network == net.Network_TCP || f.Inbound.Local.Network == net.Network_UDP {
common.Must(b.WriteByte(byte(f.Inbound.Local.Network - 1)))
if err := addrParser.WriteAddressPort(b, f.Inbound.Local.Address, f.Inbound.Local.Port); err != nil {
return err
}
}
}
} else if b.UDP != nil { // make sure it's user's proxy request
if b.UDP != nil { // make sure it's user's proxy request
b.Write(f.GlobalID[:]) // no need to check whether it's empty
}
} else if b.UDP != nil {
@@ -111,7 +97,7 @@ func (f FrameMetadata) WriteTo(b *buf.Buffer) error {
}
// Unmarshal reads FrameMetadata from the given reader.
func (f *FrameMetadata) Unmarshal(reader io.Reader, readSourceAndLocal bool) error {
func (f *FrameMetadata) Unmarshal(reader io.Reader) error {
metaLen, err := serial.ReadUint16(reader)
if err != nil {
return err
@@ -126,12 +112,12 @@ func (f *FrameMetadata) Unmarshal(reader io.Reader, readSourceAndLocal bool) err
if _, err := b.ReadFullFrom(reader, int32(metaLen)); err != nil {
return err
}
return f.UnmarshalFromBuffer(b, readSourceAndLocal)
return f.UnmarshalFromBuffer(b)
}
// UnmarshalFromBuffer reads a FrameMetadata from the given buffer.
// Visible for testing only.
func (f *FrameMetadata) UnmarshalFromBuffer(b *buf.Buffer, readSourceAndLocal bool) error {
func (f *FrameMetadata) UnmarshalFromBuffer(b *buf.Buffer) error {
if b.Len() < 4 {
return errors.New("insufficient buffer: ", b.Len())
}
@@ -164,54 +150,6 @@ func (f *FrameMetadata) UnmarshalFromBuffer(b *buf.Buffer, readSourceAndLocal bo
}
}
if f.SessionStatus == SessionStatusNew && readSourceAndLocal {
f.Inbound = &session.Inbound{}
if b.Len() == 0 {
return nil // for heartbeat, etc.
}
network := TargetNetwork(b.Byte(0))
if network == 0 {
return nil // may be padding
}
b.Advance(1)
addr, port, err := addrParser.ReadAddressPort(nil, b)
if err != nil {
return errors.New("reading source: failed to parse address and port").Base(err)
}
switch network {
case TargetNetworkTCP:
f.Inbound.Source = net.TCPDestination(addr, port)
case TargetNetworkUDP:
f.Inbound.Source = net.UDPDestination(addr, port)
default:
return errors.New("reading source: unknown network type: ", network)
}
if b.Len() == 0 {
return nil
}
network = TargetNetwork(b.Byte(0))
if network == 0 {
return nil
}
b.Advance(1)
addr, port, err = addrParser.ReadAddressPort(nil, b)
if err != nil {
return errors.New("reading local: failed to parse address and port").Base(err)
}
switch network {
case TargetNetworkTCP:
f.Inbound.Local = net.TCPDestination(addr, port)
case TargetNetworkUDP:
f.Inbound.Local = net.UDPDestination(addr, port)
default:
return errors.New("reading local: unknown network type: ", network)
}
return nil
}
// Application data is essential, to test whether the pipe is closed.
if f.SessionStatus == SessionStatusNew && f.Option.Has(OptionData) &&
f.Target.Network == net.Network_UDP && b.Len() >= 8 {

View File

@@ -10,7 +10,6 @@ import (
. "github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/transport/pipe"
)
@@ -33,13 +32,13 @@ func TestReaderWriter(t *testing.T) {
pReader, pWriter := pipe.New(pipe.WithSizeLimit(1024))
dest := net.TCPDestination(net.DomainAddress("example.com"), 80)
writer := NewWriter(1, dest, pWriter, protocol.TransferTypeStream, [8]byte{}, &session.Inbound{})
writer := NewWriter(1, dest, pWriter, protocol.TransferTypeStream, [8]byte{})
dest2 := net.TCPDestination(net.LocalHostIP, 443)
writer2 := NewWriter(2, dest2, pWriter, protocol.TransferTypeStream, [8]byte{}, &session.Inbound{})
writer2 := NewWriter(2, dest2, pWriter, protocol.TransferTypeStream, [8]byte{})
dest3 := net.TCPDestination(net.LocalHostIPv6, 18374)
writer3 := NewWriter(3, dest3, pWriter, protocol.TransferTypeStream, [8]byte{}, &session.Inbound{})
writer3 := NewWriter(3, dest3, pWriter, protocol.TransferTypeStream, [8]byte{})
writePayload := func(writer *Writer, payload ...byte) error {
b := buf.New()
@@ -63,7 +62,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 1,
SessionStatus: SessionStatusNew,
@@ -82,7 +81,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionStatus: SessionStatusNew,
SessionID: 2,
@@ -95,7 +94,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 1,
SessionStatus: SessionStatusKeep,
@@ -113,7 +112,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 3,
SessionStatus: SessionStatusNew,
@@ -132,7 +131,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 1,
SessionStatus: SessionStatusEnd,
@@ -144,7 +143,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 3,
SessionStatus: SessionStatusEnd,
@@ -156,7 +155,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 2,
SessionStatus: SessionStatusKeep,
@@ -174,7 +173,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
common.Must(meta.Unmarshal(bytesReader, false))
common.Must(meta.Unmarshal(bytesReader))
if r := cmp.Diff(meta, FrameMetadata{
SessionID: 2,
SessionStatus: SessionStatusEnd,
@@ -188,7 +187,7 @@ func TestReaderWriter(t *testing.T) {
{
var meta FrameMetadata
err := meta.Unmarshal(bytesReader, false)
err := meta.Unmarshal(bytesReader)
if err == nil {
t.Error("nil error")
}

View File

@@ -5,7 +5,6 @@ import (
"io"
"time"
"github.com/xtls/xray-core/app/dispatcher"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
@@ -13,8 +12,11 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
//"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/core"
//"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/pipe"
@@ -64,14 +66,43 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
if dest.Address != muxCoolAddress {
return s.dispatcher.DispatchLink(ctx, dest, link)
}
link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
worker, err := NewServerWorker(ctx, s.dispatcher, link)
// For Mux, we need to use pipe to guard against multiple sub-connections writing back responses at the same time
// sessionPolicy = h.policyManager.ForLevel(request.User.Level)
// ctx, cancel := context.WithCancel(ctx)
// timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
// ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
opts := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...)
_, err := NewServerWorker(ctx, s.dispatcher, &transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
})
if err != nil {
return err
}
select {
case <-ctx.Done():
case <-worker.done.Wait():
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
requestDone := func() error {
//defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
if err := buf.Copy(link.Reader, inboundLink.Writer); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
}
responseDone := func() error {
//defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
if err := buf.Copy(inboundLink.Reader, link.Writer); err != nil {
return err
}
return nil
}
requestDonePost := task.OnSuccess(requestDone, task.Close(inboundLink.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(inboundLink.Reader)
common.Interrupt(inboundLink.Writer)
return errors.New("connection ends").Base(err)
}
return nil
}
@@ -166,14 +197,6 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu
func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
ctx = session.SubContextFromMuxInbound(ctx)
if meta.Inbound != nil && meta.Inbound.Source.IsValid() && meta.Inbound.Local.IsValid() {
if inbound := session.InboundFromContext(ctx); inbound != nil {
newInbound := *inbound
newInbound.Source = meta.Inbound.Source
newInbound.Local = meta.Inbound.Local
ctx = session.ContextWithInbound(ctx, &newInbound)
}
}
errors.LogInfo(ctx, "received request for ", meta.Target)
{
msg := &log.AccessMessage{
@@ -337,7 +360,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
var meta FrameMetadata
err := meta.Unmarshal(reader, session.IsReverseMuxFromContext(ctx))
err := meta.Unmarshal(reader)
if err != nil {
return errors.New("failed to read metadata").Base(err)
}
@@ -348,7 +371,7 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
case SessionStatusEnd:
err = w.handleStatusEnd(&meta, reader)
case SessionStatusNew:
err = w.handleStatusNew(session.ContextWithIsReverseMux(ctx, false), &meta, reader)
err = w.handleStatusNew(ctx, &meta, reader)
case SessionStatusKeep:
err = w.handleStatusKeep(&meta, reader)
default:

View File

@@ -6,7 +6,6 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/session"
)
type Writer struct {
@@ -17,10 +16,9 @@ type Writer struct {
hasError bool
transferType protocol.TransferType
globalID [8]byte
inbound *session.Inbound
}
func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType, globalID [8]byte, inbound *session.Inbound) *Writer {
func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType, globalID [8]byte) *Writer {
return &Writer{
id: id,
dest: dest,
@@ -28,7 +26,6 @@ func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType
followup: false,
transferType: transferType,
globalID: globalID,
inbound: inbound,
}
}
@@ -46,7 +43,6 @@ func (w *Writer) getNextFrameMeta() FrameMetadata {
SessionID: w.id,
Target: w.dest,
GlobalID: w.globalID,
Inbound: w.inbound,
}
if w.followup {

View File

@@ -73,7 +73,7 @@ type ResponseHeader struct {
var (
// Keep in sync with crypto/tls/cipher_suites.go.
hasGCMAsmAMD64 = cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ && cpu.X86.HasSSE41 && cpu.X86.HasSSSE3
hasGCMAsmARM64 = (cpu.ARM64.HasAES && cpu.ARM64.HasPMULL) || (runtime.GOOS == "darwin" && runtime.GOARCH == "arm64")
hasGCMAsmARM64 = cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
hasGCMAsmS390X = cpu.S390X.HasAES && cpu.S390X.HasAESCTR && cpu.S390X.HasGHASH
hasGCMAsmPPC64 = runtime.GOARCH == "ppc64" || runtime.GOARCH == "ppc64le"

View File

@@ -17,13 +17,13 @@ const (
inboundSessionKey ctx.SessionKey = 1
outboundSessionKey ctx.SessionKey = 2
contentSessionKey ctx.SessionKey = 3
isReverseMuxKey ctx.SessionKey = 4 // is reverse mux
muxPreferredSessionKey ctx.SessionKey = 4 // unused
sockoptSessionKey ctx.SessionKey = 5 // used by dokodemo to only receive sockopt.Mark
trackedConnectionErrorKey ctx.SessionKey = 6 // used by observer to get outbound error
dispatcherKey ctx.SessionKey = 7 // used by ss2022 inbounds to get dispatcher
timeoutOnlyKey ctx.SessionKey = 8 // mux context's child contexts to only cancel when its own traffic times out
allowedNetworkKey ctx.SessionKey = 9 // muxcool server control incoming request tcp/udp
fullHandlerKey ctx.SessionKey = 10 // outbound gets full handler
handlerSessionKey ctx.SessionKey = 10 // outbound gets full handler
mitmAlpn11Key ctx.SessionKey = 11 // used by TLS dialer
mitmServerNameKey ctx.SessionKey = 12 // used by TLS dialer
)
@@ -75,21 +75,25 @@ func ContentFromContext(ctx context.Context) *Content {
return nil
}
func ContextWithIsReverseMux(ctx context.Context, isReverseMux bool) context.Context {
return context.WithValue(ctx, isReverseMuxKey, isReverseMux)
// ContextWithMuxPreferred returns a new context with the given bool
func ContextWithMuxPreferred(ctx context.Context, forced bool) context.Context {
return context.WithValue(ctx, muxPreferredSessionKey, forced)
}
func IsReverseMuxFromContext(ctx context.Context) bool {
if val, ok := ctx.Value(isReverseMuxKey).(bool); ok {
// MuxPreferredFromContext returns value in this context, or false if not contained.
func MuxPreferredFromContext(ctx context.Context) bool {
if val, ok := ctx.Value(muxPreferredSessionKey).(bool); ok {
return val
}
return false
}
// ContextWithSockopt returns a new context with Socket configs included
func ContextWithSockopt(ctx context.Context, s *Sockopt) context.Context {
return context.WithValue(ctx, sockoptSessionKey, s)
}
// SockoptFromContext returns Socket configs in this context, or nil if not contained.
func SockoptFromContext(ctx context.Context) *Sockopt {
if sockopt, ok := ctx.Value(sockoptSessionKey).(*Sockopt); ok {
return sockopt
@@ -160,12 +164,12 @@ func AllowedNetworkFromContext(ctx context.Context) net.Network {
return net.Network_Unknown
}
func ContextWithFullHandler(ctx context.Context, handler outbound.Handler) context.Context {
return context.WithValue(ctx, fullHandlerKey, handler)
func ContextWithHandler(ctx context.Context, handler outbound.Handler) context.Context {
return context.WithValue(ctx, handlerSessionKey, handler)
}
func FullHandlerFromContext(ctx context.Context) outbound.Handler {
if val, ok := ctx.Value(fullHandlerKey).(outbound.Handler); ok {
func HandlerFromContext(ctx context.Context) outbound.Handler {
if val, ok := ctx.Value(handlerSessionKey).(outbound.Handler); ok {
return val
}
return nil

View File

@@ -4,10 +4,8 @@ import (
"context"
"io"
"net"
"time"
"github.com/sagernet/sing/common/bufio"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/transport"
)
@@ -35,26 +33,8 @@ func (w *PipeConnWrapper) Close() error {
return nil
}
// This Read implemented a timeout to avoid goroutine leak.
// as a temporarily solution
func (w *PipeConnWrapper) Read(b []byte) (n int, err error) {
type readResult struct {
n int
err error
}
c := make(chan readResult, 1)
go func() {
n, err := w.R.Read(b)
c <- readResult{n: n, err: err}
}()
select {
case result := <-c:
return result.n, result.err
case <-time.After(300 * time.Second):
common.Close(w.R)
common.Interrupt(w.R)
return 0, buf.ErrReadTimeout
}
return w.R.Read(b)
}
func (w *PipeConnWrapper) Write(p []byte) (n int, err error) {

View File

@@ -18,8 +18,8 @@ import (
var (
Version_x byte = 25
Version_y byte = 10
Version_z byte = 15
Version_y byte = 9
Version_z byte = 11
)
var (

View File

@@ -68,6 +68,5 @@ func RCodeFromError(err error) uint16 {
if r, ok := cause.(RCodeError); ok {
return uint16(r)
}
// unknown error
return 2
return 0
}

View File

@@ -12,19 +12,14 @@ import (
// ResolvableContext is an implementation of routing.Context, with domain resolving capability.
type ResolvableContext struct {
routing.Context
dnsClient dns.Client
cacheIPs []net.IP
hasError bool
dnsClient dns.Client
resolvedIPs []net.IP
}
// GetTargetIPs overrides original routing.Context's implementation.
func (ctx *ResolvableContext) GetTargetIPs() []net.IP {
if len(ctx.cacheIPs) > 0 {
return ctx.cacheIPs
}
if ctx.hasError {
return nil
if len(ctx.resolvedIPs) > 0 {
return ctx.resolvedIPs
}
if domain := ctx.GetTargetDomain(); len(domain) != 0 {
@@ -34,18 +29,16 @@ func (ctx *ResolvableContext) GetTargetIPs() []net.IP {
FakeEnable: false,
})
if err == nil {
ctx.cacheIPs = ips
ctx.resolvedIPs = ips
return ips
}
errors.LogInfoInner(context.Background(), err, "resolve ip for ", domain)
}
if ips := ctx.Context.GetTargetIPs(); len(ips) != 0 {
ctx.cacheIPs = ips
return ips
}
ctx.hasError = true
return nil
}

25
go.mod
View File

@@ -11,23 +11,23 @@ require (
github.com/miekg/dns v1.1.68
github.com/pelletier/go-toml v1.9.5
github.com/pires/go-proxyproto v0.8.1
github.com/quic-go/quic-go v0.55.0
github.com/refraction-networking/utls v1.8.1
github.com/quic-go/quic-go v0.54.0
github.com/refraction-networking/utls v1.8.0
github.com/sagernet/sing v0.5.1
github.com/sagernet/sing-shadowsocks v0.2.7
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
github.com/stretchr/testify v1.11.1
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
github.com/vishvananda/netlink v1.3.1
github.com/xtls/reality v0.0.0-20251014195629-e4eec4520535
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
golang.org/x/crypto v0.43.0
golang.org/x/net v0.46.0
golang.org/x/crypto v0.42.0
golang.org/x/net v0.44.0
golang.org/x/sync v0.17.0
golang.org/x/sys v0.37.0
golang.org/x/sys v0.36.0
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
google.golang.org/grpc v1.75.1
google.golang.org/protobuf v1.36.9
gvisor.dev/gvisor v0.0.0-20250428193742-2d800c3129d5
h12.io/socks v1.0.3
lukechampine.com/blake3 v1.4.1
@@ -46,12 +46,13 @@ require (
github.com/quic-go/qpack v0.5.1 // indirect
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect
github.com/vishvananda/netns v0.0.5 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/text v0.30.0 // indirect
go.uber.org/mock v0.5.0 // indirect
golang.org/x/mod v0.27.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.37.0 // indirect
golang.org/x/tools v0.36.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

52
go.sum
View File

@@ -51,10 +51,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.55.0 h1:zccPQIqYCXDt5NmcEabyYvOnomjs8Tlwl7tISjJh9Mk=
github.com/quic-go/quic-go v0.55.0/go.mod h1:DR51ilwU1uE164KuWXhinFcKWGlEjzys2l8zUl5Ss1U=
github.com/refraction-networking/utls v1.8.1 h1:yNY1kapmQU8JeM1sSw2H2asfTIwWxIkrMJI0pRUOCAo=
github.com/refraction-networking/utls v1.8.1/go.mod h1:jkSOEkLqn+S/jtpEHPOsVv/4V4EVnelwbMQl4vCWXAM=
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
github.com/refraction-networking/utls v1.8.0 h1:L38krhiTAyj9EeiQQa2sg+hYb4qwLCqdMcpZrRfbONE=
github.com/refraction-networking/utls v1.8.0/go.mod h1:jkSOEkLqn+S/jtpEHPOsVv/4V4EVnelwbMQl4vCWXAM=
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 h1:f/FNXud6gA3MNr8meMVVGxhp+QBTqY91tM8HjEuMjGg=
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3/go.mod h1:HgjTstvQsPGkxUsCd2KWxErBblirPizecHcpD3ffK+s=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@@ -75,8 +75,8 @@ github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/xtls/reality v0.0.0-20251014195629-e4eec4520535 h1:nwobseOLLRtdbP6z7Z2aVI97u8ZptTgD1ofovhAKmeU=
github.com/xtls/reality v0.0.0-20251014195629-e4eec4520535/go.mod h1:vbHCV/3VWUvy1oKvTxxWJRPEWSeR1sYgQHIh6u/JiZQ=
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c h1:LHLhQY3mKXSpTcQAkjFR4/6ar3rXjQryNeM7khK3AHU=
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c/go.mod h1:XxvnCCgBee4WWE0bc4E+a7wbk8gkJ/rS0vNVNtC5qp0=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
@@ -90,22 +90,22 @@ go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFh
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko=
go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U=
golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
@@ -117,21 +117,21 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -141,12 +141,12 @@ golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 h1:/jFs0duh4rdb8uI
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -289,8 +289,8 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
return nil, errors.New("maxConnections cannot be specified together with maxConcurrency")
}
if c.Xmux == (XmuxConfig{}) {
c.Xmux.MaxConcurrency.From = 1
c.Xmux.MaxConcurrency.To = 1
c.Xmux.MaxConcurrency.From = 16
c.Xmux.MaxConcurrency.To = 32
c.Xmux.HMaxRequestTimes.From = 600
c.Xmux.HMaxRequestTimes.To = 900
c.Xmux.HMaxReusableSecs.From = 1800

View File

@@ -2,8 +2,6 @@ package external
import (
"bytes"
"context"
"net"
"io"
"net/http"
"net/url"
@@ -20,9 +18,6 @@ import (
func ConfigLoader(arg string) (out io.Reader, err error) {
var data []byte
switch {
case strings.HasPrefix(arg, "http+unix://"):
data, err = FetchUnixSocketHTTPContent(arg)
case strings.HasPrefix(arg, "http://"), strings.HasPrefix(arg, "https://"):
data, err = FetchHTTPContent(arg)
@@ -75,60 +70,6 @@ func FetchHTTPContent(target string) ([]byte, error) {
return content, nil
}
// Format: http+unix:///path/to/socket.sock/api/endpoint
func FetchUnixSocketHTTPContent(target string) ([]byte, error) {
path := strings.TrimPrefix(target, "http+unix://")
if !strings.HasPrefix(path, "/") {
return nil, errors.New("unix socket path must be absolute")
}
var socketPath, httpPath string
sockIdx := strings.Index(path, ".sock")
if sockIdx != -1 {
socketPath = path[:sockIdx+5]
httpPath = path[sockIdx+5:]
if httpPath == "" {
httpPath = "/"
}
} else {
return nil, errors.New("cannot determine socket path, socket file should have .sock extension")
}
if _, err := os.Stat(socketPath); err != nil {
return nil, errors.New("socket file not found: ", socketPath).Base(err)
}
client := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", socketPath)
},
},
}
defer client.CloseIdleConnections()
resp, err := client.Get("http://localhost" + httpPath)
if err != nil {
return nil, errors.New("failed to fetch from unix socket: ", socketPath).Base(err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, errors.New("unexpected HTTP status code: ", resp.StatusCode)
}
content, err := buf.ReadAllToBytes(resp.Body)
if err != nil {
return nil, errors.New("failed to read response").Base(err)
}
return content, nil
}
func ExtConfigLoader(files []string, reader io.Reader) (io.Reader, error) {
buf, err := ctlcmd.Run(append([]string{"convert"}, files...), reader)
if err != nil {

View File

@@ -666,7 +666,7 @@ func (r *Reverse) Dispatch(ctx context.Context, link *transport.Link) {
link.Reader = &buf.EndpointOverrideReader{Reader: link.Reader, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
link.Writer = &buf.EndpointOverrideWriter{Writer: link.Writer, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
}
r.client.Dispatch(session.ContextWithIsReverseMux(ctx, true), link)
r.client.Dispatch(ctx, link)
}
}

View File

@@ -89,11 +89,8 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
handler.reverse = &Reverse{
tag: a.Reverse.Tag,
dispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
ctx: session.ContextWithInbound(ctx, &session.Inbound{
Tag: a.Reverse.Tag,
User: handler.server.User, // TODO: email
}),
handler: handler,
ctx: ctx,
handler: handler,
}
handler.reverse.monitorTask = &task.Periodic{
Execute: handler.reverse.monitor,
@@ -201,7 +198,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
}
case protocol.RequestCommandMux:
fallthrough // let server break Mux connections that contain TCP requests
case protocol.RequestCommandTCP, protocol.RequestCommandRvs:
case protocol.RequestCommandTCP:
var t reflect.Type
var p uintptr
if commonConn, ok := conn.(*encryption.CommonConn); ok {
@@ -226,8 +223,6 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
r, _ := t.FieldByName("rawInput")
input = (*bytes.Reader)(unsafe.Pointer(p + i.Offset))
rawInput = (*bytes.Buffer)(unsafe.Pointer(p + r.Offset))
default:
panic("unknown VLESS request command")
}
default:
ob.CanSpliceCopy = 3
@@ -400,7 +395,7 @@ func (r *Reverse) monitor() error {
Tag: r.tag,
Dispatcher: r.dispatcher,
}
worker, err := mux.NewServerWorker(session.ContextWithIsReverseMux(r.ctx, true), w, link1)
worker, err := mux.NewServerWorker(r.ctx, w, link1)
if err != nil {
errors.LogWarningInner(r.ctx, err, "failed to create mux server worker")
return nil
@@ -411,7 +406,7 @@ func (r *Reverse) monitor() error {
ctx := session.ContextWithOutbounds(r.ctx, []*session.Outbound{{
Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")},
}})
r.handler.Process(ctx, link2, session.FullHandlerFromContext(ctx).(*proxyman.Handler))
r.handler.Process(ctx, link2, session.HandlerFromContext(ctx).(*proxyman.Handler))
common.Interrupt(reader1)
common.Interrupt(reader2)
}()

View File

@@ -14,8 +14,6 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/core"
feature_inbound "github.com/xtls/xray-core/features/inbound"
@@ -23,6 +21,7 @@ import (
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/proxy/vmess"
"github.com/xtls/xray-core/proxy/vmess/encoding"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet/stat"
)
@@ -184,44 +183,6 @@ func (h *Handler) RemoveUser(ctx context.Context, email string) error {
return nil
}
func transferResponse(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input buf.Reader, output *buf.BufferedWriter) error {
session.EncodeResponseHeader(response, output)
bodyWriter, err := session.EncodeResponseBody(request, output)
if err != nil {
return errors.New("failed to start decoding response").Base(err)
}
{
// Optimize for small response packet
data, err := input.ReadMultiBuffer()
if err != nil {
return err
}
if err := bodyWriter.WriteMultiBuffer(data); err != nil {
return err
}
}
if err := output.SetBuffered(false); err != nil {
return err
}
if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
return err
}
account := request.User.Account.(*vmess.MemoryAccount)
if request.Option.Has(protocol.RequestOptionChunkStream) && !account.NoTerminationSignal {
if err := bodyWriter.WriteMultiBuffer(buf.MultiBuffer{}); err != nil {
return err
}
}
return nil
}
// Process implements proxy.Inbound.Process().
func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatcher routing.Dispatcher) error {
sessionPolicy := h.policyManager.ForLevel(0)
@@ -275,49 +236,28 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
inbound.CanSpliceCopy = 3
inbound.User = request.User
sessionPolicy = h.policyManager.ForLevel(request.User.Level)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
link, err := dispatcher.Dispatch(ctx, request.Destination())
bodyReader, err := svrSession.DecodeRequestBody(request, reader)
if err != nil {
return errors.New("failed to dispatch request to ", request.Destination()).Base(err)
return errors.New("failed to start decoding").Base(err)
}
requestDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
bodyReader, err := svrSession.DecodeRequestBody(request, reader)
if err != nil {
return errors.New("failed to start decoding").Base(err)
}
if err := buf.Copy(bodyReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
writer := buf.NewBufferedWriter(buf.NewWriter(connection))
response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request),
}
responseDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
writer := buf.NewBufferedWriter(buf.NewWriter(connection))
defer writer.Flush()
response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request),
}
return transferResponse(timer, svrSession, request, response, link.Reader, writer)
svrSession.EncodeResponseHeader(response, writer)
bodyWriter, err := svrSession.EncodeResponseBody(request, writer)
if err != nil {
return errors.New("failed to start decoding response").Base(err)
}
writer.SetFlushNext()
requestDonePost := task.OnSuccess(requestDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(link.Reader)
common.Interrupt(link.Writer)
return errors.New("connection ends").Base(err)
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
Reader: bodyReader,
Writer: bodyWriter},
); err != nil {
return errors.New("failed to dispatch request").Base(err)
}
return nil
}

View File

@@ -121,6 +121,212 @@ func TestVless(t *testing.T) {
}
}
func TestVlessMuxTcp(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
common.Must(err)
defer tcpServer.Close()
userID := protocol.NewID(uuid.New())
serverPort := tcp.PickPort()
serverConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&inbound.Config{
Clients: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
}),
},
},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := tcp.PickPort()
clientConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
Address: net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
Networks: []net.Network{net.Network_TCP},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{
MultiplexSettings: &proxyman.MultiplexingConfig{
Enabled: true,
Concurrency: 4,
},
}),
ProxySettings: serial.ToTypedMessage(&outbound.Config{
Vnext: &protocol.ServerEndpoint{
Address: net.NewIPOrDomain(net.LocalHostIP),
Port: uint32(serverPort),
User: &protocol.User{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
}),
},
},
}),
},
},
}
servers, err := InitializeServerConfigs(serverConfig, clientConfig)
common.Must(err)
defer CloseAllServers(servers)
for range "abcd" {
var errg errgroup.Group
for range 3 {
errg.Go(testTCPConn(clientPort, 10240, time.Second*20))
}
if err := errg.Wait(); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}
}
func TestVlessEncMuxTcp(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
common.Must(err)
defer tcpServer.Close()
userID := protocol.NewID(uuid.New())
serverPort := tcp.PickPort()
serverConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&inbound.Config{
Clients: []*protocol.User{
{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
}),
},
},
SecondsFrom: 600, //mlkem768x25519plus.native.600s.
Decryption: "Gzh5Aa3Ibo3343XFC7V2a8ucOpFeGjOL6jMlBZAfjqyty2rdRms8xccBAm68imYw2q96gg2dcueeL2r7n_2YzQ",
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := tcp.PickPort()
clientConfig := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&log.Config{
ErrorLogLevel: clog.Severity_Debug,
ErrorLogType: log.LogType_Console,
}),
},
Inbound: []*core.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}},
Listen: net.NewIPOrDomain(net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
Address: net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
Networks: []net.Network{net.Network_TCP},
}),
},
},
Outbound: []*core.OutboundHandlerConfig{
{
SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{
MultiplexSettings: &proxyman.MultiplexingConfig{
Enabled: true,
Concurrency: 4,
},
}),
ProxySettings: serial.ToTypedMessage(&outbound.Config{
Vnext: &protocol.ServerEndpoint{
Address: net.NewIPOrDomain(net.LocalHostIP),
Port: uint32(serverPort),
User: &protocol.User{
Account: serial.ToTypedMessage(&vless.Account{
Id: userID.String(),
Seconds: 1, //mlkem768x25519plus.native.0rtt.
Encryption: "ExaMB4tIHpFikMeZwAJ8_8hxpZNi3gY13Ft455yC04xiCWgWUwMvKUwDQVm8zLcE8EKnjVlhRDmkTzMzvTMZyYlswCuqx0YK9kVNNFcrQJWD8JpAmTN8fffApIoWitDEAUTEp9S_Ehxo-9a2evRyKqJcQ6WmPiiyGbZrnNAfLKhdRsA15rZt6eKMVQExtDpucfaFc2E4-GtKzKd7P0I6bXccC1q4gqyZcXiEfOmmWBTPMTkNPEUdnQVsPiSWgJxslQZ5pYlPE7GQE7qoxYBItDMhkHZ4l0YwsvgZ1EQ2yTEn9DOxbyMihLk4kSAtg1IrW7tCTNkhyVsUY3SeyReB2sfN2AU-TXmVGUJMTKJ1jfywu8JIb9lG14HB1Rku6nVNcIMTzyshvsi_8AQFCSOcDdQ7ZpBxKxW7N1tKXBI0shq7vWdufjpYCjAVh-k_QgonVOwadYt-wPMxDntbWzEf_yC9eFQ6cBGd5smWNeSQZwAvqXw_WVPD56EVlaQ5HpsOkqBdy1Enr1NnH7WdgNsfk6RSQhRgW1dF9XBUKylpqsvOXkq3I0fLuuJFfuEZu4MeNvdgI2mbM_UxK8AzlRwkm7Eb1WQfm-S05HJefdZzu8kHYamggwtNQum_NtODzRgw3uWbjYbEBIY0j9IMhyGynOYQHHmrR2kT-dh08GwVD7BfsJRvFYgy2ZI8a3xGgHyi6MKKE8g7krEd-ne_4ddSaysgctaiiLwI4NVRbYJIT8XEbmKTIwoZx4R7m7AffYJo2NlfEPREg8stBcY5dAGXeSwD0pxs-jCJOeifQYq7Elq216SrwCmayLg3XJcpxutOmkhai6hRO6eBP6uy9XlLXyMt3TW6isx_rRt1hXCezkl_8hPEcqI9tPE0ZYVQ-eMh2_e35gQyPUw02aequ4ojaHV03QaSMquqF8RXG7k1gDed9vqex3aFaSN6UUNkebLKrqAiPmq0fccQ3qdbAxLGZ0ZFF5mIwEiFoTM6V4yPgntkRYtxcCKK-5YkPfsIunrM3EsWDCovp_Ahdfs-aqQLqzk1wVKTLQaQI5ApBlmGB3EauNdHFJBoeGZOF9e7QbGujhGRGMpS1fFtI2SqlcXINZU7YvR2JMfBrvBYZ9whXawM_Rg31IJR1raMGAEm6hNpa7SBD0cprIZxG6HKUQFMGHVlVohjwpWE5AGIc5Rc8Va2x8e3zFTMTUIwCdMz1XlNaqBMldJx01JQLwgSsnfGGlEJ_jYujvYNo0EBk4yev1Ap6nO-zSU-WtimlhEP0-cb22Q6e4wCEnWfO-lABJsrhwhrbloM51k5QVIefNyIvDWBszpRsreidUZVU4TOH2EoltYslWdPkcckfCplFLyvGKBItoAPRTOKRCjOsqlmj9OvpbDCzedZUmjLNfoLSwsPC7Nk2FpIkVUG6WxCE2YiU9LFrZIgWRKwUluM_at9w7wowRkujXEAQiJKtuUWQCxGyVbJtufLmQI6_yafmwgLoSlyE0cL-_Rf4nBCBjJnmyBDRvAoA-W08vw53uMt3RnFVwKFqo3PonmYAETv5rrMjh3L3K16QS-2EgL_R7WAFd0",
}),
},
},
}),
},
},
}
servers, err := InitializeServerConfigs(serverConfig, clientConfig)
common.Must(err)
defer CloseAllServers(servers)
for range "abcd" {
var errg errgroup.Group
for range 3 {
errg.Go(testTCPConn(clientPort, 10240, time.Second*20))
}
if err := errg.Wait(); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}
}
func TestVlessTls(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,

View File

@@ -1213,7 +1213,7 @@ func TestVMessGCMLengthAuthPlusNoTerminationSignal(t *testing.T) {
{
Account: serial.ToTypedMessage(&vmess.Account{
Id: userID.String(),
TestsEnabled: "AuthenticatedLength|NoTerminationSignal",
TestsEnabled: "AuthenticatedLength|",
}),
},
},

View File

@@ -75,7 +75,8 @@ func (c *UConn) HandshakeAddress() net.Address {
func (c *UConn) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
if c.Config.Show {
localAddr := c.LocalAddr().String()
fmt.Printf("REALITY localAddr: %v\tis using X25519MLKEM768 for TLS' communication: %v\n", localAddr, c.HandshakeState.ServerHello.ServerShare.Group == utls.X25519MLKEM768)
curveID := *(*utls.CurveID)(unsafe.Pointer(reflect.ValueOf(c).Elem().FieldByName("curveID").UnsafeAddr()))
fmt.Printf("REALITY localAddr: %v\tis using X25519MLKEM768 for TLS' communication: %v\n", localAddr, curveID == utls.X25519MLKEM768)
fmt.Printf("REALITY localAddr: %v\tis using ML-DSA-65 for cert's extra verification: %v\n", localAddr, len(c.Config.Mldsa65Verify) > 0)
}
p, _ := reflect.TypeOf(c.Conn).Elem().FieldByName("peerCertificates")