Documentation
¶
Index ¶
- Constants
- Variables
- func ChainErrors(errors ...error) error
- func CountErrorsBySeverity(errors []error) map[ErrorSeverity]int
- func FormatErrorSummary(errors []error) string
- func GetErrorCode(err error) string
- func HasCriticalErrors(errors []error) bool
- func IsTemporary(err error) bool
- func IsTimeout(err error) bool
- func PutBuffer(buf *PooledBuffer)
- func PutMessage(msg *PooledMessage)
- func PutMetrics(metrics *Metrics)
- func PutSessionMetrics(metrics *SessionMetrics)
- func PutTimer(timer *PooledTimer)
- func ResetPoolStats()
- func StatusToString(status uint32) string
- func TopicMatch(filter, topic string) bool
- type AdvancedRoute
- type AuthError
- type AuthProvider
- type BufferPool
- type CertWatcher
- type CompositeFilter
- type CompositeTransformer
- type ConnectHandler
- type ConnectLostHandler
- type ConnectProps
- type DefaultErrorHandler
- type DefaultLogger
- func (l *DefaultLogger) Debug(msg string, args ...interface{})
- func (l *DefaultLogger) Debugf(format string, args ...interface{})
- func (l *DefaultLogger) Error(msg string, args ...interface{})
- func (l *DefaultLogger) Errorf(format string, args ...interface{})
- func (l *DefaultLogger) Info(msg string, args ...interface{})
- func (l *DefaultLogger) Infof(format string, args ...interface{})
- func (l *DefaultLogger) Warn(msg string, args ...interface{})
- func (l *DefaultLogger) Warnf(format string, args ...interface{})
- type EnhancedTLSConfig
- func (c *EnhancedTLSConfig) BuildTLSConfig() (*tls.Config, error)
- func (c *EnhancedTLSConfig) GetCertExpiry() time.Time
- func (c *EnhancedTLSConfig) IsExpiringSoon() bool
- func (c *EnhancedTLSConfig) ReloadCertificates() error
- func (c *EnhancedTLSConfig) StartCertWatcher(logger Logger, reloadFunc func() error) error
- func (c *EnhancedTLSConfig) StopCertWatcher()
- func (c *EnhancedTLSConfig) WithAutoReload(autoReload bool, reloadInterval time.Duration) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithCACert(caFile, format string) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithCertExpireWarning(warning time.Duration) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithCipherSuites(cipherSuites []uint16) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithClientCert(certFile, keyFile, certFormat, keyFormat string) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithCurvePreferences(curves []tls.CurveID) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithPassword(certPassword, keyPassword string) *EnhancedTLSConfig
- func (c *EnhancedTLSConfig) WithTLSVersion(minVersion, maxVersion uint16) *EnhancedTLSConfig
- type ErrorCategory
- type ErrorHandler
- type ErrorInfo
- type ErrorMetrics
- type ErrorReporter
- type ErrorSeverity
- type ErrorType
- type Event
- type EventHandler
- type EventManager
- type FileStore
- type Forwarder
- type ForwarderBuilder
- func (fb *ForwarderBuilder) BufferSize(size int) *ForwarderBuilder
- func (fb *ForwarderBuilder) Build() (ForwarderConfig, error)
- func (fb *ForwarderBuilder) Disable() *ForwarderBuilder
- func (fb *ForwarderBuilder) Enable() *ForwarderBuilder
- func (fb *ForwarderBuilder) MapTopic(srcTopic, dstTopic string) *ForwarderBuilder
- func (fb *ForwarderBuilder) MustBuild() ForwarderConfig
- func (fb *ForwarderBuilder) QoS(qos byte) *ForwarderBuilder
- func (fb *ForwarderBuilder) Source(session string, topics ...string) *ForwarderBuilder
- func (fb *ForwarderBuilder) Target(session string) *ForwarderBuilder
- func (fb *ForwarderBuilder) TopicMapping(mappings map[string]string) *ForwarderBuilder
- type ForwarderConfig
- type ForwarderRegistry
- func (sfr *ForwarderRegistry) Get(name string) (*Forwarder, error)
- func (sfr *ForwarderRegistry) GetAllMetrics() map[string]map[string]interface{}
- func (sfr *ForwarderRegistry) List() []string
- func (sfr *ForwarderRegistry) Register(config ForwarderConfig) (*Forwarder, error)
- func (sfr *ForwarderRegistry) StopAll()
- func (sfr *ForwarderRegistry) Unregister(name string) error
- type GoRedisAdapter
- func (a *GoRedisAdapter) Close() error
- func (a *GoRedisAdapter) Del(ctx context.Context, keys ...string) error
- func (a *GoRedisAdapter) Get(ctx context.Context, key string) (string, error)
- func (a *GoRedisAdapter) Ping(ctx context.Context) error
- func (a *GoRedisAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
- type GoRedisOptions
- type HandlerRegistry
- func (h *HandlerRegistry) AddMessageHandler(topic string, handler MessageHandler)
- func (h *HandlerRegistry) ClearMessageHandlers()
- func (h *HandlerRegistry) CountMessageHandlers() int
- func (h *HandlerRegistry) GetAllTopics() []string
- func (h *HandlerRegistry) GetMessageHandler(topic string) (MessageHandler, bool)
- func (h *HandlerRegistry) RemoveMessageHandler(topic string)
- func (h *HandlerRegistry) SetConnectHandler(handler ConnectHandler)
- func (h *HandlerRegistry) SetConnectLostHandler(handler ConnectLostHandler)
- type InMemoryErrorReporter
- type JSONExporter
- type JWTAuthProvider
- type JsonPathTransformer
- type Logger
- type MQTT5Auth
- type MQTT5AuthManager
- type MQTT5Config
- type MQTT5Options
- type MQTTXError
- func FilterErrors(errors []error, errType ErrorType) []*MQTTXError
- func NewConfigError(message string, cause error) *MQTTXError
- func NewConnectionError(message string, cause error) *MQTTXError
- func NewError(errType ErrorType, severity ErrorSeverity, message string) *MQTTXError
- func NewForwarderError(message string, cause error) *MQTTXError
- func NewPublishError(topic string, cause error) *MQTTXError
- func NewSubscribeError(topic string, cause error) *MQTTXError
- func WrapError(err error, errType ErrorType, message string) *MQTTXError
- func (e *MQTTXError) Error() string
- func (e *MQTTXError) Is(target error) bool
- func (e *MQTTXError) IsCritical() bool
- func (e *MQTTXError) IsRecoverable() bool
- func (e *MQTTXError) Unwrap() error
- func (e *MQTTXError) WithContext(key string, value any) *MQTTXError
- func (e *MQTTXError) WithSession(session string) *MQTTXError
- func (e *MQTTXError) WithTopic(topic string) *MQTTXError
- type Manager
- func (m *Manager) AddSession(opts *Options) error
- func (m *Manager) ClearRecoveredErrors() int
- func (m *Manager) Close()
- func (m *Manager) DisconnectAll()
- func (m *Manager) GetActiveErrors() []*ErrorInfo
- func (m *Manager) GetAllSessionsStatus() map[string]string
- func (m *Manager) GetAllSessionsStatusCode() map[string]uint32
- func (m *Manager) GetErrorStats() map[string]interface{}
- func (m *Manager) GetMetrics() map[string]interface{}
- func (m *Manager) GetSession(name string) (*Session, error)
- func (m *Manager) Handle(topic string, handler func(*Message)) *Route
- func (m *Manager) HandleTo(session, topic string, handler func(*Message)) (*Route, error)
- func (m *Manager) HandleToWithConfig(config RouteConfig) (*AdvancedRoute, error)
- func (m *Manager) HandleWithConfig(config RouteConfig, handler func(*Message)) *AdvancedRoute
- func (m *Manager) ListSessions() []string
- func (m *Manager) Listen(topic string) (chan *Message, *Route)
- func (m *Manager) ListenTo(session, topic string) (chan *Message, *Route, error)
- func (m *Manager) ListenToWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute, error)
- func (m *Manager) ListenWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute)
- func (m *Manager) OnEvent(eventType string, handler func(event Event))
- func (m *Manager) PublishTo(sessionName, topic string, payload []byte, qos byte) error
- func (m *Manager) PublishToAll(topic string, payload []byte, qos byte) []error
- func (m *Manager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo
- func (m *Manager) RemoveSession(name string) error
- func (m *Manager) SetLogger(logger Logger) Logger
- func (m *Manager) SetMaxRetries(category ErrorCategory, count int)
- func (m *Manager) SubscribeAll(topic string, handler MessageHandler, qos byte) []error
- func (m *Manager) SubscribeTo(sessionName, topic string, handler MessageHandler, qos byte) error
- func (m *Manager) UnsubscribeAll(topic string) []error
- func (m *Manager) UnsubscribeTo(name string, topic string) error
- func (m *Manager) WaitForAllSessions(timeout time.Duration) error
- func (m *Manager) WaitForSession(name string, timeout time.Duration) error
- type MemoryStore
- type Message
- type MessageFilter
- type MessageHandler
- type MessagePool
- type MessageTransformer
- type Metrics
- type MetricsExporter
- type MetricsPool
- type MockRedisClient
- func (m *MockRedisClient) Close() error
- func (m *MockRedisClient) Del(ctx context.Context, keys ...string) error
- func (m *MockRedisClient) Get(ctx context.Context, key string) (string, error)
- func (m *MockRedisClient) Ping(ctx context.Context) error
- func (m *MockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
- func (m *MockRedisClient) WithDelError(err error) *MockRedisClient
- func (m *MockRedisClient) WithGetError(err error) *MockRedisClient
- func (m *MockRedisClient) WithSetError(err error) *MockRedisClient
- type OAuth2AuthProvider
- type Options
- func (o *Options) Clone() *Options
- func (o *Options) ConfigureTLS() (*tls.Config, error)
- func (o *Options) ConfigureTLSWithEnhanced() (*tls.Config, error)
- func (o *Options) Validate() error
- func (o *Options) WithAuth(username, password string) *Options
- func (o *Options) WithCleanSession(clean bool) *Options
- func (o *Options) WithEnhancedAuth(auth *MQTT5Auth) *Options
- func (o *Options) WithEnhancedTLS(config *EnhancedTLSConfig) *Options
- func (o *Options) WithMQTT5(config *MQTT5Config) *Options
- func (o *Options) WithPerformance(writeBuffer, readBuffer, chanSize int) *Options
- func (o *Options) WithPersistence(enabled bool) *Options
- func (o *Options) WithReconnect(autoReconnect bool, initialInterval, maxInterval int64, backoffFactor float64) *Options
- func (o *Options) WithRedisStorage(addr, username, password string, db int, keyPrefix string, ttl int64) *Options
- func (o *Options) WithSessionExpiry(expiry time.Duration) *Options
- func (o *Options) WithSharedSubscription(enable bool, prefix string) *Options
- func (o *Options) WithSimpleReconnect(autoReconnect bool, maxInterval int64) *Options
- func (o *Options) WithStorage(storeType StoreType, path string) *Options
- func (o *Options) WithTLS(caFile, certFile, keyFile string, skipVerify bool) *Options
- func (o *Options) WithTimeout(connect, write int64) *Options
- func (o *Options) WithUserProperty(key, value string) *Options
- type PayloadFilter
- type PayloadTransformer
- type PerformanceOptions
- type PlainAuthProvider
- type PoolStats
- type PooledBuffer
- type PooledMessage
- type PooledTimer
- type PrometheusExporter
- type RecoveryManager
- func (rm *RecoveryManager) ClearRecoveredErrors() int
- func (rm *RecoveryManager) GetActiveErrors() []*ErrorInfo
- func (rm *RecoveryManager) GetErrorStats() map[string]interface{}
- func (rm *RecoveryManager) MarkErrorRecovered(sessionName string, category ErrorCategory, err error) bool
- func (rm *RecoveryManager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo
- func (rm *RecoveryManager) SetMaxRetries(category ErrorCategory, count int)
- type RecoveryStrategy
- type RedisClient
- type RedisOptions
- type RedisStore
- type RedisStoreOptions
- type RetryableError
- type Route
- type RouteConfig
- type RouteStats
- type SHA256AuthProvider
- type SecurityError
- type Session
- func (s *Session) Disconnect()
- func (s *Session) GetBrokers() []string
- func (s *Session) GetClientID() string
- func (s *Session) GetLastActivity() time.Time
- func (s *Session) GetMetrics() map[string]interface{}
- func (s *Session) GetName() string
- func (s *Session) GetOptions() *Options
- func (s *Session) GetSessionStatus() uint32
- func (s *Session) GetStatus() string
- func (s *Session) GetSubscribedTopics() []string
- func (s *Session) GetSubscriptionCount() int
- func (s *Session) IsConnected() bool
- func (s *Session) IsPersistent() bool
- func (s *Session) IsSubscribed(topic string) bool
- func (s *Session) PrometheusMetrics() string
- func (s *Session) Publish(topic string, payload []byte, qos byte) error
- func (s *Session) ResetMetrics()
- func (s *Session) String() string
- func (s *Session) Subscribe(topic string, handler MessageHandler, qos byte) error
- func (s *Session) SubscribeShared(group, topic string, handler MessageHandler, qos byte) error
- func (s *Session) Unsubscribe(topics ...string) error
- func (s *Session) UnsubscribeShared(group, topic string) error
- func (s *Session) UpdateLastActivity()
- func (s *Session) WithEnhancedAuth(provider AuthProvider) *Session
- type SessionBuilder
- func (b *SessionBuilder) Auth(username, password string) *SessionBuilder
- func (b *SessionBuilder) AutoReconnect() *SessionBuilder
- func (b *SessionBuilder) Broker(addr string) *SessionBuilder
- func (b *SessionBuilder) Brokers(addrs ...string) *SessionBuilder
- func (b *SessionBuilder) Build() (*Options, error)
- func (b *SessionBuilder) CleanSession(clean bool) *SessionBuilder
- func (b *SessionBuilder) ClientID(id string) *SessionBuilder
- func (b *SessionBuilder) DisableReconnect() *SessionBuilder
- func (b *SessionBuilder) FileStorage(path string) *SessionBuilder
- func (b *SessionBuilder) KeepAlive(seconds int) *SessionBuilder
- func (b *SessionBuilder) MessageChannelSize(size int) *SessionBuilder
- func (b *SessionBuilder) MustBuild() *Options
- func (b *SessionBuilder) Performance(bufferSizeKB, maxPendingMsgs int) *SessionBuilder
- func (b *SessionBuilder) Persistent() *SessionBuilder
- func (b *SessionBuilder) ReconnectConfig(initialSec, maxSec int, backoffFactor float64) *SessionBuilder
- func (b *SessionBuilder) RedisAuth(username, password string, db int) *SessionBuilder
- func (b *SessionBuilder) RedisStorage(addr string) *SessionBuilder
- func (b *SessionBuilder) Reset(name string) *SessionBuilder
- func (b *SessionBuilder) Subscribe(topic string, qos byte, handler MessageHandler) *SessionBuilder
- func (b *SessionBuilder) TLS(caFile, certFile, keyFile string, skipVerify bool) *SessionBuilder
- func (b *SessionBuilder) Timeouts(connectSec, writeSec int) *SessionBuilder
- func (b *SessionBuilder) Validate() []error
- type SessionError
- type SessionMetrics
- type SessionState
- type SessionStore
- type StorageOptions
- type StoreType
- type TLSConfig
- type TimerPool
- type TopicConfig
- type TopicFilter
- type TopicRewriteTransformer
- type TopicSubscription
- type ValidationErrors
Constants ¶
const (
	MQTT311 = 4 // MQTT 3.1.1
	MQTT50 = 5 // MQTT 5.0
)
MQTT协议版本常量
const (
	PropPayloadFormatIndicator = 1 // 载荷格式指示器
	PropMessageExpiryInterval = 2 // 消息过期间隔
	PropContentType = 3 // 内容类型
	PropResponseTopic = 8 // 响应主题
	PropCorrelationData = 9 // 关联数据
	PropSubscriptionIdentifier = 11 // 订阅标识符
	PropSessionExpiryInterval = 17 // 会话过期间隔
	PropAssignedClientIdentifier = 18 // 分配的客户端标识符
	PropServerKeepAlive = 19 // 服务端保活时间
	PropAuthenticationMethod = 21 // 认证方法
	PropAuthenticationData = 22 // 认证数据
	PropRequestProblemInformation = 23 // 请求问题信息
	PropWillDelayInterval = 24 // 遗嘱延迟间隔
	PropRequestResponseInformation = 25 // 请求响应信息
	PropResponseInformation = 26 // 响应信息
	PropServerReference = 28 // 服务端引用
	PropReasonString = 31 // 原因字符串
	PropReceiveMaximum = 33 // 接收最大值
	PropTopicAliasMaximum = 34 // 主题别名最大值
	PropTopicAlias = 35 // 主题别名
	PropMaximumQoS = 36 // 最大QoS
	PropRetainAvailable = 37 // 保留可用
	PropUserProperty = 38 // 用户属性
	PropMaximumPacketSize = 39 // 最大报文大小
	PropWildcardSubscriptionAvail = 40 // 通配符订阅可用
	PropSubscriptionIdentifierAvail = 41 // 订阅标识符可用
)
MQTT 5.0 属性标识符常量
const (
	ReasonSuccess = 0 // 成功
	ReasonNormalDisconnection = 0 // 正常断开连接
	ReasonGrantedQoS0 = 0 // 授予QoS 0
	ReasonGrantedQoS1 = 1 // 授予QoS 1
	ReasonGrantedQoS2 = 2 // 授予QoS 2
	ReasonDisconnectWithWill = 4 // 带遗嘱消息断开连接
	ReasonNoMatchingSubscribers = 16 // 没有匹配的订阅者
	ReasonNoSubscriptionExisted = 17 // 没有存在的订阅
	ReasonContinueAuthentication = 24 // 继续认证
	ReasonReAuthenticate = 25 // 重新认证
	ReasonUnspecifiedError = 128 // 未指定错误
	ReasonMalformedPacket = 129 // 报文格式错误
	ReasonProtocolError = 130 // 协议错误
	ReasonImplementationSpecificError = 131 // 实现特定错误
	ReasonUnsupportedProtocolVersion = 132 // 不支持的协议版本
	ReasonClientIdentifierNotValid = 133 // 客户端标识符无效
	ReasonBadUsernameOrPassword = 134 // 用户名或密码错误
	ReasonNotAuthorized = 135 // 未授权
	ReasonServerBusy = 137 // 服务端繁忙
	ReasonBanned = 138 // 禁止
	ReasonServerShuttingDown = 139 // 服务端关闭
	ReasonBadAuthenticationMethod = 140 // 认证方法错误
	ReasonKeepAliveTimeout = 141 // 保活超时
	ReasonSessionTakenOver = 142 // 会话被接管
	ReasonTopicFilterInvalid = 143 // 主题过滤器无效
	ReasonTopicNameInvalid = 144 // 主题名无效
	ReasonPacketIdentifierInUse = 145 // 报文标识符已使用
	ReasonPacketIdentifierNotFound = 146 // 报文标识符未找到
	ReasonReceiveMaximumExceeded = 147 // 超出接收最大值
	ReasonTopicAliasInvalid = 148 // 主题别名无效
	ReasonPacketTooLarge = 149 // 报文过大
	ReasonMessageRateTooHigh = 150 // 消息速率过高
	ReasonQuotaExceeded = 151 // 超出配额
	ReasonAdministrativeAction = 152 // 管理操作
	ReasonPayloadFormatInvalid = 153 // 载荷格式无效
	ReasonRetainNotSupported = 154 // 不支持保留
	ReasonQoSNotSupported = 155 // 不支持QoS
	ReasonUseAnotherServer = 156 // 使用另一个服务端
	ReasonServerMoved = 157 // 服务端已移动
	ReasonConnectionRateExceeded = 159 // 超出连接速率
	ReasonMaximumConnectTime = 160 // 最大连接时间
	ReasonSubscriptionIdentifiersNotSupported = 161 // 不支持订阅标识符
	ReasonWildcardSubscriptionsNotSupported = 162 // 不支持通配符订阅
)
MQTT 5.0 原因码常量
const (
	// 标准认证方法
	AuthMethodPlain = "PLAIN" // 明文认证
	AuthMethodSHA256 = "SHA-256" // SHA-256哈希认证
	AuthMethodScramSHA1 = "SCRAM-SHA1" // SCRAM-SHA1认证

	// OAuth 2.0认证
	AuthMethodOAuth2 = "oauth2" // OAuth 2.0认证

	// JWT认证
	AuthMethodJWT = "jwt" // JWT认证
)
MQTT 5.0 认证方法常量
const (
	AuthStateNone = iota // 未认证
	AuthStateInProgress // 认证进行中
	AuthStateSuccess // 认证成功
	AuthStateFailed // 认证失败
)
MQTT 5.0 认证状态常量
const (
	CertFormatPEM = "pem" // PEM格式证书
	CertFormatDER = "der" // DER格式证书
	CertFormatP12 = "p12" // PKCS#12格式证书
	CertFormatJKS = "jks" // Java KeyStore格式
	CertFormatPFX = "pfx" // PFX格式证书(PKCS#12的别名)
)
证书格式类型
const (
	DefaultKeepAlive = 60 // 默认保活时间(秒)
	DefaultConnectTimeout = 30 * time.Second // 默认连接超时时间
	DefaultInitialReconnectInterval = 2 * time.Second // 默认初始重连间隔
	DefaultMaxReconnectInterval = 120 * time.Second // 默认最大重连间隔
	DefaultBackoffFactor = 1.5 // 默认退避因子
	DefaultWriteTimeout = 30 * time.Second // 默认写超时时间
	DefaultMessageChanSize = 100 // 默认消息通道大小
	DefaultMaxMessageSize = 32 * 1024 // 默认最大消息大小(32KB)
	DefaultMaxPendingMessages = 1000 // 默认最大待处理消息数
	DefaultWriteBufferSize = 4096 // 默认写缓冲区大小
	DefaultReadBufferSize = 4096 // 默认读缓冲区大小
)
默认配置常量
const (
	StatusDisconnected uint32 = 0 // 已断开连接
	StatusConnecting uint32 = 1 // 正在连接
	StatusConnected uint32 = 2 // 已连接
	StatusReconnecting uint32 = 3 // 正在重连
	StatusClosed uint32 = 4 // 已关闭
)
会话状态常量 - 供外部代码使用
const (
	EventSessionConnecting = "session_connecting" // 会话正在连接
	EventSessionConnected = "session_connected" // 会话已连接
	EventSessionDisconnected = "session_disconnected" // 会话已断开
	EventSessionReconnecting = "session_reconnecting" // 会话重连中
	EventSessionAdded = "session_added" // 会话已添加
	EventSessionRemoved = "session_removed" // 会话已移除
	EventSessionReady = "session_ready" // 会话准备就绪(已成功连接)
	EventStateChanged = "session_state_changed" // 会话状态已改变
)
事件类型常量
Variables ¶
var (
	ErrSessionNotFound = errors.New("session not found")
	ErrSessionExists = errors.New("session already exists")
	ErrNotConnected = errors.New("not connected")
	ErrSessionClosed = errors.New("session closed")
	ErrInvalidOptions = errors.New("invalid options")
	ErrInvalidBroker = errors.New("invalid broker address")
	ErrInvalidClientID = errors.New("invalid client ID")
	ErrInvalidTopic = errors.New("invalid topic")
	ErrTimeout = errors.New("operation timeout")
	ErrInvalidPayload = errors.New("invalid payload")
	ErrSubscribeFailed = errors.New("subscribe failed")
	ErrUnsubscribeFailed = errors.New("unsubscribe failed")
	ErrPublishFailed = errors.New("publish failed")
	ErrQoSNotSupported = errors.New("QoS level not supported")
	ErrStorageFailure = errors.New("storage operation failed")
	ErrInvalidConfig = errors.New("invalid configuration")
	ErrConnectionRefused = errors.New("connection refused")
	ErrAuthenticationFailed = errors.New("authentication failed")
	ErrForwarderFailed = errors.New("forwarder operation failed")
)
基础错误定义
Functions ¶
func CountErrorsBySeverity ¶ added in v0.1.4
func CountErrorsBySeverity(errors []error) map[ErrorSeverity]int
CountErrorsBySeverity 按严重程度统计错误
func FormatErrorSummary ¶ added in v0.1.4
FormatErrorSummary 格式化错误摘要
func HasCriticalErrors ¶ added in v0.1.4
HasCriticalErrors 检查是否有严重错误
func PutSessionMetrics ¶ added in v0.1.4
func PutSessionMetrics(metrics *SessionMetrics)
PutSessionMetrics 归还会话指标对象
func StatusToString ¶ added in v0.1.4
StatusToString 将状态常量转换为状态字符串
func TopicMatch ¶ added in v0.1.4
TopicMatch 判断一个主题是否与主题过滤器匹配。支持MQTT的主题匹配规则:
- "+" 匹配单个层级
- "#" 匹配多个层级,但必须在最后
Types ¶
type AdvancedRoute ¶ added in v0.1.4
type AdvancedRoute struct {
Route
// contains filtered or unexported fields
}
AdvancedRoute 高级路由配置,支持消息过滤和转换。
func (*AdvancedRoute) AddFilter ¶ added in v0.1.4
func (r *AdvancedRoute) AddFilter(filter MessageFilter)
AddFilter 添加消息过滤器
func (*AdvancedRoute) AddTransformer ¶ added in v0.1.4
func (r *AdvancedRoute) AddTransformer(transformer MessageTransformer)
AddTransformer 添加消息转换器
type AuthProvider ¶ added in v0.1.4
type AuthProvider interface {
// 认证方法名称
Method() string
// 初始化认证过程
// 返回初始认证数据和可能的错误
Initialize() ([]byte, error)
// 处理认证挑战
// data为服务器返回的认证数据
// 返回响应数据和可能的错误
HandleChallenge(data []byte) ([]byte, error)
// 完成认证过程
// data为服务器返回的最终认证数据
// 返回是否认证成功和可能的错误
Complete(data []byte) (bool, error)
}
AuthProvider 认证提供者接口,用于实现不同的认证机制。
type BufferPool ¶ added in v0.1.4
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool 字节缓冲区对象池
type CertWatcher ¶ added in v0.1.4
type CertWatcher struct {
// contains filtered or unexported fields
}
CertWatcher 证书监视器,用于监控证书文件变化和过期时间。
type CompositeFilter ¶ added in v0.1.4
type CompositeFilter struct {
// contains filtered or unexported fields
}
CompositeFilter 组合过滤器,可以组合多个过滤器形成复杂的过滤规则
func NewAndFilter ¶ added in v0.1.4
func NewAndFilter(filters ...MessageFilter) *CompositeFilter
NewAndFilter 创建一个AND复合过滤器,所有子过滤器都必须匹配
func NewOrFilter ¶ added in v0.1.4
func NewOrFilter(filters ...MessageFilter) *CompositeFilter
NewOrFilter 创建一个OR复合过滤器,任一子过滤器匹配即可
func (*CompositeFilter) GetDescription ¶ added in v0.1.4
func (f *CompositeFilter) GetDescription() string
GetDescription 获取过滤器的描述
func (*CompositeFilter) Match ¶ added in v0.1.4
func (f *CompositeFilter) Match(message *Message) bool
Match 判断消息是否匹配复合过滤规则
type CompositeTransformer ¶ added in v0.1.4
type CompositeTransformer struct {
// contains filtered or unexported fields
}
CompositeTransformer 组合转换器,可以将多个转换器组合在一起,按顺序应用。
func NewCompositeTransformer ¶ added in v0.1.4
func NewCompositeTransformer(description string, transformers ...MessageTransformer) *CompositeTransformer
NewCompositeTransformer 创建组合转换器
func (*CompositeTransformer) GetDescription ¶ added in v0.1.4
func (t *CompositeTransformer) GetDescription() string
GetDescription 获取转换器的描述
type ConnectProps ¶
type ConnectProps struct {
KeepAlive int // 保活时间(秒)
CleanSession bool // 是否清理会话
AutoReconnect bool // 是否自动重连
ConnectTimeout time.Duration // 连接超时时间
InitialReconnectInterval time.Duration // 初始重连间隔
MaxReconnectInterval time.Duration // 最大重连间隔
BackoffFactor float64 // 退避因子
WriteTimeout time.Duration // 写超时时间
ResumeSubs bool // 是否恢复订阅
PersistentSession bool // 是否持久化会话
}
ConnectProps 连接属性
type DefaultErrorHandler ¶ added in v0.1.4
type DefaultErrorHandler struct {
// contains filtered or unexported fields
}
DefaultErrorHandler 默认错误处理器
func NewDefaultErrorHandler ¶ added in v0.1.4
func NewDefaultErrorHandler(logger *slog.Logger) *DefaultErrorHandler
NewDefaultErrorHandler 创建默认错误处理器
func (*DefaultErrorHandler) HandleError ¶ added in v0.1.4
func (h *DefaultErrorHandler) HandleError(ctx context.Context, err error) bool
HandleError 处理错误
type DefaultLogger ¶ added in v0.1.4
type DefaultLogger struct {
// contains filtered or unexported fields
}
DefaultLogger 是默认的日志实现
func NewDefaultLogger ¶ added in v0.1.4
func NewDefaultLogger() *DefaultLogger
NewDefaultLogger 创建一个默认的日志实现
func (*DefaultLogger) Debug ¶ added in v0.1.4
func (l *DefaultLogger) Debug(msg string, args ...interface{})
Debug 记录Debug级别日志
func (*DefaultLogger) Debugf ¶ added in v0.1.4
func (l *DefaultLogger) Debugf(format string, args ...interface{})
Debugf 记录Debug级别格式化日志
func (*DefaultLogger) Error ¶ added in v0.1.4
func (l *DefaultLogger) Error(msg string, args ...interface{})
Error 记录Error级别日志
func (*DefaultLogger) Errorf ¶ added in v0.1.4
func (l *DefaultLogger) Errorf(format string, args ...interface{})
Errorf 记录Error级别格式化日志
func (*DefaultLogger) Info ¶ added in v0.1.4
func (l *DefaultLogger) Info(msg string, args ...interface{})
Info 记录Info级别日志
func (*DefaultLogger) Infof ¶ added in v0.1.4
func (l *DefaultLogger) Infof(format string, args ...interface{})
Infof 记录Info级别格式化日志
func (*DefaultLogger) Warn ¶ added in v0.1.4
func (l *DefaultLogger) Warn(msg string, args ...interface{})
Warn 记录Warn级别日志
func (*DefaultLogger) Warnf ¶ added in v0.1.4
func (l *DefaultLogger) Warnf(format string, args ...interface{})
Warnf 记录Warn级别格式化日志
type EnhancedTLSConfig ¶ added in v0.1.4
type EnhancedTLSConfig struct {
// 基本TLS配置
CAFile string // CA证书文件路径
CertFile string // 客户端证书文件路径
KeyFile string // 客户端密钥文件路径
SkipVerify bool // 是否跳过服务器证书验证
// 增强配置
CAFormat string // CA证书格式
CertFormat string // 客户端证书格式
KeyFormat string // 客户端密钥格式
KeyPassword string // 私钥密码
CertPassword string // 证书密码
MinVersion uint16 // 最低TLS版本
MaxVersion uint16 // 最高TLS版本
CipherSuites []uint16 // 加密套件列表
CurvePreferences []tls.CurveID // 曲线偏好
// 证书轮换配置
AutoReload bool // 是否自动重新加载证书
ReloadInterval time.Duration // 重新加载间隔
CertExpireWarning time.Duration // 证书过期警告时间
// contains filtered or unexported fields
}
EnhancedTLSConfig 增强的TLS配置
func NewEnhancedTLSConfig ¶ added in v0.1.4
func NewEnhancedTLSConfig() *EnhancedTLSConfig
NewEnhancedTLSConfig 创建增强的TLS配置
func (*EnhancedTLSConfig) BuildTLSConfig ¶ added in v0.1.4
func (c *EnhancedTLSConfig) BuildTLSConfig() (*tls.Config, error)
BuildTLSConfig 构建TLS配置
func (*EnhancedTLSConfig) GetCertExpiry ¶ added in v0.1.4
func (c *EnhancedTLSConfig) GetCertExpiry() time.Time
GetCertExpiry 获取证书过期时间
func (*EnhancedTLSConfig) IsExpiringSoon ¶ added in v0.1.4
func (c *EnhancedTLSConfig) IsExpiringSoon() bool
IsExpiringSoon 检查证书是否即将过期
func (*EnhancedTLSConfig) ReloadCertificates ¶ added in v0.1.4
func (c *EnhancedTLSConfig) ReloadCertificates() error
ReloadCertificates 重新加载证书
func (*EnhancedTLSConfig) StartCertWatcher ¶ added in v0.1.4
func (c *EnhancedTLSConfig) StartCertWatcher(logger Logger, reloadFunc func() error) error
StartCertWatcher 启动证书监视器
func (*EnhancedTLSConfig) StopCertWatcher ¶ added in v0.1.4
func (c *EnhancedTLSConfig) StopCertWatcher()
StopCertWatcher 停止证书监视器
func (*EnhancedTLSConfig) WithAutoReload ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithAutoReload(autoReload bool, reloadInterval time.Duration) *EnhancedTLSConfig
WithAutoReload 设置自动重新加载
func (*EnhancedTLSConfig) WithCACert ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithCACert(caFile, format string) *EnhancedTLSConfig
WithCACert 设置CA证书
func (*EnhancedTLSConfig) WithCertExpireWarning ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithCertExpireWarning(warning time.Duration) *EnhancedTLSConfig
WithCertExpireWarning 设置证书过期警告时间
func (*EnhancedTLSConfig) WithCipherSuites ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithCipherSuites(cipherSuites []uint16) *EnhancedTLSConfig
WithCipherSuites 设置加密套件
func (*EnhancedTLSConfig) WithClientCert ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithClientCert(certFile, keyFile, certFormat, keyFormat string) *EnhancedTLSConfig
WithClientCert 设置客户端证书
func (*EnhancedTLSConfig) WithCurvePreferences ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithCurvePreferences(curves []tls.CurveID) *EnhancedTLSConfig
WithCurvePreferences 设置曲线偏好
func (*EnhancedTLSConfig) WithPassword ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithPassword(certPassword, keyPassword string) *EnhancedTLSConfig
WithPassword 设置证书和私钥密码
func (*EnhancedTLSConfig) WithTLSVersion ¶ added in v0.1.4
func (c *EnhancedTLSConfig) WithTLSVersion(minVersion, maxVersion uint16) *EnhancedTLSConfig
WithTLSVersion 设置TLS版本范围
type ErrorCategory ¶ added in v0.1.4
type ErrorCategory string
ErrorCategory 错误类别
const (
	// CategoryConnection 连接相关错误
	CategoryConnection ErrorCategory = "connection"
	// CategorySubscription 订阅相关错误
	CategorySubscription ErrorCategory = "subscription"
	// CategoryPublish 发布相关错误
	CategoryPublish ErrorCategory = "publish"
	// CategoryAuthentication 认证相关错误
	CategoryAuthentication ErrorCategory = "authentication"
	// CategoryInternal 内部错误
	CategoryInternal ErrorCategory = "internal"
)
type ErrorHandler ¶ added in v0.1.4
type ErrorHandler interface {
HandleError(ctx context.Context, err error) bool // 返回true表示错误已处理,false表示需要继续传播
}
ErrorHandler 错误处理器接口
type ErrorInfo ¶ added in v0.1.4
type ErrorInfo struct {
Error error // 原始错误
Time time.Time // 错误发生时间
Session string // 相关会话
Category ErrorCategory // 错误类别
Severity ErrorSeverity // 错误严重程度
Strategy RecoveryStrategy // 恢复策略
RetryCount int // 重试次数
Recovered bool // 是否已恢复
}
ErrorInfo 错误信息
type ErrorMetrics ¶ added in v0.1.4
type ErrorMetrics struct {
TotalErrors uint64 `json:"total_errors"`
ErrorsByType map[string]uint64 `json:"errors_by_type"`
ErrorsBySeverity map[string]uint64 `json:"errors_by_severity"`
LastError string `json:"last_error"`
LastErrorTime int64 `json:"last_error_time"`
}
ErrorMetrics 错误度量收集器
func NewErrorMetrics ¶ added in v0.1.4
func NewErrorMetrics() *ErrorMetrics
NewErrorMetrics 创建错误度量收集器
func (*ErrorMetrics) GetSnapshot ¶ added in v0.1.4
func (m *ErrorMetrics) GetSnapshot() map[string]interface{}
GetSnapshot 获取度量快照
func (*ErrorMetrics) RecordError ¶ added in v0.1.4
func (m *ErrorMetrics) RecordError(err error)
RecordError 记录错误
type ErrorReporter ¶ added in v0.1.4
ErrorReporter 错误报告接口
type ErrorSeverity ¶ added in v0.1.4
type ErrorSeverity string
ErrorSeverity 错误严重程度
const (
	SeverityInfo ErrorSeverity = "info" // 信息级别(可忽略)
	SeverityWarning ErrorSeverity = "warning" // 警告级别(需关注)
	SeverityError ErrorSeverity = "error" // 错误级别(需处理)
	SeverityCritical ErrorSeverity = "critical" // 严重级别(需立即处理)
)
func GetErrorSeverity ¶ added in v0.1.4
func GetErrorSeverity(err error) ErrorSeverity
GetErrorSeverity 获取错误严重程度
type ErrorType ¶ added in v0.1.4
type ErrorType string
ErrorType 错误类型
const (
	TypeConnection ErrorType = "connection" // 连接相关错误
	TypeAuthentication ErrorType = "authentication" // 认证相关错误
	TypeSubscription ErrorType = "subscription" // 订阅相关错误
	TypePublish ErrorType = "publish" // 发布相关错误
	TypeConfiguration ErrorType = "configuration" // 配置相关错误
	TypeStorage ErrorType = "storage" // 存储相关错误
	TypeForwarder ErrorType = "forwarder" // 转发器相关错误
	TypeInternal ErrorType = "internal" // 内部错误
)
type Event ¶
type Event struct {
Type string // 事件类型
Session string // 相关会话名称
Data interface{} // 事件数据
Timestamp time.Time // 事件发生时间
}
Event 事件结构
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
EventManager 事件管理器
type FileStore ¶
type FileStore struct {
// contains filtered or unexported fields
}
FileStore 基于文件的会话状态存储
func (*FileStore) DeleteState ¶
DeleteState 删除会话状态
type Forwarder ¶ added in v0.1.4
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder 简化的消息转发器
func NewForwarder ¶ added in v0.1.4
func NewForwarder(config ForwarderConfig, manager *Manager) (*Forwarder, error)
NewForwarder 创建新的简化消息转发器
func (*Forwarder) GetMetrics ¶ added in v0.1.4
GetMetrics 获取转发器指标
type ForwarderBuilder ¶ added in v0.1.4
type ForwarderBuilder struct {
// contains filtered or unexported fields
}
ForwarderBuilder 转发器配置构建器
func NewForwarderBuilder ¶ added in v0.1.4
func NewForwarderBuilder(name string) *ForwarderBuilder
NewForwarderBuilder 创建转发器构建器
func (*ForwarderBuilder) BufferSize ¶ added in v0.1.4
func (fb *ForwarderBuilder) BufferSize(size int) *ForwarderBuilder
BufferSize 设置消息缓冲区大小
func (*ForwarderBuilder) Build ¶ added in v0.1.4
func (fb *ForwarderBuilder) Build() (ForwarderConfig, error)
Build 构建转发器配置
func (*ForwarderBuilder) Disable ¶ added in v0.1.4
func (fb *ForwarderBuilder) Disable() *ForwarderBuilder
Disable 禁用转发器
func (*ForwarderBuilder) Enable ¶ added in v0.1.4
func (fb *ForwarderBuilder) Enable() *ForwarderBuilder
Enable 启用转发器
func (*ForwarderBuilder) MapTopic ¶ added in v0.1.4
func (fb *ForwarderBuilder) MapTopic(srcTopic, dstTopic string) *ForwarderBuilder
MapTopic 添加单个主题映射
func (*ForwarderBuilder) MustBuild ¶ added in v0.1.4
func (fb *ForwarderBuilder) MustBuild() ForwarderConfig
MustBuild 构建转发器配置,出错时panic
func (*ForwarderBuilder) QoS ¶ added in v0.1.4
func (fb *ForwarderBuilder) QoS(qos byte) *ForwarderBuilder
QoS 设置转发消息的QoS等级
func (*ForwarderBuilder) Source ¶ added in v0.1.4
func (fb *ForwarderBuilder) Source(session string, topics ...string) *ForwarderBuilder
Source 设置源会话和主题
func (*ForwarderBuilder) Target ¶ added in v0.1.4
func (fb *ForwarderBuilder) Target(session string) *ForwarderBuilder
Target 设置目标会话
func (*ForwarderBuilder) TopicMapping ¶ added in v0.1.4
func (fb *ForwarderBuilder) TopicMapping(mappings map[string]string) *ForwarderBuilder
TopicMapping 设置主题映射
type ForwarderConfig ¶ added in v0.1.4
type ForwarderConfig struct {
Name string // 转发器名称
SourceSession string // 源会话名称
SourceTopics []string // 源主题列表
TargetSession string // 目标会话
TopicMap map[string]string // 主题映射表(源主题 -> 目标主题)
QoS byte // 服务质量等级
BufferSize int // 缓冲区大小
Enabled bool // 是否启用
}
ForwarderConfig 简化的消息转发器配置
type ForwarderRegistry ¶ added in v0.1.4
type ForwarderRegistry struct {
// contains filtered or unexported fields
}
ForwarderRegistry 简化的转发器注册表
func NewForwarderRegistry ¶ added in v0.1.4
func NewForwarderRegistry(manager *Manager) *ForwarderRegistry
NewForwarderRegistry 创建新的转发器注册表
func (*ForwarderRegistry) Get ¶ added in v0.1.4
func (sfr *ForwarderRegistry) Get(name string) (*Forwarder, error)
Get 获取指定名称的转发器
func (*ForwarderRegistry) GetAllMetrics ¶ added in v0.1.4
func (sfr *ForwarderRegistry) GetAllMetrics() map[string]map[string]interface{}
GetAllMetrics 获取所有转发器的指标
func (*ForwarderRegistry) List ¶ added in v0.1.4
func (sfr *ForwarderRegistry) List() []string
List 列出所有转发器的名称
func (*ForwarderRegistry) Register ¶ added in v0.1.4
func (sfr *ForwarderRegistry) Register(config ForwarderConfig) (*Forwarder, error)
Register 注册并启动一个新的转发器
func (*ForwarderRegistry) StopAll ¶ added in v0.1.4
func (sfr *ForwarderRegistry) StopAll()
StopAll 停止所有转发器
func (*ForwarderRegistry) Unregister ¶ added in v0.1.4
func (sfr *ForwarderRegistry) Unregister(name string) error
Unregister 注销并停止一个转发器
type GoRedisAdapter ¶ added in v0.1.4
type GoRedisAdapter struct {
// contains filtered or unexported fields
}
GoRedisAdapter 是go-redis库的适配器,实现RedisClient接口
func NewGoRedisClient ¶ added in v0.1.4
func NewGoRedisClient(opts *GoRedisOptions) *GoRedisAdapter
NewGoRedisClient 创建一个新的Go Redis客户端
func (*GoRedisAdapter) Close ¶ added in v0.1.4
func (a *GoRedisAdapter) Close() error
Close 实现RedisClient接口的Close方法
func (*GoRedisAdapter) Del ¶ added in v0.1.4
func (a *GoRedisAdapter) Del(ctx context.Context, keys ...string) error
Del 实现RedisClient接口的Del方法
type GoRedisOptions ¶ added in v0.1.4
type GoRedisOptions struct {
Addr string
Password string
DB int
Username string
// 连接池配置
PoolSize int
MinIdleConns int
// 超时设置
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}
GoRedisOptions Redis连接选项
func DefaultGoRedisOptions ¶ added in v0.1.4
func DefaultGoRedisOptions() *GoRedisOptions
DefaultGoRedisOptions 返回默认的Redis选项
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry 处理函数注册表
func (*HandlerRegistry) AddMessageHandler ¶
func (h *HandlerRegistry) AddMessageHandler(topic string, handler MessageHandler)
AddMessageHandler 添加消息处理函数
func (*HandlerRegistry) ClearMessageHandlers ¶
func (h *HandlerRegistry) ClearMessageHandlers()
ClearMessageHandlers 清除所有消息处理函数
func (*HandlerRegistry) CountMessageHandlers ¶
func (h *HandlerRegistry) CountMessageHandlers() int
CountMessageHandlers 获取消息处理函数数量
func (*HandlerRegistry) GetAllTopics ¶
func (h *HandlerRegistry) GetAllTopics() []string
GetAllTopics 获取所有订阅的主题
func (*HandlerRegistry) GetMessageHandler ¶
func (h *HandlerRegistry) GetMessageHandler(topic string) (MessageHandler, bool)
GetMessageHandler 获取消息处理函数
func (*HandlerRegistry) RemoveMessageHandler ¶
func (h *HandlerRegistry) RemoveMessageHandler(topic string)
RemoveMessageHandler 移除消息处理函数
func (*HandlerRegistry) SetConnectHandler ¶
func (h *HandlerRegistry) SetConnectHandler(handler ConnectHandler)
SetConnectHandler 设置连接成功处理函数
func (*HandlerRegistry) SetConnectLostHandler ¶
func (h *HandlerRegistry) SetConnectLostHandler(handler ConnectLostHandler)
SetConnectLostHandler 设置连接断开处理函数
type InMemoryErrorReporter ¶ added in v0.1.4
type InMemoryErrorReporter struct {
// contains filtered or unexported fields
}
InMemoryErrorReporter 内存错误报告器
func NewInMemoryErrorReporter ¶ added in v0.1.4
func NewInMemoryErrorReporter() *InMemoryErrorReporter
NewInMemoryErrorReporter 创建内存错误报告器
func (*InMemoryErrorReporter) GetErrorStats ¶ added in v0.1.4
func (r *InMemoryErrorReporter) GetErrorStats() map[string]interface{}
GetErrorStats 获取错误统计
func (*InMemoryErrorReporter) GetRecentErrors ¶ added in v0.1.4
func (r *InMemoryErrorReporter) GetRecentErrors(limit int) []error
GetRecentErrors 获取最近的错误
func (*InMemoryErrorReporter) ReportError ¶ added in v0.1.4
func (r *InMemoryErrorReporter) ReportError(err error)
ReportError 报告错误
type JSONExporter ¶
type JSONExporter struct {
// contains filtered or unexported fields
}
JSONExporter JSON 格式导出器
func NewJSONExporter ¶
func NewJSONExporter(pretty bool) *JSONExporter
type JWTAuthProvider ¶ added in v0.1.4
type JWTAuthProvider struct {
// contains filtered or unexported fields
}
JWTAuthProvider JWT认证提供者
func NewJWTAuthProvider ¶ added in v0.1.4
func NewJWTAuthProvider(token string) *JWTAuthProvider
NewJWTAuthProvider 创建JWT认证提供者
func (*JWTAuthProvider) Complete ¶ added in v0.1.4
func (p *JWTAuthProvider) Complete(data []byte) (bool, error)
Complete 完成认证过程
func (*JWTAuthProvider) HandleChallenge ¶ added in v0.1.4
func (p *JWTAuthProvider) HandleChallenge(data []byte) ([]byte, error)
HandleChallenge 处理认证挑战
func (*JWTAuthProvider) Initialize ¶ added in v0.1.4
func (p *JWTAuthProvider) Initialize() ([]byte, error)
Initialize 初始化认证过程
func (*JWTAuthProvider) Method ¶ added in v0.1.4
func (p *JWTAuthProvider) Method() string
Method 返回认证方法名称
type JsonPathTransformer ¶ added in v0.1.4
type JsonPathTransformer struct {
// contains filtered or unexported fields
}
JsonPathTransformer JSON路径转换器,用于对JSON格式的负载进行特定字段的修改
func NewJsonPathTransformer ¶ added in v0.1.4
func NewJsonPathTransformer(path string, value interface{}) *JsonPathTransformer
NewJsonPathTransformer 创建JSON路径转换器;path格式为简单的点分路径,例如 "device.temperature.value"
func (*JsonPathTransformer) GetDescription ¶ added in v0.1.4
func (t *JsonPathTransformer) GetDescription() string
GetDescription 获取转换器的描述
type Logger ¶
type Logger interface {
Debug(msg string, args ...interface{})
Info(msg string, args ...interface{})
Warn(msg string, args ...interface{})
Error(msg string, args ...interface{})
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
Logger 是日志接口定义
type MQTT5Auth ¶ added in v0.1.4
type MQTT5Auth struct {
Method string // 认证方法
Data []byte // 认证数据
Properties map[string]string // 认证属性
}
MQTT5Auth MQTT 5.0增强认证
func NewMQTT5Auth ¶ added in v0.1.4
NewMQTT5Auth 创建MQTT 5.0增强认证
func (*MQTT5Auth) WithProperty ¶ added in v0.1.4
WithProperty 添加认证属性
type MQTT5AuthManager ¶ added in v0.1.4
type MQTT5AuthManager struct {
// contains filtered or unexported fields
}
MQTT5AuthManager MQTT 5.0认证管理器
func NewMQTT5AuthManager ¶ added in v0.1.4
func NewMQTT5AuthManager(provider AuthProvider) *MQTT5AuthManager
NewMQTT5AuthManager 创建MQTT 5.0认证管理器
func (*MQTT5AuthManager) CompleteAuth ¶ added in v0.1.4
func (m *MQTT5AuthManager) CompleteAuth(data []byte) (bool, error)
CompleteAuth 完成认证过程
func (*MQTT5AuthManager) GetMethod ¶ added in v0.1.4
func (m *MQTT5AuthManager) GetMethod() string
GetMethod 获取认证方法
func (*MQTT5AuthManager) GetState ¶ added in v0.1.4
func (m *MQTT5AuthManager) GetState() int
GetState 获取认证状态
func (*MQTT5AuthManager) HandleAuth ¶ added in v0.1.4
func (m *MQTT5AuthManager) HandleAuth(data []byte) ([]byte, error)
HandleAuth 处理认证响应
type MQTT5Config ¶ added in v0.1.4
type MQTT5Config struct {
// 基本配置
ProtocolVersion byte // 协议版本,默认为MQTT50
SessionExpiry time.Duration // 会话过期间隔
ReceiveMaximum uint16 // 接收最大值
MaximumPacketSize uint32 // 最大报文大小
TopicAliasMaximum uint16 // 主题别名最大值
RequestRespInfo bool // 请求响应信息
RequestProblemInfo bool // 请求问题信息
// 认证配置
AuthMethod string // 认证方法
AuthData []byte // 认证数据
EnhancedAuth bool // 是否启用增强认证
SharedSubAvailable bool // 是否支持共享订阅
// 用户属性
UserProperties map[string]string // 用户属性
}
MQTT5Config MQTT 5.0配置
func DefaultMQTT5Config ¶ added in v0.1.4
func DefaultMQTT5Config() *MQTT5Config
DefaultMQTT5Config 返回默认MQTT 5.0配置
type MQTT5Options ¶ added in v0.1.4
type MQTT5Options struct {
// 基本配置
MQTT5Config *MQTT5Config
// 安全配置
TLS *TLSConfig // 基本TLS配置
EnhancedTLS *EnhancedTLSConfig // 增强TLS配置
// 增强认证
Auth *MQTT5Auth // 增强认证配置
SharedSubPrefix string // 共享订阅前缀,默认为"$share"
}
MQTT5Options MQTT 5.0选项扩展
type MQTTXError ¶ added in v0.1.4
type MQTTXError struct {
Type ErrorType `json:"type"` // 错误类型
Severity ErrorSeverity `json:"severity"` // 错误严重程度
Code string `json:"code,omitempty"` // 错误代码
Message string `json:"message"` // 错误消息
Session string `json:"session,omitempty"` // 相关会话
Topic string `json:"topic,omitempty"` // 相关主题
Timestamp time.Time `json:"timestamp"` // 错误发生时间
Context map[string]any `json:"context,omitempty"` // 额外上下文信息
Cause error `json:"-"` // 底层原因(不序列化)
}
MQTTXError 统一的错误结构
func FilterErrors ¶ added in v0.1.4
func FilterErrors(errors []error, errType ErrorType) []*MQTTXError
FilterErrors 过滤特定类型的错误
func NewConfigError ¶ added in v0.1.4
func NewConfigError(message string, cause error) *MQTTXError
NewConfigError 创建配置错误
func NewConnectionError ¶ added in v0.1.4
func NewConnectionError(message string, cause error) *MQTTXError
NewConnectionError 创建连接错误
func NewError ¶ added in v0.1.4
func NewError(errType ErrorType, severity ErrorSeverity, message string) *MQTTXError
NewError 创建新的错误
func NewForwarderError ¶ added in v0.1.4
func NewForwarderError(message string, cause error) *MQTTXError
NewForwarderError 创建转发器错误
func NewPublishError ¶ added in v0.1.4
func NewPublishError(topic string, cause error) *MQTTXError
NewPublishError 创建发布错误
func NewSubscribeError ¶ added in v0.1.4
func NewSubscribeError(topic string, cause error) *MQTTXError
NewSubscribeError 创建订阅错误
func WrapError ¶ added in v0.1.4
func WrapError(err error, errType ErrorType, message string) *MQTTXError
WrapError 封装错误,保留原始错误信息
func (*MQTTXError) IsCritical ¶ added in v0.1.4
func (e *MQTTXError) IsCritical() bool
IsCritical 判断是否为严重错误
func (*MQTTXError) IsRecoverable ¶ added in v0.1.4
func (e *MQTTXError) IsRecoverable() bool
IsRecoverable 判断是否为可恢复错误
func (*MQTTXError) WithContext ¶ added in v0.1.4
func (e *MQTTXError) WithContext(key string, value any) *MQTTXError
WithContext 添加上下文信息
func (*MQTTXError) WithSession ¶ added in v0.1.4
func (e *MQTTXError) WithSession(session string) *MQTTXError
WithSession 设置相关会话
func (*MQTTXError) WithTopic ¶ added in v0.1.4
func (e *MQTTXError) WithTopic(topic string) *MQTTXError
WithTopic 设置相关主题
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager 管理多个MQTT会话
func (*Manager) AddSession ¶
AddSession 添加新的MQTT会话
func (*Manager) ClearRecoveredErrors ¶ added in v0.1.4
ClearRecoveredErrors 清理已恢复的错误
func (*Manager) GetActiveErrors ¶ added in v0.1.4
GetActiveErrors 获取当前活跃的错误
func (*Manager) GetAllSessionsStatus ¶
GetAllSessionsStatus 获取所有会话的状态字符串
func (*Manager) GetAllSessionsStatusCode ¶ added in v0.1.4
GetAllSessionsStatusCode 获取所有会话的状态常量
func (*Manager) GetErrorStats ¶ added in v0.1.4
GetErrorStats 获取错误统计信息
func (*Manager) GetMetrics ¶
GetMetrics 获取指标信息
func (*Manager) GetSession ¶
GetSession 获取指定会话
func (*Manager) HandleToWithConfig ¶ added in v0.1.4
func (m *Manager) HandleToWithConfig(config RouteConfig) (*AdvancedRoute, error)
HandleToWithConfig 使用高级配置处理指定会话的指定主题消息
func (*Manager) HandleWithConfig ¶ added in v0.1.4
func (m *Manager) HandleWithConfig(config RouteConfig, handler func(*Message)) *AdvancedRoute
HandleWithConfig 使用高级配置处理所有会话的指定主题消息
func (*Manager) ListenToWithConfig ¶ added in v0.1.4
func (m *Manager) ListenToWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute, error)
ListenToWithConfig 使用高级配置监听指定会话的指定主题消息
func (*Manager) ListenWithConfig ¶ added in v0.1.4
func (m *Manager) ListenWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute)
ListenWithConfig 使用高级配置监听所有会话的指定主题消息
func (*Manager) PublishToAll ¶
PublishToAll 向所有会话发布消息
func (*Manager) RegisterError ¶ added in v0.1.4
func (m *Manager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo
RegisterError 注册错误并启动恢复流程
func (*Manager) RemoveSession ¶
RemoveSession 移除会话
func (*Manager) SetMaxRetries ¶ added in v0.1.4
func (m *Manager) SetMaxRetries(category ErrorCategory, count int)
SetMaxRetries 设置特定类别错误的最大重试次数
func (*Manager) SubscribeAll ¶
func (m *Manager) SubscribeAll(topic string, handler MessageHandler, qos byte) []error
SubscribeAll 在所有会话中订阅主题
func (*Manager) SubscribeTo ¶
func (m *Manager) SubscribeTo(sessionName, topic string, handler MessageHandler, qos byte) error
SubscribeTo 在指定会话中订阅主题,并带有错误处理
func (*Manager) UnsubscribeAll ¶
UnsubscribeAll 取消所有会话的主题订阅
func (*Manager) UnsubscribeTo ¶
UnsubscribeTo 取消指定会话的主题订阅
func (*Manager) WaitForAllSessions ¶
WaitForAllSessions 等待所有会话连接成功
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore 基于内存的会话状态存储
func (*MemoryStore) DeleteState ¶
func (s *MemoryStore) DeleteState(sessionName string) error
DeleteState 删除会话状态
func (*MemoryStore) LoadState ¶
func (s *MemoryStore) LoadState(sessionName string) (*SessionState, error)
LoadState 加载会话状态
func (*MemoryStore) SaveState ¶
func (s *MemoryStore) SaveState(sessionName string, state *SessionState) error
SaveState 保存会话状态
type Message ¶
type Message struct {
Topic string `json:"topic"` // 主题名称
Payload []byte `json:"payload"` // 消息内容
QoS byte `json:"qos"` // 服务质量等级
Retained bool `json:"retained"` // 是否为保留消息
Duplicate bool `json:"duplicate"` // 是否为重复消息
MessageID uint16 `json:"message_id"` // 消息ID
Timestamp time.Time `json:"timestamp"` // 消息时间戳
Properties map[string]interface{} `json:"properties,omitempty"` // MQTT 5.0属性
}
Message 消息结构
func (*Message) PayloadJSON ¶
PayloadJSON 将消息负载解析为JSON
type MessageFilter ¶ added in v0.1.4
type MessageFilter interface {
// Match 判断消息是否匹配过滤规则
Match(message *Message) bool
// GetDescription 获取过滤器的描述
GetDescription() string
}
MessageFilter 消息过滤器接口,用于根据指定的规则过滤消息
type MessageHandler ¶
MessageHandler 消息处理函数类型
type MessagePool ¶ added in v0.1.4
type MessagePool struct {
// contains filtered or unexported fields
}
MessagePool 消息对象池,减少内存分配
type MessageTransformer ¶ added in v0.1.4
type MessageTransformer interface {
// Transform 转换消息,返回转换后的消息
Transform(message *Message) (*Message, error)
// GetDescription 获取转换器的描述
GetDescription() string
}
MessageTransformer 消息转换器接口,用于对消息进行转换处理
type Metrics ¶
type Metrics struct {
TotalMessages uint64 // 总消息数
TotalBytes uint64 // 总字节数
ErrorCount uint64 // 错误计数
ReconnectCount uint64 // 重连次数
ActiveSessions int64 // 活跃会话数
LastUpdate int64 // 最后更新时间(Unix纳秒时间戳,原子操作安全)
LastMessageTime int64 // 最后消息时间(Unix纳秒时间戳,原子操作安全)
LastErrorTime int64 // 最后错误时间(Unix纳秒时间戳,原子操作安全)
}
Metrics 指标收集器
type MetricsExporter ¶
MetricsExporter 指标导出接口
type MetricsPool ¶ added in v0.1.4
type MetricsPool struct {
// contains filtered or unexported fields
}
MetricsPool 指标对象池
type MockRedisClient ¶ added in v0.1.4
type MockRedisClient struct {
// contains filtered or unexported fields
}
MockRedisClient 是Redis客户端的模拟实现
func NewMockRedisClient ¶ added in v0.1.4
func NewMockRedisClient() *MockRedisClient
NewMockRedisClient 创建一个新的模拟Redis客户端
func (*MockRedisClient) Del ¶ added in v0.1.4
func (m *MockRedisClient) Del(ctx context.Context, keys ...string) error
Del 删除键
func (*MockRedisClient) Ping ¶ added in v0.1.4
func (m *MockRedisClient) Ping(ctx context.Context) error
Ping 测试连接
func (*MockRedisClient) Set ¶ added in v0.1.4
func (m *MockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
Set 设置键值对
func (*MockRedisClient) WithDelError ¶ added in v0.1.4
func (m *MockRedisClient) WithDelError(err error) *MockRedisClient
WithDelError 设置Del操作的错误
func (*MockRedisClient) WithGetError ¶ added in v0.1.4
func (m *MockRedisClient) WithGetError(err error) *MockRedisClient
WithGetError 设置Get操作的错误
func (*MockRedisClient) WithSetError ¶ added in v0.1.4
func (m *MockRedisClient) WithSetError(err error) *MockRedisClient
WithSetError 设置Set操作的错误
type OAuth2AuthProvider ¶ added in v0.1.4
type OAuth2AuthProvider struct {
// contains filtered or unexported fields
}
OAuth2AuthProvider OAuth 2.0认证提供者
func NewOAuth2AuthProvider ¶ added in v0.1.4
func NewOAuth2AuthProvider(accessToken, refreshToken string, expiresIn time.Duration) *OAuth2AuthProvider
NewOAuth2AuthProvider 创建OAuth 2.0认证提供者
func (*OAuth2AuthProvider) Complete ¶ added in v0.1.4
func (p *OAuth2AuthProvider) Complete(data []byte) (bool, error)
Complete 完成认证过程
func (*OAuth2AuthProvider) HandleChallenge ¶ added in v0.1.4
func (p *OAuth2AuthProvider) HandleChallenge(data []byte) ([]byte, error)
HandleChallenge 处理认证挑战
func (*OAuth2AuthProvider) Initialize ¶ added in v0.1.4
func (p *OAuth2AuthProvider) Initialize() ([]byte, error)
Initialize 初始化认证过程
func (*OAuth2AuthProvider) Method ¶ added in v0.1.4
func (p *OAuth2AuthProvider) Method() string
Method 返回认证方法名称
type Options ¶
type Options struct {
Name string // 会话名称
Brokers []string // MQTT broker地址列表
Username string // 用户名
Password string // 密码
ClientID string // 客户端ID
TLS *TLSConfig // TLS配置
EnhancedTLS *EnhancedTLSConfig // 增强的TLS配置
ConnectProps *ConnectProps // 连接属性
Topics []TopicConfig // 预订阅的主题
Performance *PerformanceOptions // 性能相关配置
Storage *StorageOptions // 存储选项
ProtocolVersion byte // 协议版本: MQTT311 (4) 或 MQTT50 (5)
MQTT5 *MQTT5Options // MQTT 5.0 特定选项
}
Options MQTT会话配置选项
func (*Options) ConfigureTLS ¶
ConfigureTLS 配置TLS
func (*Options) ConfigureTLSWithEnhanced ¶ added in v0.1.4
ConfigureTLSWithEnhanced 使用增强的TLS配置构建TLS配置
func (*Options) WithCleanSession ¶
WithCleanSession 设置清理会话选项
func (*Options) WithEnhancedAuth ¶ added in v0.1.4
WithEnhancedAuth 配置增强认证
func (*Options) WithEnhancedTLS ¶ added in v0.1.4
func (o *Options) WithEnhancedTLS(config *EnhancedTLSConfig) *Options
WithEnhancedTLS 使用增强的TLS配置
func (*Options) WithMQTT5 ¶ added in v0.1.4
func (o *Options) WithMQTT5(config *MQTT5Config) *Options
WithMQTT5 使用MQTT 5.0配置
func (*Options) WithPerformance ¶
WithPerformance 设置性能选项
func (*Options) WithPersistence ¶
WithPersistence 设置持久化选项
func (*Options) WithReconnect ¶
func (o *Options) WithReconnect(autoReconnect bool, initialInterval, maxInterval int64, backoffFactor float64) *Options
WithReconnect 设置重连选项
func (*Options) WithRedisStorage ¶ added in v0.1.4
func (o *Options) WithRedisStorage(addr, username, password string, db int, keyPrefix string, ttl int64) *Options
WithRedisStorage 设置Redis存储选项
func (*Options) WithSessionExpiry ¶ added in v0.1.4
WithSessionExpiry 设置会话过期间隔
func (*Options) WithSharedSubscription ¶ added in v0.1.4
WithSharedSubscription 配置共享订阅
func (*Options) WithSimpleReconnect ¶ added in v0.1.4
WithSimpleReconnect 设置简化版重连选项
func (*Options) WithStorage ¶ added in v0.1.4
WithStorage 设置存储选项
func (*Options) WithTimeout ¶
WithTimeout 设置超时选项
func (*Options) WithUserProperty ¶ added in v0.1.4
WithUserProperty 添加用户属性
type PayloadFilter ¶ added in v0.1.4
type PayloadFilter struct {
// contains filtered or unexported fields
}
PayloadFilter 基于消息负载的过滤器
func NewPayloadFilter ¶ added in v0.1.4
func NewPayloadFilter(pattern string, isRegex bool) (*PayloadFilter, error)
NewPayloadFilter 创建基于负载内容的消息过滤器
func (*PayloadFilter) GetDescription ¶ added in v0.1.4
func (f *PayloadFilter) GetDescription() string
GetDescription 获取过滤器的描述
func (*PayloadFilter) Match ¶ added in v0.1.4
func (f *PayloadFilter) Match(message *Message) bool
Match 判断消息负载是否匹配过滤规则
type PayloadTransformer ¶ added in v0.1.4
type PayloadTransformer struct {
// contains filtered or unexported fields
}
PayloadTransformer 负载转换器,用于对消息负载进行转换
func NewPayloadTransformer ¶ added in v0.1.4
func NewPayloadTransformer(description string, transformFunc func([]byte) ([]byte, error)) *PayloadTransformer
NewPayloadTransformer 创建自定义负载转换器
func (*PayloadTransformer) GetDescription ¶ added in v0.1.4
func (t *PayloadTransformer) GetDescription() string
GetDescription 获取转换器的描述
type PerformanceOptions ¶
type PerformanceOptions struct {
WriteBufferSize int // 写缓冲区大小
ReadBufferSize int // 读缓冲区大小
MessageChanSize uint // 消息通道大小
WriteTimeout time.Duration // 写超时
ReadTimeout time.Duration // 读超时
MaxMessageSize int64 // 最大消息大小
MaxPendingMessages int // 最大待处理消息数
}
PerformanceOptions 性能相关配置
type PlainAuthProvider ¶ added in v0.1.4
type PlainAuthProvider struct {
// contains filtered or unexported fields
}
PlainAuthProvider 明文认证提供者
func NewPlainAuthProvider ¶ added in v0.1.4
func NewPlainAuthProvider(username, password string) *PlainAuthProvider
NewPlainAuthProvider 创建明文认证提供者
func (*PlainAuthProvider) Complete ¶ added in v0.1.4
func (p *PlainAuthProvider) Complete(data []byte) (bool, error)
Complete 完成认证过程
func (*PlainAuthProvider) HandleChallenge ¶ added in v0.1.4
func (p *PlainAuthProvider) HandleChallenge(data []byte) ([]byte, error)
HandleChallenge 处理认证挑战
func (*PlainAuthProvider) Initialize ¶ added in v0.1.4
func (p *PlainAuthProvider) Initialize() ([]byte, error)
Initialize 初始化认证过程
func (*PlainAuthProvider) Method ¶ added in v0.1.4
func (p *PlainAuthProvider) Method() string
Method 返回认证方法名称
type PoolStats ¶ added in v0.1.4
type PoolStats struct {
MessagePoolHits uint64 `json:"message_pool_hits"`
MessagePoolMisses uint64 `json:"message_pool_misses"`
BufferPoolHits uint64 `json:"buffer_pool_hits"`
BufferPoolMisses uint64 `json:"buffer_pool_misses"`
MetricsPoolHits uint64 `json:"metrics_pool_hits"`
MetricsPoolMisses uint64 `json:"metrics_pool_misses"`
TimerPoolHits uint64 `json:"timer_pool_hits"`
TimerPoolMisses uint64 `json:"timer_pool_misses"`
}
PoolStats 对象池统计信息
type PooledBuffer ¶ added in v0.1.4
type PooledBuffer struct {
// contains filtered or unexported fields
}
PooledBuffer 池化缓冲区
func GetBuffer ¶ added in v0.1.4
func GetBuffer(expectedSize int) *PooledBuffer
GetBuffer 根据预期大小获取合适的缓冲区
type PooledMessage ¶ added in v0.1.4
type PooledMessage struct {
Topic string
Payload []byte
QoS byte
Retained bool
Timestamp time.Time
// contains filtered or unexported fields
}
PooledMessage 池化消息对象
func (*PooledMessage) Clone ¶ added in v0.1.4
func (m *PooledMessage) Clone() *PooledMessage
Clone 克隆消息(深拷贝)
func (*PooledMessage) SetPayload ¶ added in v0.1.4
func (m *PooledMessage) SetPayload(data []byte)
SetPayload 设置消息载荷,复用底层数组
type PooledTimer ¶ added in v0.1.4
PooledTimer 池化定时器
type PrometheusExporter ¶
type PrometheusExporter struct {
// contains filtered or unexported fields
}
PrometheusExporter Prometheus 格式导出器
func NewPrometheusExporter ¶
func NewPrometheusExporter(prefix string) *PrometheusExporter
func (*PrometheusExporter) Export ¶
func (e *PrometheusExporter) Export(metrics map[string]interface{}) string
type RecoveryManager ¶ added in v0.1.4
type RecoveryManager struct {
// contains filtered or unexported fields
}
RecoveryManager 恢复管理器
func NewRecoveryManager ¶ added in v0.1.4
func NewRecoveryManager(manager *Manager) *RecoveryManager
NewRecoveryManager 创建新的恢复管理器
func (*RecoveryManager) ClearRecoveredErrors ¶ added in v0.1.4
func (rm *RecoveryManager) ClearRecoveredErrors() int
ClearRecoveredErrors 清理已恢复的错误
func (*RecoveryManager) GetActiveErrors ¶ added in v0.1.4
func (rm *RecoveryManager) GetActiveErrors() []*ErrorInfo
GetActiveErrors 获取活跃错误
func (*RecoveryManager) GetErrorStats ¶ added in v0.1.4
func (rm *RecoveryManager) GetErrorStats() map[string]interface{}
GetErrorStats 获取错误统计信息
func (*RecoveryManager) MarkErrorRecovered ¶ added in v0.1.4
func (rm *RecoveryManager) MarkErrorRecovered(sessionName string, category ErrorCategory, err error) bool
MarkErrorRecovered 标记错误已恢复
func (*RecoveryManager) RegisterError ¶ added in v0.1.4
func (rm *RecoveryManager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo
RegisterError 注册错误
func (*RecoveryManager) SetMaxRetries ¶ added in v0.1.4
func (rm *RecoveryManager) SetMaxRetries(category ErrorCategory, count int)
SetMaxRetries 设置最大重试次数
type RecoveryStrategy ¶ added in v0.1.4
type RecoveryStrategy int
RecoveryStrategy 恢复策略
const (
// StrategyRetry 重试策略
StrategyRetry RecoveryStrategy = iota
// StrategyReconnect 重新连接策略
StrategyReconnect
// StrategyNotify 仅通知策略
StrategyNotify
)
type RedisClient ¶ added in v0.1.4
type RedisClient interface {
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
Get(ctx context.Context, key string) (string, error)
Del(ctx context.Context, keys ...string) error
Close() error
}
RedisClient Redis客户端接口;这是一个接口,允许使用不同的Redis客户端实现
type RedisOptions ¶ added in v0.1.4
type RedisOptions struct {
// Addr Redis服务器地址,格式为 "host:port"
Addr string
// Username Redis用户名
Username string
// Password Redis密码
Password string
// DB Redis数据库索引
DB int
// KeyPrefix 键前缀
KeyPrefix string
// TTL 会话状态的生存时间
TTL int64 // 秒
// PoolSize 连接池大小
PoolSize int
}
RedisOptions Redis配置选项
func DefaultRedisOptions ¶ added in v0.1.4
func DefaultRedisOptions() *RedisOptions
DefaultRedisOptions 返回默认的Redis选项
type RedisStore ¶ added in v0.1.4
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore 基于Redis的会话状态存储
func NewRedisStore ¶ added in v0.1.4
func NewRedisStore(client RedisClient, opts *RedisStoreOptions) *RedisStore
NewRedisStore 创建新的Redis存储
func (*RedisStore) DeleteState ¶ added in v0.1.4
func (s *RedisStore) DeleteState(sessionName string) error
DeleteState 删除会话状态
func (*RedisStore) LoadState ¶ added in v0.1.4
func (s *RedisStore) LoadState(sessionName string) (*SessionState, error)
LoadState 加载会话状态
func (*RedisStore) SaveState ¶ added in v0.1.4
func (s *RedisStore) SaveState(sessionName string, state *SessionState) error
SaveState 保存会话状态
type RedisStoreOptions ¶ added in v0.1.4
RedisStoreOptions Redis存储选项
type RetryableError ¶ added in v0.1.4
type RetryableError struct {
*MQTTXError
MaxRetries int `json:"max_retries"`
CurrentRetry int `json:"current_retry"`
RetryDelay int64 `json:"retry_delay"` // 纳秒
}
RetryableError 可重试的错误包装器
func NewRetryableError ¶ added in v0.1.4
func NewRetryableError(err *MQTTXError, maxRetries int, retryDelay int64) *RetryableError
NewRetryableError 创建可重试错误
func (*RetryableError) CanRetry ¶ added in v0.1.4
func (r *RetryableError) CanRetry() bool
CanRetry 检查是否可以重试
func (*RetryableError) GetRetryDelay ¶ added in v0.1.4
func (r *RetryableError) GetRetryDelay() int64
GetRetryDelay 获取重试延迟(纳秒)
func (*RetryableError) NextRetry ¶ added in v0.1.4
func (r *RetryableError) NextRetry()
NextRetry 准备下次重试
type RouteConfig ¶ added in v0.1.4
type RouteConfig struct {
Topic string // 主题
QoS byte // QoS级别
Filters []MessageFilter // 过滤器列表
Transformers []MessageTransformer // 转换器列表
BufferSize int // 消息缓冲区大小,仅在Listen模式下有效
Description string // 路由描述
SessionName string // 会话名称,空字符串表示所有会话
Handler func(*Message) // 消息处理函数
}
RouteConfig 路由配置选项
type RouteStats ¶
type RouteStats struct {
MessagesReceived uint64 // 接收的消息数量
MessagesDropped uint64 // 丢弃的消息数量
BytesReceived uint64 // 接收的字节数
LastMessageTime time.Time // 最后消息时间
LastError time.Time // 最后错误时间
ErrorCount uint64 // 错误计数
}
RouteStats 路由统计信息
type SHA256AuthProvider ¶ added in v0.1.4
type SHA256AuthProvider struct {
// contains filtered or unexported fields
}
SHA256AuthProvider SHA-256哈希认证提供者
func NewSHA256AuthProvider ¶ added in v0.1.4
func NewSHA256AuthProvider(username, password string) *SHA256AuthProvider
NewSHA256AuthProvider 创建SHA-256哈希认证提供者
func (*SHA256AuthProvider) Complete ¶ added in v0.1.4
func (p *SHA256AuthProvider) Complete(data []byte) (bool, error)
Complete 完成认证过程
func (*SHA256AuthProvider) HandleChallenge ¶ added in v0.1.4
func (p *SHA256AuthProvider) HandleChallenge(data []byte) ([]byte, error)
HandleChallenge 处理认证挑战
func (*SHA256AuthProvider) Initialize ¶ added in v0.1.4
func (p *SHA256AuthProvider) Initialize() ([]byte, error)
Initialize 初始化认证过程
func (*SHA256AuthProvider) Method ¶ added in v0.1.4
func (p *SHA256AuthProvider) Method() string
Method 返回认证方法名称
type SecurityError ¶ added in v0.1.4
SecurityError 安全相关错误
func (*SecurityError) Error ¶ added in v0.1.4
func (e *SecurityError) Error() string
func (*SecurityError) Unwrap ¶ added in v0.1.4
func (e *SecurityError) Unwrap() error
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session 表示单个MQTT会话
func (*Session) GetLastActivity ¶
GetLastActivity 获取最后活动时间
func (*Session) GetMetrics ¶
GetMetrics 获取会话指标
func (*Session) GetSessionStatus ¶ added in v0.1.4
GetSessionStatus 返回会话的当前状态常量
func (*Session) GetSubscribedTopics ¶
GetSubscribedTopics 获取所有已订阅的主题
func (*Session) GetSubscriptionCount ¶
GetSubscriptionCount 获取订阅数量
func (*Session) IsSubscribed ¶
IsSubscribed 检查主题是否已订阅
func (*Session) PrometheusMetrics ¶
PrometheusMetrics 导出Prometheus格式的指标
func (*Session) Subscribe ¶
func (s *Session) Subscribe(topic string, handler MessageHandler, qos byte) error
Subscribe 订阅主题,并带有错误重试机制
func (*Session) SubscribeShared ¶ added in v0.1.4
func (s *Session) SubscribeShared(group, topic string, handler MessageHandler, qos byte) error
SubscribeShared 订阅共享主题
func (*Session) Unsubscribe ¶
Unsubscribe 取消订阅主题
func (*Session) UnsubscribeShared ¶ added in v0.1.4
UnsubscribeShared 取消订阅共享主题
func (*Session) UpdateLastActivity ¶
func (s *Session) UpdateLastActivity()
UpdateLastActivity 更新最后活动时间
func (*Session) WithEnhancedAuth ¶ added in v0.1.4
func (s *Session) WithEnhancedAuth(provider AuthProvider) *Session
WithEnhancedAuth 为会话配置增强认证
type SessionBuilder ¶ added in v0.1.4
type SessionBuilder struct {
// contains filtered or unexported fields
}
SessionBuilder 会话配置构建器,提供流畅的API
func NewSessionBuilder ¶ added in v0.1.4
func NewSessionBuilder(name string) *SessionBuilder
NewSessionBuilder 创建新的会话构建器
func QuickConnect ¶ added in v0.1.4
func QuickConnect(name, broker string) *SessionBuilder
QuickConnect 快速连接配置(最简API)
func SecureConnect ¶ added in v0.1.4
func SecureConnect(name, broker, caFile string) *SessionBuilder
SecureConnect 安全连接配置(常用TLS场景)
func (*SessionBuilder) Auth ¶ added in v0.1.4
func (b *SessionBuilder) Auth(username, password string) *SessionBuilder
Auth 设置认证信息
func (*SessionBuilder) AutoReconnect ¶ added in v0.1.4
func (b *SessionBuilder) AutoReconnect() *SessionBuilder
AutoReconnect 启用自动重连
func (*SessionBuilder) Broker ¶ added in v0.1.4
func (b *SessionBuilder) Broker(addr string) *SessionBuilder
Broker 设置单个broker地址
func (*SessionBuilder) Brokers ¶ added in v0.1.4
func (b *SessionBuilder) Brokers(addrs ...string) *SessionBuilder
Brokers 设置多个broker地址(高可用)
func (*SessionBuilder) Build ¶ added in v0.1.4
func (b *SessionBuilder) Build() (*Options, error)
Build 构建配置选项,包含完整验证
func (*SessionBuilder) CleanSession ¶ added in v0.1.4
func (b *SessionBuilder) CleanSession(clean bool) *SessionBuilder
CleanSession 设置是否清理会话
func (*SessionBuilder) ClientID ¶ added in v0.1.4
func (b *SessionBuilder) ClientID(id string) *SessionBuilder
ClientID 设置客户端ID
func (*SessionBuilder) DisableReconnect ¶ added in v0.1.4
func (b *SessionBuilder) DisableReconnect() *SessionBuilder
DisableReconnect 禁用自动重连
func (*SessionBuilder) FileStorage ¶ added in v0.1.4
func (b *SessionBuilder) FileStorage(path string) *SessionBuilder
FileStorage 使用文件存储
func (*SessionBuilder) KeepAlive ¶ added in v0.1.4
func (b *SessionBuilder) KeepAlive(seconds int) *SessionBuilder
KeepAlive 设置保活时间(秒)
func (*SessionBuilder) MessageChannelSize ¶ added in v0.1.4
func (b *SessionBuilder) MessageChannelSize(size int) *SessionBuilder
MessageChannelSize 设置消息通道大小
func (*SessionBuilder) MustBuild ¶ added in v0.1.4
func (b *SessionBuilder) MustBuild() *Options
MustBuild 构建配置选项,出错时panic(用于示例和测试)
func (*SessionBuilder) Performance ¶ added in v0.1.4
func (b *SessionBuilder) Performance(bufferSizeKB, maxPendingMsgs int) *SessionBuilder
Performance 性能调优配置
func (*SessionBuilder) Persistent ¶ added in v0.1.4
func (b *SessionBuilder) Persistent() *SessionBuilder
Persistent 启用会话持久化
func (*SessionBuilder) ReconnectConfig ¶ added in v0.1.4
func (b *SessionBuilder) ReconnectConfig(initialSec, maxSec int, backoffFactor float64) *SessionBuilder
ReconnectConfig 详细重连配置
func (*SessionBuilder) RedisAuth ¶ added in v0.1.4
func (b *SessionBuilder) RedisAuth(username, password string, db int) *SessionBuilder
RedisAuth 设置Redis认证
func (*SessionBuilder) RedisStorage ¶ added in v0.1.4
func (b *SessionBuilder) RedisStorage(addr string) *SessionBuilder
RedisStorage 使用Redis存储
func (*SessionBuilder) Reset ¶ added in v0.1.4
func (b *SessionBuilder) Reset(name string) *SessionBuilder
Reset 重置构建器(复用构建器)
func (*SessionBuilder) Subscribe ¶ added in v0.1.4
func (b *SessionBuilder) Subscribe(topic string, qos byte, handler MessageHandler) *SessionBuilder
Subscribe 添加预订阅主题
func (*SessionBuilder) TLS ¶ added in v0.1.4
func (b *SessionBuilder) TLS(caFile, certFile, keyFile string, skipVerify bool) *SessionBuilder
TLS 配置TLS安全连接
func (*SessionBuilder) Timeouts ¶ added in v0.1.4
func (b *SessionBuilder) Timeouts(connectSec, writeSec int) *SessionBuilder
Timeouts 设置连接和写入超时(秒)
func (*SessionBuilder) Validate ¶ added in v0.1.4
func (b *SessionBuilder) Validate() []error
Validate 快速验证当前配置
type SessionError ¶
SessionError 会话相关错误(保持向后兼容)
func (*SessionError) Error ¶
func (e *SessionError) Error() string
func (*SessionError) Unwrap ¶
func (e *SessionError) Unwrap() error
type SessionMetrics ¶
type SessionMetrics struct {
MessagesSent uint64 // 发送的消息数
MessagesReceived uint64 // 接收的消息数
BytesSent uint64 // 发送的字节数
BytesReceived uint64 // 接收的字节数
Errors uint64 // 错误计数
Reconnects uint64 // 重连次数
LastMessage int64 // 最后消息时间(Unix纳秒时间戳,原子操作安全)
LastError int64 // 最后错误时间(Unix纳秒时间戳,原子操作安全)
}
SessionMetrics 会话指标
func GetSessionMetrics ¶ added in v0.1.4
func GetSessionMetrics() *SessionMetrics
GetSessionMetrics 获取会话指标对象
type SessionState ¶
type SessionState struct {
Topics []TopicSubscription `json:"topics"` // 订阅的主题列表
Messages []*Message `json:"messages"` // 待处理的消息
LastSequence uint64 `json:"last_sequence"` // 最后序列号
LastConnected time.Time `json:"last_connected"` // 最后连接时间
LastDisconnected time.Time `json:"last_disconnected"` // 最后断开时间
QoSMessages map[uint16]*Message `json:"qos_messages"` // QoS > 0 的消息,按MessageID索引
RetainedMessages map[string]*Message `json:"retained_messages"` // 保留消息,按主题索引
ClientID string `json:"client_id"` // 客户端ID
SessionExpiry time.Time `json:"session_expiry"` // 会话过期时间
Version int `json:"version"` // 状态版本,用于兼容性
}
SessionState 会话状态
type SessionStore ¶
type SessionStore interface {
SaveState(sessionName string, state *SessionState) error
LoadState(sessionName string) (*SessionState, error)
DeleteState(sessionName string) error
}
SessionStore 会话存储接口
type StorageOptions ¶ added in v0.1.4
type StorageOptions struct {
// Type 存储类型: "memory", "file", 或 "redis"
Type StoreType
// Path 文件存储路径,仅在Type="file"时有效
Path string
// Redis Redis存储选项,仅在Type="redis"时有效
Redis *RedisOptions
}
StorageOptions 存储选项
func DefaultStorageOptions ¶ added in v0.1.4
func DefaultStorageOptions() *StorageOptions
DefaultStorageOptions 返回默认的存储选项
type TLSConfig ¶
type TLSConfig struct {
CAFile string // CA证书文件路径
CertFile string // 客户端证书文件路径
KeyFile string // 客户端密钥文件路径
SkipVerify bool // 是否跳过服务器证书验证
}
TLSConfig TLS配置
type TimerPool ¶ added in v0.1.4
type TimerPool struct {
// contains filtered or unexported fields
}
TimerPool 定时器对象池
type TopicConfig ¶
type TopicConfig struct {
Topic string // 主题名称
QoS byte // 服务质量等级
Handler MessageHandler // 消息处理函数
}
TopicConfig 主题配置
type TopicFilter ¶ added in v0.1.4
type TopicFilter struct {
// contains filtered or unexported fields
}
TopicFilter 基于主题的消息过滤器
func NewTopicFilter ¶ added in v0.1.4
func NewTopicFilter(pattern string, isRegex bool) (*TopicFilter, error)
NewTopicFilter 创建基于主题的消息过滤器。pattern 可以是:(1) 精确匹配的主题名称(例如 "sensor/temperature");(2) 包含通配符的模式(例如 "sensor/#" 或 "sensor/+/temperature");(3) 正则表达式(需设置 isRegex=true)。
func (*TopicFilter) GetDescription ¶ added in v0.1.4
func (f *TopicFilter) GetDescription() string
GetDescription 获取过滤器的描述
func (*TopicFilter) Match ¶ added in v0.1.4
func (f *TopicFilter) Match(message *Message) bool
Match 判断消息是否匹配主题过滤规则
type TopicRewriteTransformer ¶ added in v0.1.4
type TopicRewriteTransformer struct {
// contains filtered or unexported fields
}
TopicRewriteTransformer 主题重写转换器,用于将消息的主题按照一定规则进行重写。
func NewTopicRewriteTransformer ¶ added in v0.1.4
func NewTopicRewriteTransformer(pattern, replacement string, isRegex bool) (*TopicRewriteTransformer, error)
NewTopicRewriteTransformer 创建主题重写转换器。如果 isRegex=true,pattern 将被视为正则表达式,replacement 可以包含正则表达式捕获组引用。例如:pattern="sensor/([^/]+)/temp",replacement="device/$1/temperature"。
func (*TopicRewriteTransformer) GetDescription ¶ added in v0.1.4
func (t *TopicRewriteTransformer) GetDescription() string
GetDescription 获取转换器的描述
type TopicSubscription ¶
type TopicSubscription struct {
Topic string `json:"topic"` // 主题名称
QoS byte `json:"qos"` // QoS 级别
}
TopicSubscription 主题订阅信息
type ValidationErrors ¶ added in v0.1.4
type ValidationErrors struct {
Errors []error `json:"errors"`
}
ValidationErrors 配置验证错误集合
func NewValidationErrors ¶ added in v0.1.4
func NewValidationErrors() *ValidationErrors
NewValidationErrors 创建验证错误集合
func (*ValidationErrors) Add ¶ added in v0.1.4
func (ve *ValidationErrors) Add(err error)
func (*ValidationErrors) Error ¶ added in v0.1.4
func (ve *ValidationErrors) Error() string
func (*ValidationErrors) HasErrors ¶ added in v0.1.4
func (ve *ValidationErrors) HasErrors() bool
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| advanced_client command | |
| advanced_routing command | |
| basic_client command | |
| benchmark command | |
| enhanced_tls command | |
| forwarder command | |
| mqtt5_auth command | |
|
redis_storage
command
|