mqttx

Version: v0.1.4
Published: Sep 29, 2025 License: MIT Imports: 22 Imported by: 0

README

MQTT X - High-Performance Multi-Session MQTT Client Library

🚀 Introduction

MQTT X is a high-performance, multi-session MQTT client library for Go applications. It pairs a fluent builder API with pooled objects and atomic metrics, and adds session management, message forwarding, and automatic reconnection on top.

✨ Key Features

🏗️ Architecture Optimizations
  • Builder Pattern: Fluent API design that simplifies configuration
  • Object Pool Technology: Automatic memory management with 28.5x performance improvement and zero allocations
  • Atomic Operations: 4M+ atomic operations/sec ensuring concurrent safety
  • Unified Error Handling: Structured error types with enhanced error information quality
🎯 Functional Features
  • Multi-Session Management: Concurrent handling of multiple MQTT connections
  • Message Forwarding System: Cross-session and cross-topic message forwarding
  • Auto-Reconnection: Built-in exponential backoff reconnection strategy
  • TLS/SSL Support: Certificate-based secure communication
  • Session Persistence: Support for memory, file, and Redis storage
  • Real-time Monitoring: Detailed performance and health metrics
🔧 Technical Features
  • Thread-safe Design: All operations are concurrency-safe
  • Performance Monitoring: Built-in metrics collection and performance analysis
  • Flexible Configuration: Rich configuration options and tuning parameters
  • Error Recovery: Intelligent error detection and recovery mechanisms

Installation

go get github.com/darkit/mqttx

🚀 Quick Start

Basic Usage
package main

import (
    "log"
    "time"
    "github.com/darkit/mqttx"
)

func main() {
    // Create session manager
    manager := mqttx.NewSessionManager()
    defer manager.Close()

    // Use Builder pattern to create session
    opts, err := mqttx.QuickConnect("prod-device", "broker.example.com:1883").
        Auth("username", "password").
        KeepAlive(60).
        AutoReconnect().
        Build()
    if err != nil {
        log.Fatal(err)
    }

    // Add session and connect
    if err := manager.AddSession(opts); err != nil {
        log.Fatal(err)
    }

    if err := manager.ConnectAll(); err != nil {
        log.Fatal(err)
    }

    // Wait for connection to complete
    if err := manager.WaitForAllSessions(30 * time.Second); err != nil {
        log.Printf("Connection warnings: %v", err)
    }

    // Look up the session used for subscribing and publishing
    session, err := manager.GetSession("prod-device")
    if err != nil {
        log.Fatal(err)
    }
    
    // Subscribe to topic
    handler := func(topic string, payload []byte) {
        log.Printf("Received: %s = %s", topic, string(payload))
    }
    session.Subscribe("sensors/+/temperature", 1, handler)
    
    // Publish message
    session.Publish("sensors/room1/temperature", []byte("23.5"), 1, false)
    
    select {} // Keep running
}

📚 Core Concepts

Builder Pattern

MQTT X provides fluent APIs to simplify configuration:

// Quick connect
opts, err := mqttx.QuickConnect("session-name", "localhost:1883").Build()

// Secure connect
opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt").
    Auth("user", "pass").
    KeepAlive(60).
    Build()

// Complex configuration
opts, err := mqttx.NewSessionBuilder("production-session").
    Brokers("tcp://broker1:1883", "tcp://broker2:1883").
    ClientID("client-001").
    Auth("admin", "secret").
    TLS("/etc/ssl/ca.crt", "/etc/ssl/client.crt", "/etc/ssl/client.key", false).
    Performance(16, 5000).
    RedisStorage("localhost:6379").
    Subscribe("sensors/+", 1, handler).
    Build()
Message Forwarding

Automatic message forwarding between sessions:

// Create forwarder
config, err := mqttx.NewForwarderBuilder("sensor-forwarder").
    Source("sensor-session", "sensors/+/temperature").
    Target("storage-session").
    QoS(1).
    MapTopic("sensors/room1/temperature", "storage/room1/temp").
    Build()
if err != nil {
    log.Fatal(err)
}

forwarder, err := mqttx.NewForwarder(config, manager)
if err != nil {
    log.Fatal(err)
}
forwarder.Start()
Error Handling

Unified error handling mechanism:

// Check error types
if mqttx.IsTemporary(err) {
    // Temporary error, can retry
    log.Printf("Temporary error: %v", err)
} else if mqttx.IsTimeout(err) {
    // Timeout error
    log.Printf("Timeout error: %v", err)
}

// Create custom error
err := mqttx.NewConnectionError("connection failed", originalErr).
    WithSession("my-session").
    WithContext("retry_count", 3)

📊 Performance Metrics

Performance figures reported by the project (the benchmark hardware is not specified):

  • Message Throughput: 100K+ messages/sec
  • Metric Operations: 4M+ atomic operations/sec
  • Object Pool Optimization: 28.5x performance improvement
  • Memory Efficiency: < 5 bytes per metric object
  • Forwarder Performance: 500K+ lifecycles/sec
Performance Monitoring
// Global metrics
globalMetrics := manager.GetMetrics()
log.Printf("Total messages: %d, Errors: %d", 
    globalMetrics.TotalMessages, globalMetrics.ErrorCount)

// Session metrics
sessionMetrics := session.GetMetrics()
log.Printf("Sent: %d, Received: %d", 
    sessionMetrics.MessagesSent, sessionMetrics.MessagesReceived)

// Forwarder metrics
forwarderMetrics := forwarder.GetMetrics()
log.Printf("Forwarded: %d, Dropped: %d", 
    forwarderMetrics.MessagesSent, forwarderMetrics.MessagesDropped)

Core Components

Session Manager

The session manager (Manager) is the central component that handles multiple MQTT sessions:

// Create a new manager
m := mqttx.NewSessionManager()

// Add a session
err := m.AddSession(&mqttx.Options{...})

// Get session status
status := m.GetAllSessionsStatus()

// Remove a session
err = m.RemoveSession("session-name")

// List all sessions
sessions := m.ListSessions()
Connection Management

The manager provides connection waiting mechanisms to ensure sessions are ready before operations:

// Wait for a specific session to connect
err := m.AddSession(opts)
if err != nil {
    log.Fatal(err)
}

// Wait up to 30 seconds for session to be ready
if err := m.WaitForSession("prod-device", 30*time.Second); err != nil {
    log.Fatal(err)
}

// Or wait for all sessions to be ready
if err := m.WaitForAllSessions(30*time.Second); err != nil {
    log.Fatal(err)
}
Message Handling

Four flexible message handling patterns are available:

  1. Handle - Global callback-based handling:
route := m.Handle("topic/#", func(msg *mqttx.Message) {
    log.Printf("Received: %s", msg.PayloadString())
})
defer route.Stop()
  2. HandleTo - Session-specific callback handling:
route, err := m.HandleTo("session-name", "topic/#", func(msg *mqttx.Message) {
    log.Printf("Received on session: %s", msg.PayloadString())
})
defer route.Stop()
  3. Listen - Channel-based message reception:
messages, route := m.Listen("topic/#")
go func() {
    for msg := range messages {
        log.Printf("Received: %s", msg.PayloadString())
    }
}()
defer route.Stop()
  4. ListenTo - Session-specific channel reception:
messages, route, err := m.ListenTo("session-name", "topic/#")
go func() {
    for msg := range messages {
        log.Printf("Received: %s", msg.PayloadString())
    }
}()
defer route.Stop()
Message Forwarder

The message forwarder allows automatic message forwarding between different sessions and topics, with support for filtering, transformation, and metadata injection:

// Create forwarder manager
forwarderManager := mqttx.NewForwarderManager(manager)

// Configure forwarder
forwarderConfig := mqttx.ForwarderConfig{
    Name:           "temperature-forwarder",
    SourceSessions: []string{"source-session1", "source-session2"},
    SourceTopics:   []string{"sensors/+/temperature"},
    TargetSession:  "target-session",
    TopicMapping:   map[string]string{
        "sensors/living-room/temperature": "processed/temperature/living-room",
    },
    QoS:            1,
    Metadata: map[string]interface{}{
        "forwarded_by": "temperature-forwarder",
        "timestamp":    time.Now().Unix(),
    },
    Enabled:        true,
}

// Add and start forwarder
forwarder, err := forwarderManager.AddForwarder(forwarderConfig)
if err != nil {
    log.Fatal(err)
}

// Get forwarder metrics
metrics := forwarder.GetMetrics()
log.Printf("Messages forwarded: %d", metrics["messages_forwarded"])

// Stop all forwarders
forwarderManager.StopAll()

The forwarder supports the following features:

  1. Multi-source Forwarding - Subscribe to messages from multiple sessions
  2. Topic Mapping - Map source topics to different target topics
  3. Message Filtering - Filter messages based on topic or content
  4. Message Transformation - Transform message content before forwarding
  5. Metadata Injection - Add metadata to forwarded messages
  6. Performance Metrics - Provide detailed forwarding statistics
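
Topic mapping, as listed above, amounts to a lookup from source topic to target topic. The sketch below is illustrative only: `mapTopic` is a hypothetical helper, and falling back to the source topic when no mapping exists is an assumption, not documented forwarder behavior.

```go
package main

import "fmt"

// mapTopic returns the target topic for a source topic. Falling back to the
// source topic when no mapping exists is an assumption of this sketch.
func mapTopic(mapping map[string]string, source string) string {
	if target, ok := mapping[source]; ok {
		return target
	}
	return source
}

func main() {
	mapping := map[string]string{
		"sensors/living-room/temperature": "processed/temperature/living-room",
	}
	fmt.Println(mapTopic(mapping, "sensors/living-room/temperature"))
	fmt.Println(mapTopic(mapping, "sensors/kitchen/temperature"))
}
```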
Event System

Monitor session lifecycle and state changes with detailed event information:

// Monitor connection status
m.OnEvent("session_ready", func(event mqttx.Event) {
    log.Printf("Session %s is ready for operations", event.Session)
})

// Monitor state changes
m.OnEvent("session_state_changed", func(event mqttx.Event) {
    // Use the comma-ok form so unexpected event data cannot cause a panic
    stateData, ok := event.Data.(map[string]interface{})
    if !ok {
        return
    }
    log.Printf("Session %s state changed from %v to %v",
        event.Session,
        stateData["old_state"],
        stateData["new_state"])
})

Available Events:

  • session_connecting - Session is attempting to connect
  • session_connected - Session has successfully connected
  • session_ready - Session is ready for operations
  • session_disconnected - Session has disconnected (includes error info if any)
  • session_reconnecting - Session is attempting to reconnect
  • session_added - New session has been added to the manager
  • session_removed - Session has been removed from the manager
  • session_state_changed - Session state has changed

Event Data Structure:

type Event struct {
    Type      string      // Event type
    Session   string      // Session name
    Data      interface{} // Additional event data
    Timestamp time.Time   // Event timestamp
}

Common Event Data Contents:

  • session_connected: Connection details
  • session_disconnected: Error information (if any)
  • session_state_changed: Map containing "old_state" and "new_state"
  • session_reconnecting: Reconnection attempt count
  • session_ready: Session configuration summary
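
Because `Data` is an `interface{}` whose shape depends on the event type, handlers need a type assertion before reading it. The sketch below copies the `Event` struct from above into a standalone program; `describeStateChange` is a hypothetical helper, and the string state values are an assumption made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// Event mirrors the documented event structure.
type Event struct {
	Type      string
	Session   string
	Data      interface{}
	Timestamp time.Time
}

// describeStateChange formats a session_state_changed event. It uses the
// comma-ok form of the type assertion so unexpected payloads cannot panic.
func describeStateChange(e Event) string {
	data, ok := e.Data.(map[string]interface{})
	if !ok {
		return fmt.Sprintf("session %s: unexpected event data of type %T", e.Session, e.Data)
	}
	return fmt.Sprintf("session %s: %v -> %v", e.Session, data["old_state"], data["new_state"])
}

func main() {
	e := Event{
		Type:    "session_state_changed",
		Session: "prod-device",
		Data: map[string]interface{}{
			"old_state": "connecting", // assumed value type for illustration
			"new_state": "connected",
		},
		Timestamp: time.Now(),
	}
	fmt.Println(describeStateChange(e))
}
```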
Advanced Configuration
TLS Security
opts := &mqttx.Options{
    Name:     "secure-session",
    Brokers:  []string{"ssl://broker.example.com:8883"},
    ClientID: "secure-client-001",
    TLS: &mqttx.TLSConfig{
        CAFile:     "/path/to/ca.crt",
        CertFile:   "/path/to/client.crt",
        KeyFile:    "/path/to/client.key",
        SkipVerify: false,
    },
}
Performance Tuning
opts := &mqttx.Options{
    Performance: &mqttx.PerformanceOptions{
        WriteBufferSize:    4096,
        ReadBufferSize:     4096,
        MessageChanSize:    1000,
        MaxMessageSize:     32 * 1024,
        MaxPendingMessages: 5000,
        WriteTimeout:       time.Second * 30,
        ReadTimeout:        time.Second * 30,
    },
}
Session Persistence
opts := &mqttx.Options{
    ConnectProps: &mqttx.ConnectProps{
        PersistentSession: true,
        ResumeSubs:        true,
    },
}
Metrics Collection

Monitor session and manager performance:

// Get manager-level metrics
metrics := m.GetMetrics()

// Get session-specific metrics
session, _ := m.GetSession("session-name")
sessionMetrics := session.GetMetrics()

// Get all forwarder metrics
forwarderMetrics := forwarderManager.GetAllMetrics()
Prometheus Integration

Expose metrics in Prometheus format via HTTP endpoint:

// Create HTTP server to expose Prometheus metrics
go func() {
    promExporter := mqttx.NewPrometheusExporter("mqtt")
    
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        var output strings.Builder
        
        // Collect manager metrics
        metrics := m.GetMetrics()
        output.WriteString(promExporter.Export(metrics))
        
        // Collect all session metrics
        for _, name := range m.ListSessions() {
            if session, err := m.GetSession(name); err == nil {
                output.WriteString(session.PrometheusMetrics())
            }
        }
        
        w.Header().Set("Content-Type", "text/plain")
        fmt.Fprint(w, output.String())
    })
    
    log.Printf("Starting metrics server on :2112")
    if err := http.ListenAndServe(":2112", nil); err != nil {
        log.Printf("Metrics server stopped: %v", err)
    }
}()

Add scrape target in Prometheus configuration:

scrape_configs:
  - job_name: 'mqtt_metrics'
    static_configs:
      - targets: ['localhost:2112']
    scrape_interval: 15s

Available Prometheus metrics include:

Message Metrics:

  • mqtt_session_messages_sent_total - Total messages sent
  • mqtt_session_messages_received_total - Total messages received
  • mqtt_session_bytes_sent_total - Total bytes sent
  • mqtt_session_bytes_received_total - Total bytes received
  • mqtt_session_message_rate - Current messages per second
  • mqtt_session_avg_message_rate - Average messages per second since start
  • mqtt_session_bytes_rate - Bytes per second

Status Metrics:

  • mqtt_session_connected - Session connection status (0/1)
  • mqtt_session_status - Session status code
  • mqtt_session_subscriptions - Active subscription count
  • mqtt_session_errors_total - Total error count
  • mqtt_session_reconnects_total - Reconnection attempts

Timestamp Metrics:

  • mqtt_session_last_message_timestamp_seconds - Unix timestamp of last message
  • mqtt_session_last_error_timestamp_seconds - Unix timestamp of last error

Session Properties:

  • mqtt_session_persistent - Persistent session flag (0/1)
  • mqtt_session_clean_session - Clean session flag (0/1)
  • mqtt_session_auto_reconnect - Auto reconnect flag (0/1)

All metrics include a session="session-name" label for filtering and aggregation by session.
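
To make the label convention concrete, here is a small sketch that renders one sample line in the Prometheus text exposition format. It is not the library's exporter: `formatSample` and `labelEscaper` are hypothetical names, and only the metric name and the `session` label come from the list above.

```go
package main

import (
	"fmt"
	"strings"
)

// labelEscaper escapes the three characters that the Prometheus text format
// requires to be escaped inside label values: backslash, quote, and newline.
var labelEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// formatSample renders one sample line carrying a session label.
func formatSample(metric, session string, value float64) string {
	return fmt.Sprintf("%s{session=\"%s\"} %g", metric, labelEscaper.Replace(session), value)
}

func main() {
	fmt.Println(formatSample("mqtt_session_messages_sent_total", "prod-device", 42))
	// mqtt_session_messages_sent_total{session="prod-device"} 42
}
```

With lines in this shape, a query such as `sum by (session) (rate(mqtt_session_messages_sent_total[5m]))` aggregates per session.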

🔧 Advanced Features

Session Persistence

Support for multiple storage backends:

// Memory storage (default, fastest)
opts, err := mqttx.NewSessionBuilder("memory-session").
    Broker("localhost:1883").
    Build()

// File storage
opts, err := mqttx.NewSessionBuilder("file-session").
    Broker("localhost:1883").
    FileStorage("/var/lib/mqttx").
    Build()

// Redis storage
opts, err := mqttx.NewSessionBuilder("redis-session").
    Broker("localhost:1883").
    RedisStorage("localhost:6379").
    RedisAuth("user", "pass", 1).
    Build()
Performance Tuning
// High-performance configuration
opts, err := mqttx.NewSessionBuilder("high-perf").
    Broker("localhost:1883").
    Performance(32, 10000).   // 32KB buffer, 10K pending messages
    MessageChannelSize(2000). // 2K message channel
    KeepAlive(300).           // 5-minute keepalive
    Timeouts(10, 5).          // 10s connect, 5s write timeout
    Build()
TLS Security
// Secure connection
opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt").
    Auth("username", "password").
    TLS("/path/to/ca.crt", "/path/to/client.crt", "/path/to/client.key", false).
    Build()

🧪 Testing

# Run all tests
go test ./...

# Run benchmarks
go test -bench=. -benchmem

# Run race condition tests
go test -race ./...

# Performance tests
go test -run TestPerformanceImprovement -v

Best Practices

  1. Resource Management

    • Always use defer route.Stop() for subscription cleanup
    • Implement proper error handling
    • Use meaningful session names and client IDs
  2. Performance Optimization

    • Configure appropriate buffer sizes for your use case
    • Use session-specific subscriptions (HandleTo/ListenTo) when possible
    • Monitor metrics to identify bottlenecks
    • Compare current and average message rates to identify traffic patterns
    • Use metrics data for capacity planning and performance tuning
  3. Reliability

    • Enable automatic reconnection for production use
    • Implement proper error handling and retry mechanisms
    • Use QoS levels appropriate for your use case
  4. Security

    • Enable TLS in production environments
    • Use strong client authentication
    • Regularly rotate credentials
  5. Forwarder Usage

    • Set appropriate buffer sizes for forwarders to avoid message loss
    • Use filters to reduce unnecessary message forwarding
    • Monitor forwarder metrics to detect issues early
    • Design appropriate topic mapping strategies for complex scenarios

📖 Documentation

  • GoDoc - Complete API reference

🤝 Contributing

We welcome Issues and Pull Requests! Please ensure:

  1. Code passes all tests
  2. Follow existing code style
  3. Add necessary test cases
  4. Update relevant documentation

📄 License

This project is licensed under the MIT License.

🏆 Acknowledgments

Thanks to the following projects for inspiration and support:


MQTT X - Making MQTT client development simpler and more efficient!

Documentation

Constants

const (
	MQTT311 = 4 // MQTT 3.1.1
	MQTT50  = 5 // MQTT 5.0
)

MQTT protocol version constants.

const (
	PropPayloadFormatIndicator      = 1  // Payload format indicator
	PropMessageExpiryInterval       = 2  // Message expiry interval
	PropContentType                 = 3  // Content type
	PropResponseTopic               = 8  // Response topic
	PropCorrelationData             = 9  // Correlation data
	PropSubscriptionIdentifier      = 11 // Subscription identifier
	PropSessionExpiryInterval       = 17 // Session expiry interval
	PropAssignedClientIdentifier    = 18 // Assigned client identifier
	PropServerKeepAlive             = 19 // Server keep-alive
	PropAuthenticationMethod        = 21 // Authentication method
	PropAuthenticationData          = 22 // Authentication data
	PropRequestProblemInformation   = 23 // Request problem information
	PropWillDelayInterval           = 24 // Will delay interval
	PropRequestResponseInformation  = 25 // Request response information
	PropResponseInformation         = 26 // Response information
	PropServerReference             = 28 // Server reference
	PropReasonString                = 31 // Reason string
	PropReceiveMaximum              = 33 // Receive maximum
	PropTopicAliasMaximum           = 34 // Topic alias maximum
	PropTopicAlias                  = 35 // Topic alias
	PropMaximumQoS                  = 36 // Maximum QoS
	PropRetainAvailable             = 37 // Retain available
	PropUserProperty                = 38 // User property
	PropMaximumPacketSize           = 39 // Maximum packet size
	PropWildcardSubscriptionAvail   = 40 // Wildcard subscription available
	PropSubscriptionIdentifierAvail = 41 // Subscription identifier available
	PropSharedSubscriptionAvail     = 42 // Shared subscription available
)

MQTT 5.0 property identifier constants.

const (
	ReasonSuccess                             = 0   // Success
	ReasonNormalDisconnection                 = 0   // Normal disconnection
	ReasonGrantedQoS0                         = 0   // Granted QoS 0
	ReasonGrantedQoS1                         = 1   // Granted QoS 1
	ReasonGrantedQoS2                         = 2   // Granted QoS 2
	ReasonDisconnectWithWill                  = 4   // Disconnect with Will message
	ReasonNoMatchingSubscribers               = 16  // No matching subscribers
	ReasonNoSubscriptionExisted               = 17  // No subscription existed
	ReasonContinueAuthentication              = 24  // Continue authentication
	ReasonReAuthenticate                      = 25  // Re-authenticate
	ReasonUnspecifiedError                    = 128 // Unspecified error
	ReasonMalformedPacket                     = 129 // Malformed packet
	ReasonProtocolError                       = 130 // Protocol error
	ReasonImplementationSpecificError         = 131 // Implementation-specific error
	ReasonUnsupportedProtocolVersion          = 132 // Unsupported protocol version
	ReasonClientIdentifierNotValid            = 133 // Client identifier not valid
	ReasonBadUsernameOrPassword               = 134 // Bad user name or password
	ReasonNotAuthorized                       = 135 // Not authorized
	ReasonServerUnavailable                   = 136 // Server unavailable
	ReasonServerBusy                          = 137 // Server busy
	ReasonBanned                              = 138 // Banned
	ReasonServerShuttingDown                  = 139 // Server shutting down
	ReasonBadAuthenticationMethod             = 140 // Bad authentication method
	ReasonKeepAliveTimeout                    = 141 // Keep-alive timeout
	ReasonSessionTakenOver                    = 142 // Session taken over
	ReasonTopicFilterInvalid                  = 143 // Topic filter invalid
	ReasonTopicNameInvalid                    = 144 // Topic name invalid
	ReasonPacketIdentifierInUse               = 145 // Packet identifier in use
	ReasonPacketIdentifierNotFound            = 146 // Packet identifier not found
	ReasonReceiveMaximumExceeded              = 147 // Receive maximum exceeded
	ReasonTopicAliasInvalid                   = 148 // Topic alias invalid
	ReasonPacketTooLarge                      = 149 // Packet too large
	ReasonMessageRateTooHigh                  = 150 // Message rate too high
	ReasonQuotaExceeded                       = 151 // Quota exceeded
	ReasonAdministrativeAction                = 152 // Administrative action
	ReasonPayloadFormatInvalid                = 153 // Payload format invalid
	ReasonRetainNotSupported                  = 154 // Retain not supported
	ReasonQoSNotSupported                     = 155 // QoS not supported
	ReasonUseAnotherServer                    = 156 // Use another server
	ReasonServerMoved                         = 157 // Server moved
	ReasonSharedSubscriptionsNotSupported     = 158 // Shared subscriptions not supported
	ReasonConnectionRateExceeded              = 159 // Connection rate exceeded
	ReasonMaximumConnectTime                  = 160 // Maximum connect time
	ReasonSubscriptionIdentifiersNotSupported = 161 // Subscription identifiers not supported
	ReasonWildcardSubscriptionsNotSupported   = 162 // Wildcard subscriptions not supported
)

MQTT 5.0 reason code constants.

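const (
	// Standard authentication methods
	AuthMethodPlain     = "PLAIN"      // Plaintext authentication
	AuthMethodSHA256    = "SHA-256"    // SHA-256 hash authentication
	AuthMethodScramSHA1 = "SCRAM-SHA1" // SCRAM-SHA1 authentication
	// OAuth 2.0 authentication
	AuthMethodOAuth2 = "oauth2" // OAuth 2.0 authentication
	// JWT authentication
	AuthMethodJWT = "jwt" // JWT authentication
)

MQTT 5.0 authentication method constants.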

const (
	AuthStateNone       = iota // Not authenticated
	AuthStateInProgress        // Authentication in progress
	AuthStateSuccess           // Authentication succeeded
	AuthStateFailed            // Authentication failed
)

MQTT 5.0 authentication state constants.

const (
	CertFormatPEM = "pem" // PEM certificate format
	CertFormatDER = "der" // DER certificate format
	CertFormatP12 = "p12" // PKCS#12 certificate format
	CertFormatJKS = "jks" // Java KeyStore format
	CertFormatPFX = "pfx" // PFX certificate format (alias for PKCS#12)
)

Certificate format types.

const (
	DefaultKeepAlive                = 60                // Default keep-alive (seconds)
	DefaultConnectTimeout           = 30 * time.Second  // Default connect timeout
	DefaultInitialReconnectInterval = 2 * time.Second   // Default initial reconnect interval
	DefaultMaxReconnectInterval     = 120 * time.Second // Default maximum reconnect interval
	DefaultBackoffFactor            = 1.5               // Default backoff factor
	DefaultWriteTimeout             = 30 * time.Second  // Default write timeout
	DefaultMessageChanSize          = 100               // Default message channel size
	DefaultMaxMessageSize           = 32 * 1024         // Default maximum message size (32KB)
	DefaultMaxPendingMessages       = 1000              // Default maximum number of pending messages
	DefaultWriteBufferSize          = 4096              // Default write buffer size
	DefaultReadBufferSize           = 4096              // Default read buffer size
)

Default configuration constants.

const (
	StatusDisconnected uint32 = 0 // Disconnected
	StatusConnecting   uint32 = 1 // Connecting
	StatusConnected    uint32 = 2 // Connected
	StatusReconnecting uint32 = 3 // Reconnecting
	StatusClosed       uint32 = 4 // Closed
)

Session status constants, exported for use by external code.

const (
	EventSessionConnecting   = "session_connecting"    // Session is connecting
	EventSessionConnected    = "session_connected"     // Session has connected
	EventSessionDisconnected = "session_disconnected"  // Session has disconnected
	EventSessionReconnecting = "session_reconnecting"  // Session is reconnecting
	EventSessionAdded        = "session_added"         // Session has been added
	EventSessionRemoved      = "session_removed"       // Session has been removed
	EventSessionReady        = "session_ready"         // Session is ready (connected successfully)
	EventStateChanged        = "session_state_changed" // Session state has changed
)

Event type constants.

Variables

var (
	ErrSessionNotFound      = errors.New("session not found")
	ErrSessionExists        = errors.New("session already exists")
	ErrNotConnected         = errors.New("not connected")
	ErrSessionClosed        = errors.New("session closed")
	ErrInvalidOptions       = errors.New("invalid options")
	ErrInvalidBroker        = errors.New("invalid broker address")
	ErrInvalidClientID      = errors.New("invalid client ID")
	ErrInvalidTopic         = errors.New("invalid topic")
	ErrTimeout              = errors.New("operation timeout")
	ErrInvalidPayload       = errors.New("invalid payload")
	ErrSubscribeFailed      = errors.New("subscribe failed")
	ErrUnsubscribeFailed    = errors.New("unsubscribe failed")
	ErrPublishFailed        = errors.New("publish failed")
	ErrQoSNotSupported      = errors.New("QoS level not supported")
	ErrStorageFailure       = errors.New("storage operation failed")
	ErrInvalidConfig        = errors.New("invalid configuration")
	ErrConnectionRefused    = errors.New("connection refused")
	ErrAuthenticationFailed = errors.New("authentication failed")
	ErrForwarderFailed      = errors.New("forwarder operation failed")
)

Base error definitions.

Functions

func ChainErrors added in v0.1.4

func ChainErrors(errors ...error) error

ChainErrors chains multiple errors together.

func CountErrorsBySeverity added in v0.1.4

func CountErrorsBySeverity(errors []error) map[ErrorSeverity]int

CountErrorsBySeverity counts errors by severity.

func FormatErrorSummary added in v0.1.4

func FormatErrorSummary(errors []error) string

FormatErrorSummary formats a summary of the given errors.

func GetErrorCode added in v0.1.4

func GetErrorCode(err error) string

GetErrorCode returns the error code of err.

func HasCriticalErrors added in v0.1.4

func HasCriticalErrors(errors []error) bool

HasCriticalErrors reports whether any of the errors is critical.

func IsTemporary added in v0.1.4

func IsTemporary(err error) bool

IsTemporary reports whether err is a temporary (retryable) error.

func IsTimeout added in v0.1.4

func IsTimeout(err error) bool

IsTimeout reports whether err is a timeout error.

func PutBuffer added in v0.1.4

func PutBuffer(buf *PooledBuffer)

PutBuffer returns the buffer to its corresponding pool.

func PutMessage added in v0.1.4

func PutMessage(msg *PooledMessage)

PutMessage returns the message object to the pool.

func PutMetrics added in v0.1.4

func PutMetrics(metrics *Metrics)

PutMetrics hands the metrics object back for reuse.

func PutSessionMetrics added in v0.1.4

func PutSessionMetrics(metrics *SessionMetrics)

PutSessionMetrics hands the session metrics object back for reuse.

func PutTimer added in v0.1.4

func PutTimer(timer *PooledTimer)

PutTimer hands the timer back for reuse.

func ResetPoolStats added in v0.1.4

func ResetPoolStats()

ResetPoolStats resets the pool statistics.

func StatusToString added in v0.1.4

func StatusToString(status uint32) string

StatusToString converts a status constant to its string form.

func TopicMatch added in v0.1.4

func TopicMatch(filter, topic string) bool

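TopicMatch reports whether a topic matches a topic filter. It supports the MQTT topic matching rules: "+" matches a single level, and "#" matches multiple levels but must be the last element of the filter.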
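
The matching rules can be sketched in a few lines. This is an illustrative reimplementation under stated assumptions, not the source of `TopicMatch`: `matchTopic` is a hypothetical name, and the sketch ignores special cases such as topics beginning with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches filter, level by level.
// "+" matches exactly one level; "#" matches all remaining levels
// (including none) and is only honored as the last filter level.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // the topic has fewer levels than the filter
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/room1/temperature")) // true
	fmt.Println(matchTopic("sensors/#", "sensors/room1/temperature"))             // true
	fmt.Println(matchTopic("sensors/+", "sensors/room1/temperature"))             // false
}
```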

Types

type AdvancedRoute added in v0.1.4

type AdvancedRoute struct {
	Route
	// contains filtered or unexported fields
}

AdvancedRoute is an advanced route configuration that supports message filtering and transformation.

func (*AdvancedRoute) AddFilter added in v0.1.4

func (r *AdvancedRoute) AddFilter(filter MessageFilter)

AddFilter adds a message filter.

func (*AdvancedRoute) AddTransformer added in v0.1.4

func (r *AdvancedRoute) AddTransformer(transformer MessageTransformer)

AddTransformer adds a message transformer.

type AuthError added in v0.1.4

type AuthError struct {
	Method  string
	Message string
	Reason  error
}

AuthError is an authentication error.

func (*AuthError) Error added in v0.1.4

func (e *AuthError) Error() string

Error implements the error interface.

func (*AuthError) Unwrap added in v0.1.4

func (e *AuthError) Unwrap() error

Unwrap returns the underlying error.

type AuthProvider added in v0.1.4

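type AuthProvider interface {
	// Method returns the name of the authentication method.
	Method() string

	// Initialize starts the authentication process.
	// It returns the initial authentication data and any error.
	Initialize() ([]byte, error)

	// HandleChallenge handles an authentication challenge.
	// data is the authentication data returned by the server.
	// It returns the response data and any error.
	HandleChallenge(data []byte) ([]byte, error)

	// Complete finishes the authentication process.
	// data is the final authentication data returned by the server.
	// It reports whether authentication succeeded, along with any error.
	Complete(data []byte) (bool, error)
}

AuthProvider is the authentication provider interface, used to implement different authentication mechanisms.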

type BufferPool added in v0.1.4

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool is an object pool of byte buffers.
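
The fields of `BufferPool` are unexported, so its internals are not visible here. As a general illustration of the technique, the sketch below pools `bytes.Buffer` values with the standard library's `sync.Pool`; `bufferPool`, `getBuffer`, and `putBuffer` are hypothetical names and do not describe this package's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufferPool is a hypothetical pool of reusable byte buffers.
var bufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// getBuffer takes a buffer from the pool, allocating one if the pool is empty.
func getBuffer() *bytes.Buffer {
	return bufferPool.Get().(*bytes.Buffer)
}

// putBuffer resets the buffer and hands it back for reuse.
func putBuffer(buf *bytes.Buffer) {
	buf.Reset()
	bufferPool.Put(buf)
}

func main() {
	buf := getBuffer()
	buf.WriteString("sensors/room1/temperature=23.5")
	fmt.Println(buf.String())
	putBuffer(buf)

	// Whether this is the same buffer or a fresh one, it starts out empty.
	fmt.Println(getBuffer().Len()) // 0
}
```

Resetting before `Put` matters: a buffer returned with stale contents would leak one message's bytes into the next user.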

type CertWatcher added in v0.1.4

type CertWatcher struct {
	// contains filtered or unexported fields
}

CertWatcher is a certificate watcher that monitors certificate file changes and expiry times.

type CompositeFilter added in v0.1.4

type CompositeFilter struct {
	// contains filtered or unexported fields
}

CompositeFilter is a composite filter that combines multiple filters into more complex filtering rules.

func NewAndFilter added in v0.1.4

func NewAndFilter(filters ...MessageFilter) *CompositeFilter

NewAndFilter creates an AND composite filter; every sub-filter must match.

func NewOrFilter added in v0.1.4

func NewOrFilter(filters ...MessageFilter) *CompositeFilter

NewOrFilter creates an OR composite filter; a match on any sub-filter is enough.

func (*CompositeFilter) GetDescription added in v0.1.4

func (f *CompositeFilter) GetDescription() string

GetDescription returns the description of the filter.

func (*CompositeFilter) Match added in v0.1.4

func (f *CompositeFilter) Match(message *Message) bool

Match reports whether the message matches the composite filter rule.
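
The AND/OR semantics can be illustrated with plain function values. The sketch below uses simplified, hypothetical stand-ins (`message`, `filter`, `allOf`, `anyOf`) rather than the package's `Message`, `MessageFilter`, `NewAndFilter`, and `NewOrFilter`, whose full definitions are not shown in this part of the page.

```go
package main

import (
	"fmt"
	"strings"
)

// message and filter are simplified, hypothetical stand-ins for the
// package's Message and MessageFilter types.
type message struct {
	Topic   string
	Payload []byte
}

type filter func(m *message) bool

// allOf matches only when every sub-filter matches (AND).
func allOf(filters ...filter) filter {
	return func(m *message) bool {
		for _, f := range filters {
			if !f(m) {
				return false
			}
		}
		return true
	}
}

// anyOf matches when at least one sub-filter matches (OR).
func anyOf(filters ...filter) filter {
	return func(m *message) bool {
		for _, f := range filters {
			if f(m) {
				return true
			}
		}
		return false
	}
}

func main() {
	isSensor := filter(func(m *message) bool { return strings.HasPrefix(m.Topic, "sensors/") })
	hasPayload := filter(func(m *message) bool { return len(m.Payload) > 0 })

	msg := &message{Topic: "sensors/room1/temperature"}
	fmt.Println(allOf(isSensor, hasPayload)(msg)) // false: the payload is empty
	fmt.Println(anyOf(isSensor, hasPayload)(msg)) // true: the topic prefix matches
}
```

Because composites are themselves filters, they nest: `allOf(isSensor, anyOf(a, b))` expresses "sensor topic and (a or b)".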

type CompositeTransformer added in v0.1.4

type CompositeTransformer struct {
	// contains filtered or unexported fields
}

CompositeTransformer is a composite transformer that combines multiple transformers and applies them in order.

func NewCompositeTransformer added in v0.1.4

func NewCompositeTransformer(description string, transformers ...MessageTransformer) *CompositeTransformer

NewCompositeTransformer creates a composite transformer.

func (*CompositeTransformer) GetDescription added in v0.1.4

func (t *CompositeTransformer) GetDescription() string

GetDescription returns the description of the transformer.

func (*CompositeTransformer) Transform added in v0.1.4

func (t *CompositeTransformer) Transform(message *Message) (*Message, error)

Transform applies all transformers in order.

type ConnectHandler

type ConnectHandler func()

ConnectHandler is the function type for connection handlers.

type ConnectLostHandler

type ConnectLostHandler func(err error)

ConnectLostHandler is the function type for connection-lost handlers.

type ConnectProps

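type ConnectProps struct {
	KeepAlive                int           // Keep-alive interval (seconds)
	CleanSession             bool          // Whether to start a clean session
	AutoReconnect            bool          // Whether to reconnect automatically
	ConnectTimeout           time.Duration // Connect timeout
	InitialReconnectInterval time.Duration // Initial reconnect interval
	MaxReconnectInterval     time.Duration // Maximum reconnect interval
	BackoffFactor            float64       // Backoff factor
	WriteTimeout             time.Duration // Write timeout
	ResumeSubs               bool          // Whether to resume subscriptions
	PersistentSession        bool          // Whether to persist the session
}

ConnectProps holds the connection properties.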

type DefaultErrorHandler added in v0.1.4

type DefaultErrorHandler struct {
	// contains filtered or unexported fields
}

DefaultErrorHandler is the default error handler.

func NewDefaultErrorHandler added in v0.1.4

func NewDefaultErrorHandler(logger *slog.Logger) *DefaultErrorHandler

NewDefaultErrorHandler creates a default error handler.

func (*DefaultErrorHandler) HandleError added in v0.1.4

func (h *DefaultErrorHandler) HandleError(ctx context.Context, err error) bool

HandleError handles the error.

type DefaultLogger added in v0.1.4

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is the default logger implementation.

func NewDefaultLogger added in v0.1.4

func NewDefaultLogger() *DefaultLogger

NewDefaultLogger creates a default logger implementation.

func (*DefaultLogger) Debug added in v0.1.4

func (l *DefaultLogger) Debug(msg string, args ...interface{})

Debug logs a message at Debug level.

func (*DefaultLogger) Debugf added in v0.1.4

func (l *DefaultLogger) Debugf(format string, args ...interface{})

Debugf logs a formatted message at Debug level.

func (*DefaultLogger) Error added in v0.1.4

func (l *DefaultLogger) Error(msg string, args ...interface{})

Error logs a message at Error level.

func (*DefaultLogger) Errorf added in v0.1.4

func (l *DefaultLogger) Errorf(format string, args ...interface{})

Errorf logs a formatted message at Error level.

func (*DefaultLogger) Info added in v0.1.4

func (l *DefaultLogger) Info(msg string, args ...interface{})

Info logs a message at Info level.

func (*DefaultLogger) Infof added in v0.1.4

func (l *DefaultLogger) Infof(format string, args ...interface{})

Infof logs a formatted message at Info level.

func (*DefaultLogger) Warn added in v0.1.4

func (l *DefaultLogger) Warn(msg string, args ...interface{})

Warn logs a message at Warn level.

func (*DefaultLogger) Warnf added in v0.1.4

func (l *DefaultLogger) Warnf(format string, args ...interface{})

Warnf logs a formatted message at Warn level.

type EnhancedTLSConfig added in v0.1.4

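type EnhancedTLSConfig struct {
	// Basic TLS configuration
	CAFile     string // Path to the CA certificate file
	CertFile   string // Path to the client certificate file
	KeyFile    string // Path to the client key file
	SkipVerify bool   // Whether to skip server certificate verification

	// Enhanced configuration
	CAFormat         string        // CA certificate format
	CertFormat       string        // Client certificate format
	KeyFormat        string        // Client key format
	KeyPassword      string        // Private key password
	CertPassword     string        // Certificate password
	MinVersion       uint16        // Minimum TLS version
	MaxVersion       uint16        // Maximum TLS version
	CipherSuites     []uint16      // List of cipher suites
	CurvePreferences []tls.CurveID // Curve preferences

	// Certificate rotation configuration
	AutoReload        bool          // Whether to reload certificates automatically
	ReloadInterval    time.Duration // Reload interval
	CertExpireWarning time.Duration // Certificate expiry warning period
	// contains filtered or unexported fields
}

EnhancedTLSConfig is the enhanced TLS configuration.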

func NewEnhancedTLSConfig added in v0.1.4

func NewEnhancedTLSConfig() *EnhancedTLSConfig

NewEnhancedTLSConfig creates an enhanced TLS configuration.

func (*EnhancedTLSConfig) BuildTLSConfig added in v0.1.4

func (c *EnhancedTLSConfig) BuildTLSConfig() (*tls.Config, error)

BuildTLSConfig builds the TLS configuration.

func (*EnhancedTLSConfig) GetCertExpiry added in v0.1.4

func (c *EnhancedTLSConfig) GetCertExpiry() time.Time

GetCertExpiry returns the certificate's expiry time.

func (*EnhancedTLSConfig) IsExpiringSoon added in v0.1.4

func (c *EnhancedTLSConfig) IsExpiringSoon() bool

IsExpiringSoon reports whether the certificate is about to expire.

func (*EnhancedTLSConfig) ReloadCertificates added in v0.1.4

func (c *EnhancedTLSConfig) ReloadCertificates() error

ReloadCertificates reloads the certificates.

func (*EnhancedTLSConfig) StartCertWatcher added in v0.1.4

func (c *EnhancedTLSConfig) StartCertWatcher(logger Logger, reloadFunc func() error) error

StartCertWatcher starts the certificate watcher.

func (*EnhancedTLSConfig) StopCertWatcher added in v0.1.4

func (c *EnhancedTLSConfig) StopCertWatcher()

StopCertWatcher stops the certificate watcher.

func (*EnhancedTLSConfig) WithAutoReload added in v0.1.4

func (c *EnhancedTLSConfig) WithAutoReload(autoReload bool, reloadInterval time.Duration) *EnhancedTLSConfig

WithAutoReload configures automatic certificate reloading.

func (*EnhancedTLSConfig) WithCACert added in v0.1.4

func (c *EnhancedTLSConfig) WithCACert(caFile, format string) *EnhancedTLSConfig

WithCACert sets the CA certificate.

func (*EnhancedTLSConfig) WithCertExpireWarning added in v0.1.4

func (c *EnhancedTLSConfig) WithCertExpireWarning(warning time.Duration) *EnhancedTLSConfig

WithCertExpireWarning sets the certificate expiry warning period.

func (*EnhancedTLSConfig) WithCipherSuites added in v0.1.4

func (c *EnhancedTLSConfig) WithCipherSuites(cipherSuites []uint16) *EnhancedTLSConfig

WithCipherSuites sets the cipher suites.

func (*EnhancedTLSConfig) WithClientCert added in v0.1.4

func (c *EnhancedTLSConfig) WithClientCert(certFile, keyFile, certFormat, keyFormat string) *EnhancedTLSConfig

WithClientCert sets the client certificate.

func (*EnhancedTLSConfig) WithCurvePreferences added in v0.1.4

func (c *EnhancedTLSConfig) WithCurvePreferences(curves []tls.CurveID) *EnhancedTLSConfig

WithCurvePreferences sets the curve preferences.

func (*EnhancedTLSConfig) WithPassword added in v0.1.4

func (c *EnhancedTLSConfig) WithPassword(certPassword, keyPassword string) *EnhancedTLSConfig

WithPassword sets the certificate and private key passwords.

func (*EnhancedTLSConfig) WithTLSVersion added in v0.1.4

func (c *EnhancedTLSConfig) WithTLSVersion(minVersion, maxVersion uint16) *EnhancedTLSConfig

WithTLSVersion sets the TLS version range.
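
To illustrate what a version range and cipher-suite list amount to, the sketch below copies a few settings into a standard-library `tls.Config`. The `tlsSettings` type and `buildTLS` function are hypothetical stand-ins, not the package's `BuildTLSConfig`; only the field names mirror the ones listed above, and rejecting an inverted version range is an assumption of the sketch.

```go
package main

import (
	"crypto/tls"
	"errors"
	"fmt"
)

// tlsSettings is a hypothetical local stand-in that mirrors a few
// EnhancedTLSConfig fields; it is not the mqttx type.
type tlsSettings struct {
	SkipVerify   bool
	MinVersion   uint16
	MaxVersion   uint16
	CipherSuites []uint16
}

// buildTLS copies the settings into a standard-library tls.Config and
// rejects an inverted version range (an assumption of this sketch).
func buildTLS(s tlsSettings) (*tls.Config, error) {
	if s.MaxVersion != 0 && s.MinVersion > s.MaxVersion {
		return nil, errors.New("min TLS version is above max TLS version")
	}
	return &tls.Config{
		InsecureSkipVerify: s.SkipVerify,
		MinVersion:         s.MinVersion,
		MaxVersion:         s.MaxVersion,
		CipherSuites:       s.CipherSuites,
	}, nil
}

func main() {
	cfg, err := buildTLS(tlsSettings{
		MinVersion: tls.VersionTLS12,
		MaxVersion: tls.VersionTLS13,
	})
	if err != nil {
		fmt.Println("invalid settings:", err)
		return
	}
	fmt.Printf("min=%#x max=%#x\n", cfg.MinVersion, cfg.MaxVersion)
}
```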

type ErrorCategory added in v0.1.4

type ErrorCategory string

ErrorCategory is the category of an error.

const (
	// CategoryConnection covers connection-related errors.
	CategoryConnection ErrorCategory = "connection"
	// CategorySubscription covers subscription-related errors.
	CategorySubscription ErrorCategory = "subscription"
	// CategoryPublish covers publish-related errors.
	CategoryPublish ErrorCategory = "publish"
	// CategoryAuthentication covers authentication-related errors.
	CategoryAuthentication ErrorCategory = "authentication"
	// CategoryInternal covers internal errors.
	CategoryInternal ErrorCategory = "internal"
)

type ErrorHandler added in v0.1.4

type ErrorHandler interface {
	HandleError(ctx context.Context, err error) bool // returns true if the error was handled, false if it should keep propagating
}

ErrorHandler is the error handler interface.
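
The boolean return value lends itself to a chain of handlers where the first one that returns true stops propagation. The sketch below shows that pattern with a local copy of the interface; `handlerFunc`, `dispatch`, `handled`, and the two sentinel errors are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrorHandler is a local copy of the interface documented above.
type ErrorHandler interface {
	HandleError(ctx context.Context, err error) bool
}

// handlerFunc adapts a plain function to ErrorHandler (hypothetical helper).
type handlerFunc func(ctx context.Context, err error) bool

func (f handlerFunc) HandleError(ctx context.Context, err error) bool { return f(ctx, err) }

var (
	errTimeout = errors.New("timeout")
	errOther   = errors.New("other failure")
)

// dispatch offers err to each handler in order and stops at the first one
// that reports it as handled. It returns false if the error should keep
// propagating.
func dispatch(ctx context.Context, err error, handlers ...ErrorHandler) bool {
	for _, h := range handlers {
		if h.HandleError(ctx, err) {
			return true
		}
	}
	return false
}

// handled runs a single-handler chain that only absorbs timeouts.
func handled(err error) bool {
	ignoreTimeouts := handlerFunc(func(_ context.Context, e error) bool {
		return errors.Is(e, errTimeout)
	})
	return dispatch(context.Background(), err, ignoreTimeouts)
}

func main() {
	fmt.Println(handled(errTimeout), handled(errOther))
}
```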

type ErrorInfo added in v0.1.4

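type ErrorInfo struct {
	Error      error            // original error
	Time       time.Time        // time the error occurred
	Session    string           // related session
	Category   ErrorCategory    // error category
	Severity   ErrorSeverity    // error severity
	Strategy   RecoveryStrategy // recovery strategy
	RetryCount int              // number of retries
	Recovered  bool             // whether the error has been recovered
}

ErrorInfo holds information about an error.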

type ErrorMetrics added in v0.1.4

type ErrorMetrics struct {
	TotalErrors      uint64            `json:"total_errors"`
	ErrorsByType     map[string]uint64 `json:"errors_by_type"`
	ErrorsBySeverity map[string]uint64 `json:"errors_by_severity"`
	LastError        string            `json:"last_error"`
	LastErrorTime    int64             `json:"last_error_time"`
}

ErrorMetrics is the error metrics collector.

func NewErrorMetrics added in v0.1.4

func NewErrorMetrics() *ErrorMetrics

NewErrorMetrics creates an error metrics collector.

func (*ErrorMetrics) GetSnapshot added in v0.1.4

func (m *ErrorMetrics) GetSnapshot() map[string]interface{}

GetSnapshot returns a snapshot of the metrics.

func (*ErrorMetrics) RecordError added in v0.1.4

func (m *ErrorMetrics) RecordError(err error)

RecordError records an error.

func (*ErrorMetrics) Reset added in v0.1.4

func (m *ErrorMetrics) Reset()

Reset resets the metrics.

type ErrorReporter added in v0.1.4

type ErrorReporter interface {
	ReportError(err error)
	GetErrorStats() map[string]interface{}
}

ErrorReporter is the error reporting interface.

type ErrorSeverity added in v0.1.4

type ErrorSeverity string

ErrorSeverity is the severity of an error.

const (
	SeverityInfo     ErrorSeverity = "info"     // informational (can be ignored)
	SeverityWarning  ErrorSeverity = "warning"  // warning (needs attention)
	SeverityError    ErrorSeverity = "error"    // error (needs handling)
	SeverityCritical ErrorSeverity = "critical" // critical (needs immediate handling)
)

func GetErrorSeverity added in v0.1.4

func GetErrorSeverity(err error) ErrorSeverity

GetErrorSeverity returns the severity of an error.

type ErrorType added in v0.1.4

type ErrorType string

ErrorType is the type of an error.

const (
	TypeConnection     ErrorType = "connection"     // connection-related errors
	TypeAuthentication ErrorType = "authentication" // authentication-related errors
	TypeSubscription   ErrorType = "subscription"   // subscription-related errors
	TypePublish        ErrorType = "publish"        // publish-related errors
	TypeConfiguration  ErrorType = "configuration"  // configuration-related errors
	TypeStorage        ErrorType = "storage"        // storage-related errors
	TypeForwarder      ErrorType = "forwarder"      // forwarder-related errors
	TypeInternal       ErrorType = "internal"       // internal errors
)

func GetErrorType added in v0.1.4

func GetErrorType(err error) ErrorType

GetErrorType returns the type of an error.

type Event

type Event struct {
	Type      string      // event type
	Session   string      // name of the related session
	Data      interface{} // event data
	Timestamp time.Time   // time the event occurred
}

Event is the event structure.

type EventHandler

type EventHandler func(event *Event)

EventHandler is the event handler function type.

type EventManager

type EventManager struct {
	// contains filtered or unexported fields
}

EventManager is the event manager.

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore is a file-based session state store.

func NewFileStore

func NewFileStore(directory string) (*FileStore, error)

NewFileStore creates a new file store.

func (*FileStore) DeleteState

func (s *FileStore) DeleteState(sessionName string) error

DeleteState deletes a session's state.

func (*FileStore) LoadState

func (s *FileStore) LoadState(sessionName string) (*SessionState, error)

LoadState loads a session's state.

func (*FileStore) SaveState

func (s *FileStore) SaveState(sessionName string, state *SessionState) error

SaveState saves a session's state.

type Forwarder added in v0.1.4

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder is a simplified message forwarder.

func NewForwarder added in v0.1.4

func NewForwarder(config ForwarderConfig, manager *Manager) (*Forwarder, error)

NewForwarder creates a new simplified message forwarder.

func (*Forwarder) GetMetrics added in v0.1.4

func (sf *Forwarder) GetMetrics() map[string]interface{}

GetMetrics returns the forwarder's metrics.

func (*Forwarder) IsRunning added in v0.1.4

func (sf *Forwarder) IsRunning() bool

IsRunning reports whether the forwarder is running.

func (*Forwarder) Start added in v0.1.4

func (sf *Forwarder) Start() error

Start starts the forwarder.

func (*Forwarder) Stop added in v0.1.4

func (sf *Forwarder) Stop()

Stop stops the forwarder.

type ForwarderBuilder added in v0.1.4

type ForwarderBuilder struct {
	// contains filtered or unexported fields
}

ForwarderBuilder is the builder for forwarder configurations.

func NewForwarderBuilder added in v0.1.4

func NewForwarderBuilder(name string) *ForwarderBuilder

NewForwarderBuilder creates a forwarder builder.

func (*ForwarderBuilder) BufferSize added in v0.1.4

func (fb *ForwarderBuilder) BufferSize(size int) *ForwarderBuilder

BufferSize sets the message buffer size.

func (*ForwarderBuilder) Build added in v0.1.4

func (fb *ForwarderBuilder) Build() (ForwarderConfig, error)

Build builds the forwarder configuration.

func (*ForwarderBuilder) Disable added in v0.1.4

func (fb *ForwarderBuilder) Disable() *ForwarderBuilder

Disable disables the forwarder.

func (*ForwarderBuilder) Enable added in v0.1.4

func (fb *ForwarderBuilder) Enable() *ForwarderBuilder

Enable enables the forwarder.

func (*ForwarderBuilder) MapTopic added in v0.1.4

func (fb *ForwarderBuilder) MapTopic(srcTopic, dstTopic string) *ForwarderBuilder

MapTopic adds a single topic mapping.

func (*ForwarderBuilder) MustBuild added in v0.1.4

func (fb *ForwarderBuilder) MustBuild() ForwarderConfig

MustBuild builds the forwarder configuration and panics on error.

func (*ForwarderBuilder) QoS added in v0.1.4

func (fb *ForwarderBuilder) QoS(qos byte) *ForwarderBuilder

QoS sets the QoS level of forwarded messages.

func (*ForwarderBuilder) Source added in v0.1.4

func (fb *ForwarderBuilder) Source(session string, topics ...string) *ForwarderBuilder

Source sets the source session and topics.

func (*ForwarderBuilder) Target added in v0.1.4

func (fb *ForwarderBuilder) Target(session string) *ForwarderBuilder

Target sets the target session.

func (*ForwarderBuilder) TopicMapping added in v0.1.4

func (fb *ForwarderBuilder) TopicMapping(mappings map[string]string) *ForwarderBuilder

TopicMapping sets the topic mappings.

type ForwarderConfig added in v0.1.4

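type ForwarderConfig struct {
	Name          string            // forwarder name
	SourceSession string            // source session name
	SourceTopics  []string          // list of source topics
	TargetSession string            // target session
	TopicMap      map[string]string // topic mapping table (source topic -> target topic)
	QoS           byte              // quality of service level
	BufferSize    int               // buffer size
	Enabled       bool              // whether the forwarder is enabled
}

ForwarderConfig is the simplified message forwarder configuration.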
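
A `TopicMap` of source topic to target topic implies a lookup when each message is forwarded. The sketch below shows one plausible resolution rule; `resolveTopic` is a hypothetical helper, and forwarding unmapped topics under their original name is an assumption, not documented behavior.

```go
package main

import "fmt"

// resolveTopic returns the destination topic for src. Falling back to the
// source topic when no mapping exists is an assumption of this sketch.
func resolveTopic(topicMap map[string]string, src string) string {
	if dst, ok := topicMap[src]; ok {
		return dst
	}
	return src
}

func main() {
	topicMap := map[string]string{"sensors/temp": "cloud/temp"}
	fmt.Println(resolveTopic(topicMap, "sensors/temp"))
	fmt.Println(resolveTopic(topicMap, "sensors/humidity"))
}
```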

type ForwarderRegistry added in v0.1.4

type ForwarderRegistry struct {
	// contains filtered or unexported fields
}

ForwarderRegistry is a simplified forwarder registry.

func NewForwarderRegistry added in v0.1.4

func NewForwarderRegistry(manager *Manager) *ForwarderRegistry

NewForwarderRegistry creates a new forwarder registry.

func (*ForwarderRegistry) Get added in v0.1.4

func (sfr *ForwarderRegistry) Get(name string) (*Forwarder, error)

Get returns the forwarder with the given name.

func (*ForwarderRegistry) GetAllMetrics added in v0.1.4

func (sfr *ForwarderRegistry) GetAllMetrics() map[string]map[string]interface{}

GetAllMetrics returns the metrics of all forwarders.

func (*ForwarderRegistry) List added in v0.1.4

func (sfr *ForwarderRegistry) List() []string

List returns the names of all forwarders.

func (*ForwarderRegistry) Register added in v0.1.4

func (sfr *ForwarderRegistry) Register(config ForwarderConfig) (*Forwarder, error)

Register registers and starts a new forwarder.

func (*ForwarderRegistry) StopAll added in v0.1.4

func (sfr *ForwarderRegistry) StopAll()

StopAll stops all forwarders.

func (*ForwarderRegistry) Unregister added in v0.1.4

func (sfr *ForwarderRegistry) Unregister(name string) error

Unregister unregisters and stops a forwarder.

type GoRedisAdapter added in v0.1.4

type GoRedisAdapter struct {
	// contains filtered or unexported fields
}

GoRedisAdapter is an adapter for the go-redis library that implements the RedisClient interface.

func NewGoRedisClient added in v0.1.4

func NewGoRedisClient(opts *GoRedisOptions) *GoRedisAdapter

NewGoRedisClient creates a new Go Redis client.

func (*GoRedisAdapter) Close added in v0.1.4

func (a *GoRedisAdapter) Close() error

Close implements the Close method of the RedisClient interface.

func (*GoRedisAdapter) Del added in v0.1.4

func (a *GoRedisAdapter) Del(ctx context.Context, keys ...string) error

Del implements the Del method of the RedisClient interface.

func (*GoRedisAdapter) Get added in v0.1.4

func (a *GoRedisAdapter) Get(ctx context.Context, key string) (string, error)

Get implements the Get method of the RedisClient interface.

func (*GoRedisAdapter) Ping added in v0.1.4

func (a *GoRedisAdapter) Ping(ctx context.Context) error

Ping tests the Redis connection.

func (*GoRedisAdapter) Set added in v0.1.4

func (a *GoRedisAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error

Set implements the Set method of the RedisClient interface.

type GoRedisOptions added in v0.1.4

type GoRedisOptions struct {
	Addr     string
	Password string
	DB       int
	Username string
	// Connection pool settings
	PoolSize     int
	MinIdleConns int
	// Timeout settings
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

GoRedisOptions holds the Redis connection options.

func DefaultGoRedisOptions added in v0.1.4

func DefaultGoRedisOptions() *GoRedisOptions

DefaultGoRedisOptions returns the default Redis options.

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry is the registry of handler functions.

func (*HandlerRegistry) AddMessageHandler

func (h *HandlerRegistry) AddMessageHandler(topic string, handler MessageHandler)

AddMessageHandler adds a message handler.

func (*HandlerRegistry) ClearMessageHandlers

func (h *HandlerRegistry) ClearMessageHandlers()

ClearMessageHandlers removes all message handlers.

func (*HandlerRegistry) CountMessageHandlers

func (h *HandlerRegistry) CountMessageHandlers() int

CountMessageHandlers returns the number of message handlers.

func (*HandlerRegistry) GetAllTopics

func (h *HandlerRegistry) GetAllTopics() []string

GetAllTopics returns all subscribed topics.

func (*HandlerRegistry) GetMessageHandler

func (h *HandlerRegistry) GetMessageHandler(topic string) (MessageHandler, bool)

GetMessageHandler returns the message handler for a topic.

func (*HandlerRegistry) RemoveMessageHandler

func (h *HandlerRegistry) RemoveMessageHandler(topic string)

RemoveMessageHandler removes a message handler.

func (*HandlerRegistry) SetConnectHandler

func (h *HandlerRegistry) SetConnectHandler(handler ConnectHandler)

SetConnectHandler sets the handler called on successful connection.

func (*HandlerRegistry) SetConnectLostHandler

func (h *HandlerRegistry) SetConnectLostHandler(handler ConnectLostHandler)

SetConnectLostHandler sets the handler called when the connection is lost.

type InMemoryErrorReporter added in v0.1.4

type InMemoryErrorReporter struct {
	// contains filtered or unexported fields
}

InMemoryErrorReporter is an in-memory error reporter.

func NewInMemoryErrorReporter added in v0.1.4

func NewInMemoryErrorReporter() *InMemoryErrorReporter

NewInMemoryErrorReporter creates an in-memory error reporter.

func (*InMemoryErrorReporter) GetErrorStats added in v0.1.4

func (r *InMemoryErrorReporter) GetErrorStats() map[string]interface{}

GetErrorStats returns error statistics.

func (*InMemoryErrorReporter) GetRecentErrors added in v0.1.4

func (r *InMemoryErrorReporter) GetRecentErrors(limit int) []error

GetRecentErrors returns the most recent errors.

func (*InMemoryErrorReporter) ReportError added in v0.1.4

func (r *InMemoryErrorReporter) ReportError(err error)

ReportError reports an error.

type JSONExporter

type JSONExporter struct {
	// contains filtered or unexported fields
}

JSONExporter is a JSON-format exporter.

func NewJSONExporter

func NewJSONExporter(pretty bool) *JSONExporter

func (*JSONExporter) Export

func (e *JSONExporter) Export(metrics map[string]interface{}) (string, error)
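
`NewJSONExporter` takes a `pretty` flag and `Export` returns a string, which suggests a choice between compact and indented encoding. The sketch below shows that choice with `encoding/json`; `exportJSON` is a hypothetical free function, and the two-space indent is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exportJSON renders a metrics map as JSON, indented when pretty is true.
// The two-space indent is an assumption of this sketch.
func exportJSON(metrics map[string]interface{}, pretty bool) (string, error) {
	var (
		out []byte
		err error
	)
	if pretty {
		out, err = json.MarshalIndent(metrics, "", "  ")
	} else {
		out, err = json.Marshal(metrics)
	}
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s, err := exportJSON(map[string]interface{}{"total_messages": 10, "active_sessions": 2}, true)
	if err != nil {
		fmt.Println("export failed:", err)
		return
	}
	fmt.Println(s)
}
```

Because `encoding/json` sorts map keys, the output order is stable across runs.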

type JWTAuthProvider added in v0.1.4

type JWTAuthProvider struct {
	// contains filtered or unexported fields
}

JWTAuthProvider is a JWT authentication provider.

func NewJWTAuthProvider added in v0.1.4

func NewJWTAuthProvider(token string) *JWTAuthProvider

NewJWTAuthProvider creates a JWT authentication provider.

func (*JWTAuthProvider) Complete added in v0.1.4

func (p *JWTAuthProvider) Complete(data []byte) (bool, error)

Complete completes the authentication process.

func (*JWTAuthProvider) HandleChallenge added in v0.1.4

func (p *JWTAuthProvider) HandleChallenge(data []byte) ([]byte, error)

HandleChallenge handles an authentication challenge.

func (*JWTAuthProvider) Initialize added in v0.1.4

func (p *JWTAuthProvider) Initialize() ([]byte, error)

Initialize initializes the authentication process.

func (*JWTAuthProvider) Method added in v0.1.4

func (p *JWTAuthProvider) Method() string

Method returns the name of the authentication method.

type JsonPathTransformer added in v0.1.4

type JsonPathTransformer struct {
	// contains filtered or unexported fields
}

JsonPathTransformer is a JSON path transformer that modifies specific fields in JSON payloads.

func NewJsonPathTransformer added in v0.1.4

func NewJsonPathTransformer(path string, value interface{}) *JsonPathTransformer

NewJsonPathTransformer creates a JSON path transformer. The path is a simple dot-separated path, for example "device.temperature.value".

func (*JsonPathTransformer) GetDescription added in v0.1.4

func (t *JsonPathTransformer) GetDescription() string

GetDescription returns the transformer's description.

func (*JsonPathTransformer) Transform added in v0.1.4

func (t *JsonPathTransformer) Transform(message *Message) (*Message, error)

Transform rewrites the specified field in the JSON payload.
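
The dot-separated path format can be illustrated with a small standalone function that sets one field in a JSON document. `setJSONPath` is a hypothetical helper written for this page, not the transformer's actual code; creating missing intermediate objects is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// setJSONPath decodes payload, sets the field addressed by a dot-separated
// path, and re-encodes the document. Missing or non-object intermediate
// nodes are replaced by new objects; that choice is an assumption.
func setJSONPath(payload []byte, path string, value interface{}) ([]byte, error) {
	var root map[string]interface{}
	if err := json.Unmarshal(payload, &root); err != nil {
		return nil, fmt.Errorf("decode payload: %w", err)
	}
	if root == nil {
		root = map[string]interface{}{}
	}
	keys := strings.Split(path, ".")
	node := root
	for _, key := range keys[:len(keys)-1] {
		child, ok := node[key].(map[string]interface{})
		if !ok {
			child = map[string]interface{}{}
			node[key] = child
		}
		node = child
	}
	node[keys[len(keys)-1]] = value
	return json.Marshal(root)
}

func main() {
	in := []byte(`{"device":{"temperature":{"value":1}}}`)
	out, err := setJSONPath(in, "device.temperature.value", 42)
	if err != nil {
		fmt.Println("transform failed:", err)
		return
	}
	fmt.Println(string(out))
}
```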

type Logger

type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

Logger is the logging interface.
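
Any type with these eight methods can be passed where a Logger is expected, for example to `Manager.SetLogger`. The sketch below is a hypothetical in-memory implementation against a local copy of the interface. How the real `DefaultLogger` treats the variadic `args` of the non-formatting methods is not documented here, so appending them space-separated is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger is a local copy of the interface documented above.
type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

// bufferLogger is a hypothetical implementation that collects lines in
// memory, which can be handy in tests.
type bufferLogger struct{ lines []string }

// log appends args after msg, space-separated (an assumption of this sketch).
func (b *bufferLogger) log(level, msg string, args ...interface{}) {
	line := level + " " + msg
	if len(args) > 0 {
		line += " " + strings.TrimSuffix(fmt.Sprintln(args...), "\n")
	}
	b.lines = append(b.lines, line)
}

func (b *bufferLogger) logf(level, format string, args ...interface{}) {
	b.lines = append(b.lines, level+" "+fmt.Sprintf(format, args...))
}

func (b *bufferLogger) Debug(msg string, args ...interface{}) { b.log("DEBUG", msg, args...) }
func (b *bufferLogger) Info(msg string, args ...interface{})  { b.log("INFO", msg, args...) }
func (b *bufferLogger) Warn(msg string, args ...interface{})  { b.log("WARN", msg, args...) }
func (b *bufferLogger) Error(msg string, args ...interface{}) { b.log("ERROR", msg, args...) }

func (b *bufferLogger) Debugf(format string, args ...interface{}) { b.logf("DEBUG", format, args...) }
func (b *bufferLogger) Infof(format string, args ...interface{})  { b.logf("INFO", format, args...) }
func (b *bufferLogger) Warnf(format string, args ...interface{})  { b.logf("WARN", format, args...) }
func (b *bufferLogger) Errorf(format string, args ...interface{}) { b.logf("ERROR", format, args...) }

// Compile-time check that bufferLogger satisfies Logger.
var _ Logger = (*bufferLogger)(nil)

func main() {
	buf := &bufferLogger{}
	var l Logger = buf
	l.Info("connected", "session", "edge-1")
	l.Warnf("retry %d of %d", 2, 5)
	fmt.Println(strings.Join(buf.lines, "\n"))
}
```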

type MQTT5Auth added in v0.1.4

type MQTT5Auth struct {
	Method     string            // authentication method
	Data       []byte            // authentication data
	Properties map[string]string // authentication properties
}

MQTT5Auth is the MQTT 5.0 enhanced authentication configuration.

func NewMQTT5Auth added in v0.1.4

func NewMQTT5Auth(method string, data []byte) *MQTT5Auth

NewMQTT5Auth creates an MQTT 5.0 enhanced authentication configuration.

func (*MQTT5Auth) WithProperty added in v0.1.4

func (a *MQTT5Auth) WithProperty(key, value string) *MQTT5Auth

WithProperty adds an authentication property.

type MQTT5AuthManager added in v0.1.4

type MQTT5AuthManager struct {
	// contains filtered or unexported fields
}

MQTT5AuthManager is the MQTT 5.0 authentication manager.

func NewMQTT5AuthManager added in v0.1.4

func NewMQTT5AuthManager(provider AuthProvider) *MQTT5AuthManager

NewMQTT5AuthManager creates an MQTT 5.0 authentication manager.

func (*MQTT5AuthManager) CompleteAuth added in v0.1.4

func (m *MQTT5AuthManager) CompleteAuth(data []byte) (bool, error)

CompleteAuth completes the authentication process.

func (*MQTT5AuthManager) GetMethod added in v0.1.4

func (m *MQTT5AuthManager) GetMethod() string

GetMethod returns the authentication method.

func (*MQTT5AuthManager) GetState added in v0.1.4

func (m *MQTT5AuthManager) GetState() int

GetState returns the authentication state.

func (*MQTT5AuthManager) HandleAuth added in v0.1.4

func (m *MQTT5AuthManager) HandleAuth(data []byte) ([]byte, error)

HandleAuth handles an authentication response.

func (*MQTT5AuthManager) StartAuth added in v0.1.4

func (m *MQTT5AuthManager) StartAuth() (string, []byte, error)

StartAuth starts the authentication process.

type MQTT5Config added in v0.1.4

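type MQTT5Config struct {
	// Basic settings
	ProtocolVersion    byte          // protocol version, defaults to MQTT50
	SessionExpiry      time.Duration // session expiry interval
	ReceiveMaximum     uint16        // receive maximum
	MaximumPacketSize  uint32        // maximum packet size
	TopicAliasMaximum  uint16        // topic alias maximum
	RequestRespInfo    bool          // request response information
	RequestProblemInfo bool          // request problem information

	// Authentication settings
	AuthMethod   string // authentication method
	AuthData     []byte // authentication data
	EnhancedAuth bool   // whether enhanced authentication is enabled

	// Shared subscription settings
	SharedSubAvailable bool // whether shared subscriptions are supported

	// User properties
	UserProperties map[string]string // user properties
}

MQTT5Config is the MQTT 5.0 configuration.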

func DefaultMQTT5Config added in v0.1.4

func DefaultMQTT5Config() *MQTT5Config

DefaultMQTT5Config returns the default MQTT 5.0 configuration.

type MQTT5Options added in v0.1.4

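type MQTT5Options struct {
	// Basic settings
	MQTT5Config *MQTT5Config

	// Security settings
	TLS         *TLSConfig         // basic TLS configuration
	EnhancedTLS *EnhancedTLSConfig // enhanced TLS configuration

	// Enhanced authentication
	Auth *MQTT5Auth // enhanced authentication configuration

	// Shared subscriptions
	SharedSubPrefix string // shared subscription prefix, defaults to "$share"
}

MQTT5Options holds the MQTT 5.0 option extensions.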
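
In MQTT 5.0 a shared subscription uses a topic filter of the form `$share/{ShareName}/{filter}`. The sketch below builds such a filter from a prefix, a group name, and a topic filter; `sharedTopic` is a hypothetical helper, and how the library itself applies `SharedSubPrefix` is not shown in this reference.

```go
package main

import "fmt"

// sharedTopic builds a shared-subscription filter of the form
// prefix/group/filter, using "$share" when prefix is empty.
func sharedTopic(prefix, group, filter string) string {
	if prefix == "" {
		prefix = "$share"
	}
	return prefix + "/" + group + "/" + filter
}

func main() {
	fmt.Println(sharedTopic("", "workers", "jobs/#"))
}
```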

type MQTTXError added in v0.1.4

type MQTTXError struct {
	Type      ErrorType      `json:"type"`              // error type
	Severity  ErrorSeverity  `json:"severity"`          // error severity
	Code      string         `json:"code,omitempty"`    // error code
	Message   string         `json:"message"`           // error message
	Session   string         `json:"session,omitempty"` // related session
	Topic     string         `json:"topic,omitempty"`   // related topic
	Timestamp time.Time      `json:"timestamp"`         // time the error occurred
	Context   map[string]any `json:"context,omitempty"` // additional context
	Cause     error          `json:"-"`                 // underlying cause (not serialized)
}

MQTTXError is the unified error structure.

func FilterErrors added in v0.1.4

func FilterErrors(errors []error, errType ErrorType) []*MQTTXError

FilterErrors filters errors of a specific type.

func NewConfigError added in v0.1.4

func NewConfigError(message string, cause error) *MQTTXError

NewConfigError creates a configuration error.

func NewConnectionError added in v0.1.4

func NewConnectionError(message string, cause error) *MQTTXError

NewConnectionError creates a connection error.

func NewError added in v0.1.4

func NewError(errType ErrorType, severity ErrorSeverity, message string) *MQTTXError

NewError creates a new error.

func NewForwarderError added in v0.1.4

func NewForwarderError(message string, cause error) *MQTTXError

NewForwarderError creates a forwarder error.

func NewPublishError added in v0.1.4

func NewPublishError(topic string, cause error) *MQTTXError

NewPublishError creates a publish error.

func NewSubscribeError added in v0.1.4

func NewSubscribeError(topic string, cause error) *MQTTXError

NewSubscribeError creates a subscribe error.

func WrapError added in v0.1.4

func WrapError(err error, errType ErrorType, message string) *MQTTXError

WrapError wraps an error while preserving the original error information.

func (*MQTTXError) Error added in v0.1.4

func (e *MQTTXError) Error() string

Error implements the error interface.

func (*MQTTXError) Is added in v0.1.4

func (e *MQTTXError) Is(target error) bool

Is checks the error type.

func (*MQTTXError) IsCritical added in v0.1.4

func (e *MQTTXError) IsCritical() bool

IsCritical reports whether the error is critical.

func (*MQTTXError) IsRecoverable added in v0.1.4

func (e *MQTTXError) IsRecoverable() bool

IsRecoverable reports whether the error is recoverable.

func (*MQTTXError) Unwrap added in v0.1.4

func (e *MQTTXError) Unwrap() error

Unwrap returns the underlying error.

func (*MQTTXError) WithContext added in v0.1.4

func (e *MQTTXError) WithContext(key string, value any) *MQTTXError

WithContext adds context information.

func (*MQTTXError) WithSession added in v0.1.4

func (e *MQTTXError) WithSession(session string) *MQTTXError

WithSession sets the related session.

func (*MQTTXError) WithTopic added in v0.1.4

func (e *MQTTXError) WithTopic(topic string) *MQTTXError

WithTopic sets the related topic.
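
Because `MQTTXError` provides `Unwrap`, the standard `errors.Is` and `errors.As` functions can see through it. The sketch below demonstrates the mechanism with `typedError`, a hypothetical local stand-in carrying only three of the fields above; `errorType`, `sampleError`, and `errRefused` are likewise illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// typedError is a hypothetical local stand-in for MQTTXError with only the
// fields this sketch needs.
type typedError struct {
	Type    string
	Message string
	Cause   error
}

func (e *typedError) Error() string {
	if e.Cause != nil {
		return fmt.Sprintf("[%s] %s: %v", e.Type, e.Message, e.Cause)
	}
	return fmt.Sprintf("[%s] %s", e.Type, e.Message)
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *typedError) Unwrap() error { return e.Cause }

var errRefused = errors.New("connection refused")

// errorType walks the chain and returns the Type of the first typedError,
// or "unknown" when there is none.
func errorType(err error) string {
	var te *typedError
	if errors.As(err, &te) {
		return te.Type
	}
	return "unknown"
}

// sampleError wraps a typedError in one more layer of context.
func sampleError() error {
	base := &typedError{Type: "connection", Message: "dial failed", Cause: errRefused}
	return fmt.Errorf("session edge-1: %w", base)
}

func main() {
	err := sampleError()
	fmt.Println(errorType(err), errors.Is(err, errRefused))
}
```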

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages multiple MQTT sessions.

func NewSessionManager

func NewSessionManager() *Manager

NewSessionManager creates a new session manager.

func (*Manager) AddSession

func (m *Manager) AddSession(opts *Options) error

AddSession adds a new MQTT session.

func (*Manager) ClearRecoveredErrors added in v0.1.4

func (m *Manager) ClearRecoveredErrors() int

ClearRecoveredErrors clears errors that have been recovered.

func (*Manager) Close

func (m *Manager) Close()

Close closes the manager.

func (*Manager) DisconnectAll

func (m *Manager) DisconnectAll()

DisconnectAll disconnects all sessions.

func (*Manager) GetActiveErrors added in v0.1.4

func (m *Manager) GetActiveErrors() []*ErrorInfo

GetActiveErrors returns the currently active errors.

func (*Manager) GetAllSessionsStatus

func (m *Manager) GetAllSessionsStatus() map[string]string

GetAllSessionsStatus returns the status string of every session.

func (*Manager) GetAllSessionsStatusCode added in v0.1.4

func (m *Manager) GetAllSessionsStatusCode() map[string]uint32

GetAllSessionsStatusCode returns the status constant of every session.

func (*Manager) GetErrorStats added in v0.1.4

func (m *Manager) GetErrorStats() map[string]interface{}

GetErrorStats returns error statistics.

func (*Manager) GetMetrics

func (m *Manager) GetMetrics() map[string]interface{}

GetMetrics returns the metrics.

func (*Manager) GetSession

func (m *Manager) GetSession(name string) (*Session, error)

GetSession returns the specified session.

func (*Manager) Handle

func (m *Manager) Handle(topic string, handler func(*Message)) *Route

Handle handles messages on the given topic from all sessions.

func (*Manager) HandleTo

func (m *Manager) HandleTo(session, topic string, handler func(*Message)) (*Route, error)

HandleTo handles messages on the given topic from the specified session.

func (*Manager) HandleToWithConfig added in v0.1.4

func (m *Manager) HandleToWithConfig(config RouteConfig) (*AdvancedRoute, error)

HandleToWithConfig handles messages on the given topic from the specified session using an advanced configuration.

func (*Manager) HandleWithConfig added in v0.1.4

func (m *Manager) HandleWithConfig(config RouteConfig, handler func(*Message)) *AdvancedRoute

HandleWithConfig handles messages on the given topic from all sessions using an advanced configuration.

func (*Manager) ListSessions

func (m *Manager) ListSessions() []string

ListSessions lists all sessions.

func (*Manager) Listen

func (m *Manager) Listen(topic string) (chan *Message, *Route)

Listen listens for messages on the given topic from all sessions.

func (*Manager) ListenTo

func (m *Manager) ListenTo(session, topic string) (chan *Message, *Route, error)

ListenTo listens for messages on the given topic from the specified session.

func (*Manager) ListenToWithConfig added in v0.1.4

func (m *Manager) ListenToWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute, error)

ListenToWithConfig listens for messages on the given topic from the specified session using an advanced configuration.

func (*Manager) ListenWithConfig added in v0.1.4

func (m *Manager) ListenWithConfig(config RouteConfig) (chan *Message, *AdvancedRoute)

ListenWithConfig listens for messages on the given topic from all sessions using an advanced configuration.

func (*Manager) OnEvent

func (m *Manager) OnEvent(eventType string, handler func(event Event))

OnEvent registers an event handler.

func (*Manager) PublishTo

func (m *Manager) PublishTo(sessionName, topic string, payload []byte, qos byte) error

PublishTo publishes a message to the specified session, with added error handling.

func (*Manager) PublishToAll

func (m *Manager) PublishToAll(topic string, payload []byte, qos byte) []error

PublishToAll publishes a message to all sessions.
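
Methods such as `PublishToAll` return a `[]error` rather than a single error, so callers usually want to collapse the slice before logging or returning it. The sketch below shows one way to do that; `summarize` and `sampleErrs` are hypothetical helpers, and whether the returned slice can contain nil entries is not documented here, so the sketch skips them defensively.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// summarize collapses a slice of per-session errors into one error.
// It returns nil when the slice is empty or holds only nil entries.
func summarize(errs []error) error {
	var msgs []string
	for _, err := range errs {
		if err != nil {
			msgs = append(msgs, err.Error())
		}
	}
	if len(msgs) == 0 {
		return nil
	}
	return fmt.Errorf("%d publish failure(s): %s", len(msgs), strings.Join(msgs, "; "))
}

// sampleErrs stands in for the slice a multi-session publish might return.
func sampleErrs() []error {
	return []error{errors.New("a: not connected"), nil, errors.New("b: timeout")}
}

func main() {
	if err := summarize(sampleErrs()); err != nil {
		fmt.Println(err)
	}
}
```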

func (*Manager) RegisterError added in v0.1.4

func (m *Manager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo

RegisterError registers an error and starts the recovery process.

func (*Manager) RemoveSession

func (m *Manager) RemoveSession(name string) error

RemoveSession removes a session.

func (*Manager) SetLogger

func (m *Manager) SetLogger(logger Logger) Logger

SetLogger sets the logger.

func (*Manager) SetMaxRetries added in v0.1.4

func (m *Manager) SetMaxRetries(category ErrorCategory, count int)

SetMaxRetries sets the maximum number of retries for errors of a given category.

func (*Manager) SubscribeAll

func (m *Manager) SubscribeAll(topic string, handler MessageHandler, qos byte) []error

SubscribeAll subscribes to a topic on all sessions.

func (*Manager) SubscribeTo

func (m *Manager) SubscribeTo(sessionName, topic string, handler MessageHandler, qos byte) error

SubscribeTo subscribes to a topic, with added error handling.

func (*Manager) UnsubscribeAll

func (m *Manager) UnsubscribeAll(topic string) []error

UnsubscribeAll unsubscribes from a topic on all sessions.

func (*Manager) UnsubscribeTo

func (m *Manager) UnsubscribeTo(name string, topic string) error

UnsubscribeTo unsubscribes from a topic on the specified session.

func (*Manager) WaitForAllSessions

func (m *Manager) WaitForAllSessions(timeout time.Duration) error

WaitForAllSessions waits until all sessions have connected.

func (*Manager) WaitForSession

func (m *Manager) WaitForSession(name string, timeout time.Duration) error

WaitForSession waits until the specified session has connected.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory session state store.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new memory store.

func (*MemoryStore) DeleteState

func (s *MemoryStore) DeleteState(sessionName string) error

DeleteState deletes a session's state.

func (*MemoryStore) LoadState

func (s *MemoryStore) LoadState(sessionName string) (*SessionState, error)

LoadState loads a session's state.

func (*MemoryStore) SaveState

func (s *MemoryStore) SaveState(sessionName string, state *SessionState) error

SaveState saves a session's state.

type Message

type Message struct {
	Topic      string                 `json:"topic"`                // topic name
	Payload    []byte                 `json:"payload"`              // message content
	QoS        byte                   `json:"qos"`                  // quality of service level
	Retained   bool                   `json:"retained"`             // whether this is a retained message
	Duplicate  bool                   `json:"duplicate"`            // whether this is a duplicate message
	MessageID  uint16                 `json:"message_id"`           // message ID
	Timestamp  time.Time              `json:"timestamp"`            // message timestamp
	Properties map[string]interface{} `json:"properties,omitempty"` // MQTT 5.0 properties
}

Message is the message structure.

func (*Message) PayloadJSON

func (m *Message) PayloadJSON(v interface{}) error

PayloadJSON parses the message payload as JSON into v.

func (*Message) PayloadString

func (m *Message) PayloadString() string

PayloadString returns the message payload as a string.
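
The two payload helpers can be pictured as thin wrappers over a string conversion and `json.Unmarshal`. The sketch below uses `message`, a hypothetical local stand-in with only `Topic` and `Payload`; the `reading` struct and `decodeTemperature` helper are illustrative, and the real methods may differ in detail.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical local stand-in for Message.
type message struct {
	Topic   string
	Payload []byte
}

// PayloadString returns the payload as a string.
func (m *message) PayloadString() string { return string(m.Payload) }

// PayloadJSON decodes the payload into v, which must be a pointer.
func (m *message) PayloadJSON(v interface{}) error { return json.Unmarshal(m.Payload, v) }

// reading is an example payload shape chosen for this sketch.
type reading struct {
	Temperature float64 `json:"temperature"`
}

// decodeTemperature decodes a JSON payload and returns its temperature field.
func decodeTemperature(payload string) (float64, error) {
	m := &message{Topic: "sensors/temp", Payload: []byte(payload)}
	var r reading
	if err := m.PayloadJSON(&r); err != nil {
		return 0, err
	}
	return r.Temperature, nil
}

func main() {
	t, err := decodeTemperature(`{"temperature":21.5}`)
	fmt.Println(t, err)
}
```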

type MessageFilter added in v0.1.4

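type MessageFilter interface {
	// Match reports whether the message matches the filter rule.
	Match(message *Message) bool
	// GetDescription returns the filter's description.
	GetDescription() string
}

MessageFilter is the message filter interface, used to filter messages according to the given rules.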
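
A custom filter only needs the two methods above. The sketch below implements a topic-prefix filter against local copies of `Message` (reduced to two fields) and `MessageFilter`; `prefixFilter` is a hypothetical type, not one shipped by the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Message is a reduced local copy of the documented struct.
type Message struct {
	Topic   string
	Payload []byte
}

// MessageFilter is a local copy of the interface documented above.
type MessageFilter interface {
	Match(message *Message) bool
	GetDescription() string
}

// prefixFilter is a hypothetical filter that accepts messages whose topic
// starts with a fixed prefix.
type prefixFilter struct{ prefix string }

func (f prefixFilter) Match(m *Message) bool {
	return m != nil && strings.HasPrefix(m.Topic, f.prefix)
}

func (f prefixFilter) GetDescription() string {
	return "topic has prefix " + strconv.Quote(f.prefix)
}

func main() {
	var f MessageFilter = prefixFilter{prefix: "sensors/"}
	fmt.Println(f.GetDescription())
	fmt.Println(f.Match(&Message{Topic: "sensors/temp"}))
	fmt.Println(f.Match(&Message{Topic: "alerts/fire"}))
}
```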

type MessageHandler

type MessageHandler func(topic string, payload []byte)

MessageHandler is the message handler function type.

type MessagePool added in v0.1.4

type MessagePool struct {
	// contains filtered or unexported fields
}

MessagePool is a message object pool that reduces memory allocations.
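
`MessagePool` exposes no exported fields or methods in this reference, so the following is only a sketch of the general object-pool technique using the standard `sync.Pool`. The `message` type and the `acquire` and `release` helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical pooled object.
type message struct {
	Topic   string
	Payload []byte
}

// pool hands out reusable message values instead of allocating new ones.
var pool = sync.Pool{New: func() interface{} { return new(message) }}

// acquire takes a message from the pool and fills it, reusing the payload
// buffer's capacity when possible.
func acquire(topic string, payload []byte) *message {
	m := pool.Get().(*message)
	m.Topic = topic
	m.Payload = append(m.Payload[:0], payload...)
	return m
}

// release clears the message and returns it to the pool. The caller must
// not use m afterwards.
func release(m *message) {
	m.Topic = ""
	m.Payload = m.Payload[:0]
	pool.Put(m)
}

func main() {
	m := acquire("sensors/temp", []byte("21.5"))
	fmt.Println(m.Topic, string(m.Payload))
	release(m)
}
```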

type MessageTransformer added in v0.1.4

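type MessageTransformer interface {
	// Transform transforms the message and returns the result.
	Transform(message *Message) (*Message, error)
	// GetDescription returns the transformer's description.
	GetDescription() string
}

MessageTransformer is the message transformer interface, used to apply transformations to messages.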

type Metrics

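type Metrics struct {
	TotalMessages   uint64 // total number of messages
	TotalBytes      uint64 // total number of bytes
	ErrorCount      uint64 // error count
	ReconnectCount  uint64 // number of reconnects
	ActiveSessions  int64  // number of active sessions
	LastUpdate      int64  // last update time (Unix nanosecond timestamp, safe for atomic operations)
	LastMessageTime int64  // last message time (Unix nanosecond timestamp, safe for atomic operations)
	LastErrorTime   int64  // last error time (Unix nanosecond timestamp, safe for atomic operations)
}

Metrics is the metrics collector.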
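
Storing timestamps as `int64` Unix nanoseconds lets them be updated with `sync/atomic` alongside the counters, without a mutex. The sketch below shows that pattern on `counters`, a hypothetical local struct with three of the fields above; `recordMessage`, `lastMessage`, and `countConcurrently` are illustrative helpers rather than the package's methods.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// counters is a hypothetical local stand-in with three Metrics fields.
type counters struct {
	TotalMessages   uint64
	TotalBytes      uint64
	LastMessageTime int64 // Unix nanoseconds
}

// recordMessage updates the counters atomically, so it is safe to call
// from many goroutines at once.
func (c *counters) recordMessage(size int, now time.Time) {
	atomic.AddUint64(&c.TotalMessages, 1)
	atomic.AddUint64(&c.TotalBytes, uint64(size))
	atomic.StoreInt64(&c.LastMessageTime, now.UnixNano())
}

// lastMessage converts the stored nanosecond timestamp back to a time.Time.
func (c *counters) lastMessage() time.Time {
	return time.Unix(0, atomic.LoadInt64(&c.LastMessageTime))
}

// countConcurrently records n messages from n goroutines and returns the
// resulting total.
func countConcurrently(n int) uint64 {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.recordMessage(16, time.Now())
		}()
	}
	wg.Wait()
	return atomic.LoadUint64(&c.TotalMessages)
}

func main() {
	fmt.Println(countConcurrently(100))
}
```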

func GetMetrics added in v0.1.4

func GetMetrics() *Metrics

GetMetrics returns the metrics object.

type MetricsExporter

type MetricsExporter interface {
	Export(metrics map[string]interface{}) error
}

MetricsExporter is the metrics export interface.

type MetricsPool added in v0.1.4

type MetricsPool struct {
	// contains filtered or unexported fields
}

MetricsPool is a metrics object pool.

type MockRedisClient added in v0.1.4

type MockRedisClient struct {
	// contains filtered or unexported fields
}

MockRedisClient is a mock implementation of the Redis client.

func NewMockRedisClient added in v0.1.4

func NewMockRedisClient() *MockRedisClient

NewMockRedisClient creates a new mock Redis client.

func (*MockRedisClient) Close added in v0.1.4

func (m *MockRedisClient) Close() error

Close closes the connection.

func (*MockRedisClient) Del added in v0.1.4

func (m *MockRedisClient) Del(ctx context.Context, keys ...string) error

Del deletes keys.

func (*MockRedisClient) Get added in v0.1.4

func (m *MockRedisClient) Get(ctx context.Context, key string) (string, error)

Get returns the value stored at a key.

func (*MockRedisClient) Ping added in v0.1.4

func (m *MockRedisClient) Ping(ctx context.Context) error

Ping tests the connection.

func (*MockRedisClient) Set added in v0.1.4

func (m *MockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error

Set stores a key-value pair.

func (*MockRedisClient) WithDelError added in v0.1.4

func (m *MockRedisClient) WithDelError(err error) *MockRedisClient

WithDelError sets the error returned by Del operations.

func (*MockRedisClient) WithGetError added in v0.1.4

func (m *MockRedisClient) WithGetError(err error) *MockRedisClient

WithGetError sets the error returned by Get operations.

func (*MockRedisClient) WithSetError added in v0.1.4

func (m *MockRedisClient) WithSetError(err error) *MockRedisClient

WithSetError sets the error returned by Set operations.

type OAuth2AuthProvider added in v0.1.4

type OAuth2AuthProvider struct {
	// contains filtered or unexported fields
}

OAuth2AuthProvider is an OAuth 2.0 authentication provider.

func NewOAuth2AuthProvider added in v0.1.4

func NewOAuth2AuthProvider(accessToken, refreshToken string, expiresIn time.Duration) *OAuth2AuthProvider

NewOAuth2AuthProvider creates an OAuth 2.0 authentication provider.

func (*OAuth2AuthProvider) Complete added in v0.1.4

func (p *OAuth2AuthProvider) Complete(data []byte) (bool, error)

Complete completes the authentication process.

func (*OAuth2AuthProvider) HandleChallenge added in v0.1.4

func (p *OAuth2AuthProvider) HandleChallenge(data []byte) ([]byte, error)

HandleChallenge handles an authentication challenge.

func (*OAuth2AuthProvider) Initialize added in v0.1.4

func (p *OAuth2AuthProvider) Initialize() ([]byte, error)

Initialize initializes the authentication process.

func (*OAuth2AuthProvider) Method added in v0.1.4

func (p *OAuth2AuthProvider) Method() string

Method returns the name of the authentication method.

type Options

type Options struct {
	Name            string              // session name
	Brokers         []string            // list of MQTT broker addresses
	Username        string              // username
	Password        string              // password
	ClientID        string              // client ID
	TLS             *TLSConfig          // TLS configuration
	EnhancedTLS     *EnhancedTLSConfig  // enhanced TLS configuration
	ConnectProps    *ConnectProps       // connection properties
	Topics          []TopicConfig       // topics to subscribe to in advance
	Performance     *PerformanceOptions // performance-related settings
	Storage         *StorageOptions     // storage options
	ProtocolVersion byte                // protocol version: MQTT311 (4) or MQTT50 (5)
	MQTT5           *MQTT5Options       // MQTT 5.0 specific options
}

Options holds the configuration options for an MQTT session.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns the default options.

func (*Options) Clone

func (o *Options) Clone() *Options

Clone returns a copy of the options.

func (*Options) ConfigureTLS

func (o *Options) ConfigureTLS() (*tls.Config, error)

ConfigureTLS configures TLS.

func (*Options) ConfigureTLSWithEnhanced added in v0.1.4

func (o *Options) ConfigureTLSWithEnhanced() (*tls.Config, error)

ConfigureTLSWithEnhanced builds the TLS configuration from the enhanced TLS settings.

func (*Options) Validate

func (o *Options) Validate() error

Validate validates the options.

func (*Options) WithAuth

func (o *Options) WithAuth(username, password string) *Options

WithAuth sets the authentication credentials.

func (*Options) WithCleanSession

func (o *Options) WithCleanSession(clean bool) *Options

WithCleanSession sets the clean session option.

func (*Options) WithEnhancedAuth added in v0.1.4

func (o *Options) WithEnhancedAuth(auth *MQTT5Auth) *Options

WithEnhancedAuth configures enhanced authentication.

func (*Options) WithEnhancedTLS added in v0.1.4

func (o *Options) WithEnhancedTLS(config *EnhancedTLSConfig) *Options

WithEnhancedTLS applies an enhanced TLS configuration.

func (*Options) WithMQTT5 added in v0.1.4

func (o *Options) WithMQTT5(config *MQTT5Config) *Options

WithMQTT5 applies an MQTT 5.0 configuration.

func (*Options) WithPerformance

func (o *Options) WithPerformance(writeBuffer, readBuffer, chanSize int) *Options

WithPerformance sets the performance options.

func (*Options) WithPersistence

func (o *Options) WithPersistence(enabled bool) *Options

WithPersistence sets the persistence option.

func (*Options) WithReconnect

func (o *Options) WithReconnect(autoReconnect bool, initialInterval, maxInterval int64, backoffFactor float64) *Options

WithReconnect sets the reconnection options.
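The three numeric parameters of WithReconnect describe an exponential backoff. The sketch below shows how such a schedule is typically computed. It is a minimal illustration under stated assumptions, not the library's code: the helper name backoffSchedule is hypothetical, and the units of initialInterval and maxInterval are not documented on this page, so the example uses time.Duration.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the first n wait times of an exponential backoff
// that starts at initial, multiplies by factor after every attempt, and is
// capped at max. Hypothetical helper, not part of mqttx.
func backoffSchedule(initial, max time.Duration, factor float64, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		if d > max {
			d = max // never wait longer than the configured ceiling
		}
		delays = append(delays, d)
		d = time.Duration(float64(d) * factor)
	}
	return delays
}

func main() {
	for i, d := range backoffSchedule(time.Second, 30*time.Second, 2.0, 7) {
		fmt.Printf("attempt %d: wait %v\n", i+1, d)
	}
}
```

With an initial interval of 1s, a ceiling of 30s, and a factor of 2.0, the waits grow as 1s, 2s, 4s, 8s, 16s and then stay at 30s.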

func (*Options) WithRedisStorage added in v0.1.4

func (o *Options) WithRedisStorage(addr, username, password string, db int, keyPrefix string, ttl int64) *Options

WithRedisStorage sets the Redis storage options.

func (*Options) WithSessionExpiry added in v0.1.4

func (o *Options) WithSessionExpiry(expiry time.Duration) *Options

WithSessionExpiry sets the session expiry interval.

func (*Options) WithSharedSubscription added in v0.1.4

func (o *Options) WithSharedSubscription(enable bool, prefix string) *Options

WithSharedSubscription configures shared subscriptions.

func (*Options) WithSimpleReconnect added in v0.1.4

func (o *Options) WithSimpleReconnect(autoReconnect bool, maxInterval int64) *Options

WithSimpleReconnect sets simplified reconnection options.

func (*Options) WithStorage added in v0.1.4

func (o *Options) WithStorage(storeType StoreType, path string) *Options

WithStorage sets the storage options.

func (*Options) WithTLS

func (o *Options) WithTLS(caFile, certFile, keyFile string, skipVerify bool) *Options

WithTLS sets the TLS configuration.

func (*Options) WithTimeout

func (o *Options) WithTimeout(connect, write int64) *Options

WithTimeout sets the timeout options.

func (*Options) WithUserProperty added in v0.1.4

func (o *Options) WithUserProperty(key, value string) *Options

WithUserProperty adds a user property.

type PayloadFilter added in v0.1.4

type PayloadFilter struct {
	// contains filtered or unexported fields
}

PayloadFilter is a filter based on the message payload.

func NewPayloadFilter added in v0.1.4

func NewPayloadFilter(pattern string, isRegex bool) (*PayloadFilter, error)

NewPayloadFilter creates a message filter based on payload content.

func (*PayloadFilter) GetDescription added in v0.1.4

func (f *PayloadFilter) GetDescription() string

GetDescription returns the filter's description.

func (*PayloadFilter) Match added in v0.1.4

func (f *PayloadFilter) Match(message *Message) bool

Match reports whether the message payload matches the filter rule.

type PayloadTransformer added in v0.1.4

type PayloadTransformer struct {
	// contains filtered or unexported fields
}

PayloadTransformer is a payload transformer, used to transform message payloads.

func NewPayloadTransformer added in v0.1.4

func NewPayloadTransformer(description string, transformFunc func([]byte) ([]byte, error)) *PayloadTransformer

NewPayloadTransformer creates a custom payload transformer.

func (*PayloadTransformer) GetDescription added in v0.1.4

func (t *PayloadTransformer) GetDescription() string

GetDescription returns the transformer's description.

func (*PayloadTransformer) Transform added in v0.1.4

func (t *PayloadTransformer) Transform(message *Message) (*Message, error)

Transform transforms the message payload.
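NewPayloadTransformer accepts any function of type func([]byte) ([]byte, error). As a minimal sketch of what such a function might look like, the hypothetical compactJSON below rejects payloads that are not valid JSON and strips insignificant whitespace. It uses only the standard library, and the name is an illustration rather than part of mqttx.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// compactJSON has the func([]byte) ([]byte, error) shape expected by
// NewPayloadTransformer. Hypothetical example function, not part of mqttx.
func compactJSON(payload []byte) ([]byte, error) {
	if !json.Valid(payload) {
		return nil, errors.New("payload is not valid JSON")
	}
	var buf bytes.Buffer
	if err := json.Compact(&buf, payload); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := compactJSON([]byte(`{ "temp" : 21.5 }`))
	fmt.Println(string(out), err)
}
```

Given the signature documented above, it would presumably be registered as mqttx.NewPayloadTransformer("compact JSON", compactJSON).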

type PerformanceOptions

type PerformanceOptions struct {
	WriteBufferSize    int           // write buffer size
	ReadBufferSize     int           // read buffer size
	MessageChanSize    uint          // message channel size
	WriteTimeout       time.Duration // write timeout
	ReadTimeout        time.Duration // read timeout
	MaxMessageSize     int64         // maximum message size
	MaxPendingMessages int           // maximum number of pending messages
}

PerformanceOptions holds performance-related settings.

type PlainAuthProvider added in v0.1.4

type PlainAuthProvider struct {
	// contains filtered or unexported fields
}

PlainAuthProvider is a plaintext authentication provider.

func NewPlainAuthProvider added in v0.1.4

func NewPlainAuthProvider(username, password string) *PlainAuthProvider

NewPlainAuthProvider creates a plaintext authentication provider.

func (*PlainAuthProvider) Complete added in v0.1.4

func (p *PlainAuthProvider) Complete(data []byte) (bool, error)

Complete finishes the authentication process.

func (*PlainAuthProvider) HandleChallenge added in v0.1.4

func (p *PlainAuthProvider) HandleChallenge(data []byte) ([]byte, error)

HandleChallenge handles an authentication challenge.

func (*PlainAuthProvider) Initialize added in v0.1.4

func (p *PlainAuthProvider) Initialize() ([]byte, error)

Initialize starts the authentication process.

func (*PlainAuthProvider) Method added in v0.1.4

func (p *PlainAuthProvider) Method() string

Method returns the name of the authentication method.

type PoolStats added in v0.1.4

type PoolStats struct {
	MessagePoolHits   uint64 `json:"message_pool_hits"`
	MessagePoolMisses uint64 `json:"message_pool_misses"`
	BufferPoolHits    uint64 `json:"buffer_pool_hits"`
	BufferPoolMisses  uint64 `json:"buffer_pool_misses"`
	MetricsPoolHits   uint64 `json:"metrics_pool_hits"`
	MetricsPoolMisses uint64 `json:"metrics_pool_misses"`
	TimerPoolHits     uint64 `json:"timer_pool_hits"`
	TimerPoolMisses   uint64 `json:"timer_pool_misses"`
}

PoolStats holds object pool statistics.

func GetPoolStats added in v0.1.4

func GetPoolStats() PoolStats

GetPoolStats returns the pool statistics.
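Each pool exposes a hit counter and a miss counter, so a hit ratio can be derived from any pair of PoolStats fields. The helper below is a hypothetical sketch of that arithmetic and is not part of mqttx.

```go
package main

import "fmt"

// hitRatio returns the fraction of pool requests that were served from the
// pool, given a hit counter and a miss counter such as MessagePoolHits and
// MessagePoolMisses. It returns 0 when no requests were recorded.
func hitRatio(hits, misses uint64) float64 {
	total := hits + misses
	if total == 0 {
		return 0
	}
	return float64(hits) / float64(total)
}

func main() {
	fmt.Printf("message pool hit ratio: %.2f\n", hitRatio(3, 1))
}
```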

type PooledBuffer added in v0.1.4

type PooledBuffer struct {
	// contains filtered or unexported fields
}

PooledBuffer is a pooled buffer.

func GetBuffer added in v0.1.4

func GetBuffer(expectedSize int) *PooledBuffer

GetBuffer returns a buffer suited to the expected size.

func (*PooledBuffer) Bytes added in v0.1.4

func (b *PooledBuffer) Bytes() []byte

Bytes returns the buffered bytes.

func (*PooledBuffer) Grow added in v0.1.4

func (b *PooledBuffer) Grow(n int)

Grow increases the buffer's capacity.

func (*PooledBuffer) Reset added in v0.1.4

func (b *PooledBuffer) Reset()

Reset resets the buffer.

func (*PooledBuffer) Write added in v0.1.4

func (b *PooledBuffer) Write(p []byte) (n int, err error)

Write writes data to the buffer.

type PooledMessage added in v0.1.4

type PooledMessage struct {
	Topic     string
	Payload   []byte
	QoS       byte
	Retained  bool
	Timestamp time.Time
	// contains filtered or unexported fields
}

PooledMessage is a pooled message object.

func GetMessage added in v0.1.4

func GetMessage() *PooledMessage

GetMessage gets a message object from the pool.

func (*PooledMessage) Clone added in v0.1.4

func (m *PooledMessage) Clone() *PooledMessage

Clone clones the message (deep copy).

func (*PooledMessage) Reset added in v0.1.4

func (m *PooledMessage) Reset()

Reset resets the message object so it can be reused.

func (*PooledMessage) SetPayload added in v0.1.4

func (m *PooledMessage) SetPayload(data []byte)

SetPayload sets the message payload, reusing the underlying array.

type PooledTimer added in v0.1.4

type PooledTimer struct {
	*time.Timer
	// contains filtered or unexported fields
}

PooledTimer is a pooled timer.

func GetTimer added in v0.1.4

func GetTimer(d time.Duration) *PooledTimer

GetTimer gets a timer.

func (*PooledTimer) Reset added in v0.1.4

func (t *PooledTimer) Reset(d time.Duration) bool

Reset resets the timer.

func (*PooledTimer) Stop added in v0.1.4

func (t *PooledTimer) Stop() bool

Stop stops the timer.

type PrometheusExporter

type PrometheusExporter struct {
	// contains filtered or unexported fields
}

PrometheusExporter exports metrics in Prometheus format.

func NewPrometheusExporter

func NewPrometheusExporter(prefix string) *PrometheusExporter

func (*PrometheusExporter) Export

func (e *PrometheusExporter) Export(metrics map[string]interface{}) string
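Export takes a metrics map and returns a string. Its exact output is not documented on this page, so the sketch below only illustrates the general idea of rendering numeric values as lines of the Prometheus text exposition format ("name value"). The function renderMetrics and its choices (sorted names, non-numeric values skipped) are assumptions, not the library's behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// renderMetrics writes one "prefix_name value" line per numeric metric,
// sorted by name so the output is deterministic. Hypothetical helper.
func renderMetrics(prefix string, metrics map[string]interface{}) string {
	names := make([]string, 0, len(metrics))
	for name := range metrics {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		switch v := metrics[name].(type) {
		case int, int64, uint64, float64:
			fmt.Fprintf(&b, "%s_%s %v\n", prefix, name, v)
		}
		// Non-numeric values are skipped in this sketch.
	}
	return b.String()
}

func main() {
	fmt.Print(renderMetrics("mqttx", map[string]interface{}{
		"messages_sent": uint64(3),
		"errors":        0,
		"status":        "connected",
	}))
}
```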

type RecoveryManager added in v0.1.4

type RecoveryManager struct {
	// contains filtered or unexported fields
}

RecoveryManager is the recovery manager.

func NewRecoveryManager added in v0.1.4

func NewRecoveryManager(manager *Manager) *RecoveryManager

NewRecoveryManager creates a new recovery manager.

func (*RecoveryManager) ClearRecoveredErrors added in v0.1.4

func (rm *RecoveryManager) ClearRecoveredErrors() int

ClearRecoveredErrors removes errors that have already been recovered.

func (*RecoveryManager) GetActiveErrors added in v0.1.4

func (rm *RecoveryManager) GetActiveErrors() []*ErrorInfo

GetActiveErrors returns the active errors.

func (*RecoveryManager) GetErrorStats added in v0.1.4

func (rm *RecoveryManager) GetErrorStats() map[string]interface{}

GetErrorStats returns error statistics.

func (*RecoveryManager) MarkErrorRecovered added in v0.1.4

func (rm *RecoveryManager) MarkErrorRecovered(sessionName string, category ErrorCategory, err error) bool

MarkErrorRecovered marks an error as recovered.

func (*RecoveryManager) RegisterError added in v0.1.4

func (rm *RecoveryManager) RegisterError(sessionName string, err error, category ErrorCategory) *ErrorInfo

RegisterError registers an error.

func (*RecoveryManager) SetMaxRetries added in v0.1.4

func (rm *RecoveryManager) SetMaxRetries(category ErrorCategory, count int)

SetMaxRetries sets the maximum number of retries.

type RecoveryStrategy added in v0.1.4

type RecoveryStrategy int

RecoveryStrategy is a recovery strategy.

const (
	// StrategyRetry is the retry strategy
	StrategyRetry RecoveryStrategy = iota
	// StrategyReconnect is the reconnect strategy
	StrategyReconnect
	// StrategyNotify is the notify-only strategy
	StrategyNotify
)

type RedisClient added in v0.1.4

type RedisClient interface {
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
	Get(ctx context.Context, key string) (string, error)
	Del(ctx context.Context, keys ...string) error
	Close() error
}

RedisClient is the Redis client interface. Because it is an interface, different Redis client implementations can be used.
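Because RedisClient is a small interface, it can be satisfied by an adapter around a real Redis driver or by a test double. The sketch below is a minimal in-memory stand-in, useful for unit tests. The interface is redeclared locally so the example compiles on its own; memoryClient, newMemoryClient, and errKeyNotFound are hypothetical names, and the error returned for a missing key is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// RedisClient mirrors the interface documented above.
type RedisClient interface {
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
	Get(ctx context.Context, key string) (string, error)
	Del(ctx context.Context, keys ...string) error
	Close() error
}

var errKeyNotFound = errors.New("key not found")

type entry struct {
	value    string
	expireAt time.Time // zero value means the key never expires
}

// memoryClient is a hypothetical in-memory RedisClient for tests.
type memoryClient struct {
	mu   sync.Mutex
	data map[string]entry
}

var _ RedisClient = (*memoryClient)(nil) // compile-time interface check

func newMemoryClient() *memoryClient {
	return &memoryClient{data: make(map[string]entry)}
}

func (c *memoryClient) Set(_ context.Context, key string, value interface{}, expiration time.Duration) error {
	var s string
	switch v := value.(type) {
	case string:
		s = v
	case []byte:
		s = string(v)
	default:
		s = fmt.Sprint(v)
	}
	e := entry{value: s}
	if expiration > 0 {
		e.expireAt = time.Now().Add(expiration)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = e
	return nil
}

func (c *memoryClient) Get(_ context.Context, key string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[key]
	if !ok || (!e.expireAt.IsZero() && time.Now().After(e.expireAt)) {
		delete(c.data, key)
		return "", errKeyNotFound
	}
	return e.value, nil
}

func (c *memoryClient) Del(_ context.Context, keys ...string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, k := range keys {
		delete(c.data, k)
	}
	return nil
}

func (c *memoryClient) Close() error { return nil }

func main() {
	ctx := context.Background()
	c := newMemoryClient()
	_ = c.Set(ctx, "mqttx:session:demo", []byte(`{"version":1}`), time.Minute)
	v, err := c.Get(ctx, "mqttx:session:demo")
	fmt.Println(v, err)
}
```

A client like this could presumably be passed to NewRedisStore in tests, since that constructor accepts any RedisClient.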

type RedisOptions added in v0.1.4

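type RedisOptions struct {
	// Addr is the Redis server address, in "host:port" form
	Addr string
	// Username is the Redis username
	Username string
	// Password is the Redis password
	Password string
	// DB is the Redis database index
	DB int
	// KeyPrefix is the key prefix
	KeyPrefix string
	// TTL is the time-to-live of the session state
	TTL int64 // seconds
	// PoolSize is the connection pool size
	PoolSize int
}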

RedisOptions holds the Redis configuration options.

func DefaultRedisOptions added in v0.1.4

func DefaultRedisOptions() *RedisOptions

DefaultRedisOptions returns the default Redis options.

type RedisStore added in v0.1.4

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore is a Redis-backed session state store.

func NewRedisStore added in v0.1.4

func NewRedisStore(client RedisClient, opts *RedisStoreOptions) *RedisStore

NewRedisStore creates a new Redis store.

func (*RedisStore) Close added in v0.1.4

func (s *RedisStore) Close() error

Close closes the Redis connection.

func (*RedisStore) DeleteState added in v0.1.4

func (s *RedisStore) DeleteState(sessionName string) error

DeleteState deletes the session state.

func (*RedisStore) LoadState added in v0.1.4

func (s *RedisStore) LoadState(sessionName string) (*SessionState, error)

LoadState loads the session state.

func (*RedisStore) SaveState added in v0.1.4

func (s *RedisStore) SaveState(sessionName string, state *SessionState) error

SaveState saves the session state.

type RedisStoreOptions added in v0.1.4

type RedisStoreOptions struct {
	Prefix string        // key prefix
	TTL    time.Duration // expiration time
}

RedisStoreOptions holds the Redis store options.

type RetryableError added in v0.1.4

type RetryableError struct {
	*MQTTXError
	MaxRetries   int   `json:"max_retries"`
	CurrentRetry int   `json:"current_retry"`
	RetryDelay   int64 `json:"retry_delay"` // nanoseconds
}

RetryableError is a wrapper for errors that can be retried.

func NewRetryableError added in v0.1.4

func NewRetryableError(err *MQTTXError, maxRetries int, retryDelay int64) *RetryableError

NewRetryableError creates a retryable error.

func (*RetryableError) CanRetry added in v0.1.4

func (r *RetryableError) CanRetry() bool

CanRetry reports whether another retry is allowed.

func (*RetryableError) GetRetryDelay added in v0.1.4

func (r *RetryableError) GetRetryDelay() int64

GetRetryDelay returns the retry delay in nanoseconds.

func (*RetryableError) NextRetry added in v0.1.4

func (r *RetryableError) NextRetry()

NextRetry prepares for the next retry.
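CanRetry, GetRetryDelay, and NextRetry suggest a retry loop driven by the error value. The sketch below shows one way such a loop could look, using a local stand-in type because the method semantics are not spelled out here. It assumes that CanRetry compares CurrentRetry with MaxRetries and that NextRetry increments the counter; retryState and runWithRetry are hypothetical names. Since the delay is stored in nanoseconds, it converts directly to time.Duration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTemporary = errors.New("temporary failure")

// retryState is a local stand-in for the retry fields of RetryableError.
type retryState struct {
	MaxRetries   int
	CurrentRetry int
	RetryDelay   int64 // nanoseconds
}

func (r *retryState) CanRetry() bool       { return r.CurrentRetry < r.MaxRetries } // assumed semantics
func (r *retryState) NextRetry()           { r.CurrentRetry++ }                     // assumed semantics
func (r *retryState) Delay() time.Duration { return time.Duration(r.RetryDelay) }

// runWithRetry calls op until it succeeds or the retry budget is spent.
// It returns the number of attempts made and the last error.
func runWithRetry(r *retryState, op func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		err := op()
		if err == nil {
			return attempts, nil
		}
		if !r.CanRetry() {
			return attempts, err
		}
		time.Sleep(r.Delay())
		r.NextRetry()
	}
}

func main() {
	calls := 0
	state := &retryState{MaxRetries: 3, RetryDelay: int64(10 * time.Millisecond)}
	attempts, err := runWithRetry(state, func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```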

type Route

type Route struct {
	// contains filtered or unexported fields
}

Route represents a route for an MQTT topic.

func (*Route) GetStats

func (r *Route) GetStats() RouteStats

GetStats returns the route statistics.

func (*Route) Stop

func (r *Route) Stop()

Stop stops the route's subscription.

type RouteConfig added in v0.1.4

type RouteConfig struct {
	Topic        string               // topic
	QoS          byte                 // QoS level
	Filters      []MessageFilter      // list of filters
	Transformers []MessageTransformer // list of transformers
	BufferSize   int                  // message buffer size; only effective in Listen mode
	Description  string               // route description
	SessionName  string               // session name; an empty string means all sessions
	Handler      func(*Message)       // message handler function
}

RouteConfig holds the route configuration options.

type RouteStats

type RouteStats struct {
	MessagesReceived uint64    // number of messages received
	MessagesDropped  uint64    // number of messages dropped
	BytesReceived    uint64    // number of bytes received
	LastMessageTime  time.Time // time of the last message
	LastError        time.Time // time of the last error
	ErrorCount       uint64    // error count
}

RouteStats holds route statistics.

type SHA256AuthProvider added in v0.1.4

type SHA256AuthProvider struct {
	// contains filtered or unexported fields
}

SHA256AuthProvider is a SHA-256 hash authentication provider.

func NewSHA256AuthProvider added in v0.1.4

func NewSHA256AuthProvider(username, password string) *SHA256AuthProvider

NewSHA256AuthProvider creates a SHA-256 hash authentication provider.

func (*SHA256AuthProvider) Complete added in v0.1.4

func (p *SHA256AuthProvider) Complete(data []byte) (bool, error)

Complete finishes the authentication process.

func (*SHA256AuthProvider) HandleChallenge added in v0.1.4

func (p *SHA256AuthProvider) HandleChallenge(data []byte) ([]byte, error)

HandleChallenge handles an authentication challenge.

func (*SHA256AuthProvider) Initialize added in v0.1.4

func (p *SHA256AuthProvider) Initialize() ([]byte, error)

Initialize starts the authentication process.

func (*SHA256AuthProvider) Method added in v0.1.4

func (p *SHA256AuthProvider) Method() string

Method returns the name of the authentication method.

type SecurityError added in v0.1.4

type SecurityError struct {
	Message string
	Err     error
}

SecurityError is a security-related error.

func (*SecurityError) Error added in v0.1.4

func (e *SecurityError) Error() string

func (*SecurityError) Unwrap added in v0.1.4

func (e *SecurityError) Unwrap() error

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents a single MQTT session.

func (*Session) Disconnect

func (s *Session) Disconnect()

Disconnect closes the connection.

func (*Session) GetBrokers

func (s *Session) GetBrokers() []string

GetBrokers returns the list of configured broker addresses.

func (*Session) GetClientID

func (s *Session) GetClientID() string

GetClientID returns the client ID.

func (*Session) GetLastActivity

func (s *Session) GetLastActivity() time.Time

GetLastActivity returns the time of the last activity.

func (*Session) GetMetrics

func (s *Session) GetMetrics() map[string]interface{}

GetMetrics returns the session metrics.

func (*Session) GetName

func (s *Session) GetName() string

GetName returns the session name.

func (*Session) GetOptions

func (s *Session) GetOptions() *Options

GetOptions returns the session options.

func (*Session) GetSessionStatus added in v0.1.4

func (s *Session) GetSessionStatus() uint32

GetSessionStatus returns the session's current status constant.

func (*Session) GetStatus

func (s *Session) GetStatus() string

GetStatus returns the string representation of the session status.

func (*Session) GetSubscribedTopics

func (s *Session) GetSubscribedTopics() []string

GetSubscribedTopics returns all subscribed topics.

func (*Session) GetSubscriptionCount

func (s *Session) GetSubscriptionCount() int

GetSubscriptionCount returns the number of subscriptions.

func (*Session) IsConnected

func (s *Session) IsConnected() bool

IsConnected reports whether the session is connected.

func (*Session) IsPersistent

func (s *Session) IsPersistent() bool

IsPersistent reports whether the session is persistent.

func (*Session) IsSubscribed

func (s *Session) IsSubscribed(topic string) bool

IsSubscribed reports whether the topic is subscribed.

func (*Session) PrometheusMetrics

func (s *Session) PrometheusMetrics() string

PrometheusMetrics exports the session metrics in Prometheus format.

func (*Session) Publish

func (s *Session) Publish(topic string, payload []byte, qos byte) error

Publish publishes a message, with an error retry mechanism.

func (*Session) ResetMetrics

func (s *Session) ResetMetrics()

ResetMetrics resets the session metrics.

func (*Session) String

func (s *Session) String() string

String returns the string representation of the session.

func (*Session) Subscribe

func (s *Session) Subscribe(topic string, handler MessageHandler, qos byte) error

Subscribe subscribes to a topic, with an error retry mechanism.

func (*Session) SubscribeShared added in v0.1.4

func (s *Session) SubscribeShared(group, topic string, handler MessageHandler, qos byte) error

SubscribeShared subscribes to a shared topic.

func (*Session) Unsubscribe

func (s *Session) Unsubscribe(topics ...string) error

Unsubscribe unsubscribes from the given topics.

func (*Session) UnsubscribeShared added in v0.1.4

func (s *Session) UnsubscribeShared(group, topic string) error

UnsubscribeShared unsubscribes from a shared topic.
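SubscribeShared and UnsubscribeShared take a group and a topic. In MQTT 5.0, a shared subscription is expressed on the wire as a topic filter of the form $share/{ShareName}/{filter}, where the share name must not contain "/", "+", or "#". The sketch below builds such a filter. Whether mqttx composes the filter exactly this way is an assumption, and sharedFilter is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sharedFilter builds an MQTT 5.0 shared-subscription topic filter.
// Hypothetical helper; not part of mqttx.
func sharedFilter(group, topic string) (string, error) {
	if group == "" || strings.ContainsAny(group, "/+#") {
		return "", errors.New("share name must be non-empty and must not contain '/', '+' or '#'")
	}
	if topic == "" {
		return "", errors.New("topic filter must not be empty")
	}
	return "$share/" + group + "/" + topic, nil
}

func main() {
	f, err := sharedFilter("workers", "jobs/#")
	fmt.Println(f, err)
}
```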

func (*Session) UpdateLastActivity

func (s *Session) UpdateLastActivity()

UpdateLastActivity updates the time of the last activity.

func (*Session) WithEnhancedAuth added in v0.1.4

func (s *Session) WithEnhancedAuth(provider AuthProvider) *Session

WithEnhancedAuth configures enhanced authentication for the session.

type SessionBuilder added in v0.1.4

type SessionBuilder struct {
	// contains filtered or unexported fields
}

SessionBuilder is a session configuration builder that provides a fluent API.

func NewSessionBuilder added in v0.1.4

func NewSessionBuilder(name string) *SessionBuilder

NewSessionBuilder creates a new session builder.

func QuickConnect added in v0.1.4

func QuickConnect(name, broker string) *SessionBuilder

QuickConnect creates a quick-connect configuration (the simplest API).

func SecureConnect added in v0.1.4

func SecureConnect(name, broker, caFile string) *SessionBuilder

SecureConnect creates a secure-connection configuration (the common TLS scenario).

func (*SessionBuilder) Auth added in v0.1.4

func (b *SessionBuilder) Auth(username, password string) *SessionBuilder

Auth sets the authentication credentials.

func (*SessionBuilder) AutoReconnect added in v0.1.4

func (b *SessionBuilder) AutoReconnect() *SessionBuilder

AutoReconnect enables automatic reconnection.

func (*SessionBuilder) Broker added in v0.1.4

func (b *SessionBuilder) Broker(addr string) *SessionBuilder

Broker sets a single broker address.

func (*SessionBuilder) Brokers added in v0.1.4

func (b *SessionBuilder) Brokers(addrs ...string) *SessionBuilder

Brokers sets multiple broker addresses (high availability).

func (*SessionBuilder) Build added in v0.1.4

func (b *SessionBuilder) Build() (*Options, error)

Build builds the configuration options, including full validation.

func (*SessionBuilder) CleanSession added in v0.1.4

func (b *SessionBuilder) CleanSession(clean bool) *SessionBuilder

CleanSession sets whether to use a clean session.

func (*SessionBuilder) ClientID added in v0.1.4

func (b *SessionBuilder) ClientID(id string) *SessionBuilder

ClientID sets the client ID.

func (*SessionBuilder) DisableReconnect added in v0.1.4

func (b *SessionBuilder) DisableReconnect() *SessionBuilder

DisableReconnect disables automatic reconnection.

func (*SessionBuilder) FileStorage added in v0.1.4

func (b *SessionBuilder) FileStorage(path string) *SessionBuilder

FileStorage selects file storage.

func (*SessionBuilder) KeepAlive added in v0.1.4

func (b *SessionBuilder) KeepAlive(seconds int) *SessionBuilder

KeepAlive sets the keep-alive interval in seconds.

func (*SessionBuilder) MessageChannelSize added in v0.1.4

func (b *SessionBuilder) MessageChannelSize(size int) *SessionBuilder

MessageChannelSize sets the message channel size.

func (*SessionBuilder) MustBuild added in v0.1.4

func (b *SessionBuilder) MustBuild() *Options

MustBuild builds the configuration options and panics on error (intended for examples and tests).

func (*SessionBuilder) Performance added in v0.1.4

func (b *SessionBuilder) Performance(bufferSizeKB, maxPendingMsgs int) *SessionBuilder

Performance configures performance tuning.

func (*SessionBuilder) Persistent added in v0.1.4

func (b *SessionBuilder) Persistent() *SessionBuilder

Persistent enables session persistence.

func (*SessionBuilder) ReconnectConfig added in v0.1.4

func (b *SessionBuilder) ReconnectConfig(initialSec, maxSec int, backoffFactor float64) *SessionBuilder

ReconnectConfig sets detailed reconnection options.

func (*SessionBuilder) RedisAuth added in v0.1.4

func (b *SessionBuilder) RedisAuth(username, password string, db int) *SessionBuilder

RedisAuth sets the Redis credentials.

func (*SessionBuilder) RedisStorage added in v0.1.4

func (b *SessionBuilder) RedisStorage(addr string) *SessionBuilder

RedisStorage selects Redis storage.

func (*SessionBuilder) Reset added in v0.1.4

func (b *SessionBuilder) Reset(name string) *SessionBuilder

Reset resets the builder so that it can be reused.

func (*SessionBuilder) Subscribe added in v0.1.4

func (b *SessionBuilder) Subscribe(topic string, qos byte, handler MessageHandler) *SessionBuilder

Subscribe adds a topic to pre-subscribe.

func (*SessionBuilder) TLS added in v0.1.4

func (b *SessionBuilder) TLS(caFile, certFile, keyFile string, skipVerify bool) *SessionBuilder

TLS configures a secure TLS connection.

func (*SessionBuilder) Timeouts added in v0.1.4

func (b *SessionBuilder) Timeouts(connectSec, writeSec int) *SessionBuilder

Timeouts sets the connect and write timeouts in seconds.

func (*SessionBuilder) Validate added in v0.1.4

func (b *SessionBuilder) Validate() []error

Validate quickly validates the current configuration.

type SessionError

type SessionError struct {
	SessionName string
	Err         error
}

SessionError is a session-related error (kept for backward compatibility).

func (*SessionError) Error

func (e *SessionError) Error() string

func (*SessionError) Unwrap

func (e *SessionError) Unwrap() error

type SessionMetrics

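type SessionMetrics struct {
	MessagesSent     uint64 // number of messages sent
	MessagesReceived uint64 // number of messages received
	BytesSent        uint64 // number of bytes sent
	BytesReceived    uint64 // number of bytes received
	Errors           uint64 // error count
	Reconnects       uint64 // number of reconnections
	LastMessage      int64  // time of the last message (Unix nanosecond timestamp, safe for atomic operations)
	LastError        int64  // time of the last error (Unix nanosecond timestamp, safe for atomic operations)
}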

SessionMetrics holds session metrics.
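LastMessage and LastError are stored as int64 Unix nanosecond timestamps so they can be read and written with atomic operations. The sketch below shows how such a field can be updated atomically and converted back to a time.Time. The variable and function names are hypothetical and only illustrate the pattern.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// lastMessage stands in for a field such as SessionMetrics.LastMessage.
var lastMessage int64

// recordMessage stores the timestamp atomically as Unix nanoseconds.
func recordMessage(at time.Time) {
	atomic.StoreInt64(&lastMessage, at.UnixNano())
}

// fromUnixNano converts a Unix nanosecond timestamp to a time.Time.
func fromUnixNano(ns int64) time.Time {
	return time.Unix(0, ns)
}

// lastMessageTime loads the timestamp atomically and converts it.
func lastMessageTime() time.Time {
	return fromUnixNano(atomic.LoadInt64(&lastMessage))
}

func main() {
	recordMessage(time.Now())
	fmt.Println("last message at", lastMessageTime().Format(time.RFC3339Nano))
}
```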

func GetSessionMetrics added in v0.1.4

func GetSessionMetrics() *SessionMetrics

GetSessionMetrics returns a session metrics object.

type SessionState

type SessionState struct {
	Topics           []TopicSubscription `json:"topics"`            // list of subscribed topics
	Messages         []*Message          `json:"messages"`          // pending messages
	LastSequence     uint64              `json:"last_sequence"`     // last sequence number
	LastConnected    time.Time           `json:"last_connected"`    // time of the last connection
	LastDisconnected time.Time           `json:"last_disconnected"` // time of the last disconnection
	QoSMessages      map[uint16]*Message `json:"qos_messages"`      // messages with QoS > 0, indexed by MessageID
	RetainedMessages map[string]*Message `json:"retained_messages"` // retained messages, indexed by topic
	ClientID         string              `json:"client_id"`         // client ID
	SessionExpiry    time.Time           `json:"session_expiry"`    // session expiry time
	Version          int                 `json:"version"`           // state version, used for compatibility
}

SessionState holds the session state.
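The JSON tags on SessionState suggest that stores serialize it as JSON, although the encoding actually used by the file and Redis stores is not stated here. As a rough sketch under that assumption, the example below marshals a trimmed-down stand-in containing only three of the documented fields. trimmedState and encodeState are hypothetical names; TopicSubscription is copied from its definition on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TopicSubscription mirrors the type documented on this page.
type TopicSubscription struct {
	Topic string `json:"topic"`
	QoS   byte   `json:"qos"`
}

// trimmedState keeps three SessionState fields for illustration only.
type trimmedState struct {
	Topics   []TopicSubscription `json:"topics"`
	ClientID string              `json:"client_id"`
	Version  int                 `json:"version"`
}

// encodeState marshals the trimmed state to a JSON string.
func encodeState(clientID string, subs []TopicSubscription) (string, error) {
	data, err := json.Marshal(trimmedState{Topics: subs, ClientID: clientID, Version: 1})
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	s, err := encodeState("dev-1", []TopicSubscription{{Topic: "sensor/#", QoS: 1}})
	fmt.Println(s, err)
}
```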

type SessionStore

type SessionStore interface {
	SaveState(sessionName string, state *SessionState) error
	LoadState(sessionName string) (*SessionState, error)
	DeleteState(sessionName string) error
}

SessionStore is the session storage interface.

type StorageOptions added in v0.1.4

type StorageOptions struct {
	// Type is the storage type: "memory", "file", or "redis"
	Type StoreType
	// Path is the file storage path; only used when Type="file"
	Path string
	// Redis holds the Redis storage options; only used when Type="redis"
	Redis *RedisOptions
}

StorageOptions holds the storage options.

func DefaultStorageOptions added in v0.1.4

func DefaultStorageOptions() *StorageOptions

DefaultStorageOptions returns the default storage options.

type StoreType added in v0.1.4

type StoreType string

StoreType is the storage type.

const (
	// StoreTypeMemory is in-memory storage
	StoreTypeMemory StoreType = "memory"
	// StoreTypeFile is file storage
	StoreTypeFile StoreType = "file"
	// StoreTypeRedis is Redis storage
	StoreTypeRedis StoreType = "redis"
)

type TLSConfig

type TLSConfig struct {
	CAFile     string // CA certificate file path
	CertFile   string // client certificate file path
	KeyFile    string // client key file path
	SkipVerify bool   // whether to skip server certificate verification
}

TLSConfig holds the TLS configuration.

type TimerPool added in v0.1.4

type TimerPool struct {
	// contains filtered or unexported fields
}

TimerPool is a timer object pool.

type TopicConfig

type TopicConfig struct {
	Topic   string         // topic name
	QoS     byte           // quality-of-service level
	Handler MessageHandler // message handler function
}

TopicConfig holds the configuration of a topic.

type TopicFilter added in v0.1.4

type TopicFilter struct {
	// contains filtered or unexported fields
}

TopicFilter is a topic-based message filter.

func NewTopicFilter added in v0.1.4

func NewTopicFilter(pattern string, isRegex bool) (*TopicFilter, error)

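NewTopicFilter creates a topic-based message filter. pattern can be:

 1. An exact topic name (for example "sensor/temperature")
 2. A pattern containing wildcards (for example "sensor/#" or "sensor/+/temperature")
 3. A regular expression (with isRegex=true)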
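For the wildcard case, MQTT defines "+" as matching exactly one topic level and "#" as matching the remaining levels (including none) when it is the last level of the filter. The function below is a minimal sketch of those rules. It is not the library's implementation, the name matchTopic is hypothetical, and it ignores the special handling of topics that begin with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches an MQTT topic filter that may
// contain the "+" (single level) and "#" (multi level) wildcards.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			// "#" is only valid as the last level and matches everything after it.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // the filter has more levels than the topic
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensor/#", "sensor/kitchen/temperature"))
	fmt.Println(matchTopic("sensor/+/temperature", "sensor/kitchen/temperature"))
	fmt.Println(matchTopic("sensor/+/temperature", "sensor/kitchen/humidity"))
}
```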

func (*TopicFilter) GetDescription added in v0.1.4

func (f *TopicFilter) GetDescription() string

GetDescription returns the filter's description.

func (*TopicFilter) Match added in v0.1.4

func (f *TopicFilter) Match(message *Message) bool

Match reports whether the message matches the topic filter rule.

type TopicRewriteTransformer added in v0.1.4

type TopicRewriteTransformer struct {
	// contains filtered or unexported fields
}

TopicRewriteTransformer is a topic-rewriting transformer. It rewrites the topic of a message according to a rule.

func NewTopicRewriteTransformer added in v0.1.4

func NewTopicRewriteTransformer(pattern, replacement string, isRegex bool) (*TopicRewriteTransformer, error)

NewTopicRewriteTransformer creates a topic-rewriting transformer. If isRegex is true, pattern is treated as a regular expression and replacement may contain capture-group references, for example pattern="sensor/([^/]+)/temp", replacement="device/$1/temperature".
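The pattern and replacement from the example above behave as follows with Go's standard regexp package. This sketch only demonstrates the regular-expression rewrite itself and assumes the transformer applies a replacement of this kind; rewriteTopic is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// rewriteTopic applies a regular-expression rewrite to a topic. $1, $2, ...
// in replacement refer to capture groups in pattern.
func rewriteTopic(pattern, replacement, topic string) (string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return "", err
	}
	return re.ReplaceAllString(topic, replacement), nil
}

func main() {
	out, err := rewriteTopic("sensor/([^/]+)/temp", "device/$1/temperature", "sensor/kitchen/temp")
	fmt.Println(out, err) // device/kitchen/temperature <nil>
}
```

The pattern in this example is not anchored, so it also matches inside longer topics. Adding ^ and $ restricts the rewrite to topics that match the pattern as a whole.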

func (*TopicRewriteTransformer) GetDescription added in v0.1.4

func (t *TopicRewriteTransformer) GetDescription() string

GetDescription returns the transformer's description.

func (*TopicRewriteTransformer) Transform added in v0.1.4

func (t *TopicRewriteTransformer) Transform(message *Message) (*Message, error)

Transform rewrites the topic of the message.

type TopicSubscription

type TopicSubscription struct {
	Topic string `json:"topic"` // topic name
	QoS   byte   `json:"qos"`   // QoS level
}

TopicSubscription holds information about a topic subscription.

type ValidationErrors added in v0.1.4

type ValidationErrors struct {
	Errors []error `json:"errors"`
}

ValidationErrors is a collection of configuration validation errors.

func NewValidationErrors added in v0.1.4

func NewValidationErrors() *ValidationErrors

NewValidationErrors creates a collection of validation errors.

func (*ValidationErrors) Add added in v0.1.4

func (ve *ValidationErrors) Add(err error)

func (*ValidationErrors) Error added in v0.1.4

func (ve *ValidationErrors) Error() string

func (*ValidationErrors) HasErrors added in v0.1.4

func (ve *ValidationErrors) HasErrors() bool

Directories

Path Synopsis
advanced_client command
basic_client command
benchmark command
enhanced_tls command
forwarder command
mqtt5_auth command
redis_storage command
