Files
Xray-core/app/proxyman/inbound/inbound.go
𐲓𐳛𐳪𐳂𐳐 𐲀𐳢𐳦𐳫𐳢 𐲥𐳔𐳛𐳪𐳌𐳑𐳖𐳇 fe57507fd9
Some checks failed
Scheduled assets update / geodat (push) Has been cancelled
Build and Release for Windows 7 / check-assets (push) Has been cancelled
Build and Release / check-assets (push) Has been cancelled
Test / check-assets (push) Has been cancelled
Build and Release for Windows 7 / build (win7-32, 386, windows) (push) Has been cancelled
Build and Release for Windows 7 / build (win7-64, amd64, windows) (push) Has been cancelled
Build and Release / build (386, freebsd, ) (push) Has been cancelled
Build and Release / build (386, linux, ) (push) Has been cancelled
Build and Release / build (386, openbsd, ) (push) Has been cancelled
Build and Release / build (386, windows, ) (push) Has been cancelled
Build and Release / build (amd64, android, android-amd64) (push) Has been cancelled
Build and Release / build (amd64, darwin, ) (push) Has been cancelled
Build and Release / build (amd64, freebsd, ) (push) Has been cancelled
Build and Release / build (amd64, linux, ) (push) Has been cancelled
Build and Release / build (amd64, openbsd, ) (push) Has been cancelled
Build and Release / build (amd64, windows, ) (push) Has been cancelled
Build and Release / build (arm, 5, linux) (push) Has been cancelled
Build and Release / build (arm, 6, linux) (push) Has been cancelled
Build and Release / build (arm, 7, freebsd) (push) Has been cancelled
Build and Release / build (arm, 7, linux) (push) Has been cancelled
Build and Release / build (arm, 7, openbsd) (push) Has been cancelled
Build and Release / build (arm, 7, windows) (push) Has been cancelled
Build and Release / build (arm64, android) (push) Has been cancelled
Build and Release / build (arm64, darwin) (push) Has been cancelled
Build and Release / build (arm64, freebsd) (push) Has been cancelled
Build and Release / build (arm64, linux) (push) Has been cancelled
Build and Release / build (arm64, openbsd) (push) Has been cancelled
Build and Release / build (arm64, windows) (push) Has been cancelled
Build and Release / build (loong64, linux) (push) Has been cancelled
Build and Release / build (mips, linux) (push) Has been cancelled
Build and Release / build (mips64, linux) (push) Has been cancelled
Build and Release / build (mips64le, linux) (push) Has been cancelled
Build and Release / build (mipsle, linux) (push) Has been cancelled
Build and Release / build (ppc64, linux) (push) Has been cancelled
Build and Release / build (ppc64le, linux) (push) Has been cancelled
Build and Release / build (riscv64, linux) (push) Has been cancelled
Build and Release / build (s390x, linux) (push) Has been cancelled
Test / test (macos-latest) (push) Has been cancelled
Test / test (ubuntu-latest) (push) Has been cancelled
Test / test (windows-latest) (push) Has been cancelled
Outbound: One endpoint and at most one user only (#5144)
https://github.com/XTLS/Xray-core/pull/5124#issuecomment-3281091009

Fixes https://github.com/XTLS/Xray-core/pull/5124#pullrequestreview-3218097421
2025-09-15 13:31:27 +00:00

192 lines
4.7 KiB
Go

package inbound
import (
"context"
"sync"
"github.com/xtls/xray-core/app/proxyman"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/inbound"
)
// Manager manages all inbound handlers.
type Manager struct {
access sync.RWMutex
untaggedHandlers []inbound.Handler
taggedHandlers map[string]inbound.Handler
running bool
}
// New returns a new Manager for inbound handlers.
func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) {
m := &Manager{
taggedHandlers: make(map[string]inbound.Handler),
}
return m, nil
}
// Type implements common.HasType.
func (*Manager) Type() interface{} {
return inbound.ManagerType()
}
// AddHandler implements inbound.Manager.
func (m *Manager) AddHandler(ctx context.Context, handler inbound.Handler) error {
m.access.Lock()
defer m.access.Unlock()
tag := handler.Tag()
if len(tag) > 0 {
if _, found := m.taggedHandlers[tag]; found {
return errors.New("existing tag found: " + tag)
}
m.taggedHandlers[tag] = handler
} else {
m.untaggedHandlers = append(m.untaggedHandlers, handler)
}
if m.running {
return handler.Start()
}
return nil
}
// GetHandler implements inbound.Manager.
func (m *Manager) GetHandler(ctx context.Context, tag string) (inbound.Handler, error) {
m.access.RLock()
defer m.access.RUnlock()
handler, found := m.taggedHandlers[tag]
if !found {
return nil, errors.New("handler not found: ", tag)
}
return handler, nil
}
// RemoveHandler implements inbound.Manager.
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if tag == "" {
return common.ErrNoClue
}
m.access.Lock()
defer m.access.Unlock()
if handler, found := m.taggedHandlers[tag]; found {
if err := handler.Close(); err != nil {
errors.LogWarningInner(ctx, err, "failed to close handler ", tag)
}
delete(m.taggedHandlers, tag)
return nil
}
return common.ErrNoClue
}
// ListHandlers implements inbound.Manager.
func (m *Manager) ListHandlers(ctx context.Context) []inbound.Handler {
m.access.RLock()
defer m.access.RUnlock()
response := make([]inbound.Handler, len(m.untaggedHandlers))
copy(response, m.untaggedHandlers)
for _, v := range m.taggedHandlers {
response = append(response, v)
}
return response
}
// Start implements common.Runnable.
func (m *Manager) Start() error {
m.access.Lock()
defer m.access.Unlock()
m.running = true
for _, handler := range m.taggedHandlers {
if err := handler.Start(); err != nil {
return err
}
}
for _, handler := range m.untaggedHandlers {
if err := handler.Start(); err != nil {
return err
}
}
return nil
}
// Close implements common.Closable.
func (m *Manager) Close() error {
m.access.Lock()
defer m.access.Unlock()
m.running = false
var errs []interface{}
for _, handler := range m.taggedHandlers {
if err := handler.Close(); err != nil {
errs = append(errs, err)
}
}
for _, handler := range m.untaggedHandlers {
if err := handler.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.New("failed to close all handlers").Base(errors.New(serial.Concat(errs...)))
}
return nil
}
// NewHandler creates a new inbound.Handler based on the given config.
func NewHandler(ctx context.Context, config *core.InboundHandlerConfig) (inbound.Handler, error) {
rawReceiverSettings, err := config.ReceiverSettings.GetInstance()
if err != nil {
return nil, err
}
proxySettings, err := config.ProxySettings.GetInstance()
if err != nil {
return nil, err
}
tag := config.Tag
receiverSettings, ok := rawReceiverSettings.(*proxyman.ReceiverConfig)
if !ok {
return nil, errors.New("not a ReceiverConfig").AtError()
}
streamSettings := receiverSettings.StreamSettings
if streamSettings != nil && streamSettings.SocketSettings != nil {
ctx = session.ContextWithSockopt(ctx, &session.Sockopt{
Mark: streamSettings.SocketSettings.Mark,
})
}
if streamSettings != nil && streamSettings.ProtocolName == "splithttp" {
ctx = session.ContextWithAllowedNetwork(ctx, net.Network_UDP)
}
return NewAlwaysOnInboundHandler(ctx, tag, receiverSettings, proxySettings)
}
func init() {
common.Must(common.RegisterConfig((*proxyman.InboundConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return New(ctx, config.(*proxyman.InboundConfig))
}))
common.Must(common.RegisterConfig((*core.InboundHandlerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return NewHandler(ctx, config.(*core.InboundHandlerConfig))
}))
}