Skip to main content

NATS Integration

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion
  10. Appendix

Introduction

This document systematically describes the NATS integration in this repository, covering the following aspects:

  • NATS connection management and lifecycle
  • JetStream stream configuration and creation process
  • Consumer creation, configuration and management
  • Message publishing and subscription process
  • Error handling and retry mechanisms
  • Configuration options and performance tuning
  • Best practices and common pitfalls

Project Structure

NATS integration related code is mainly located in the pkg/messaging directory, including:

  • NATS client initialization and connection management
  • JetStream publisher, subscriber and event reader implementations
  • Stream configuration and creation
  • Consumer configuration and management
  • NATS Connection: NATS client initialization, connection establishment and lifecycle management
  • JetStreamPublisher: Event publisher based on NATS JetStream, supports automatic stream creation and configuration
  • JetStreamSubscriber[T]: Event subscriber based on NATS JetStream, supports consumer creation and message processing
  • JetStreamReader: Event reader based on NATS JetStream, supports replay and reading
  • Stream: JetStream stream configuration and creation, including topic, retention policy, etc.
  • Consumer: JetStream consumer configuration and management, including acknowledgment mode, filtering, etc.

NATS integration architecture is divided into three layers:

  • Connection Layer: Manages NATS client connection, provides JetStream context
  • Stream Layer: Configures and creates JetStream streams, defines topic and retention policies
  • Application Layer: Publishers, subscribers and readers implement specific message publishing, subscription and reading functions

Connection Management and Lifecycle

  • Connection Establishment
    • Uses nats.Connect to establish connection with NATS server, supports options configuration
    • Automatically handles reconnection, supports setting maximum reconnection attempts and intervals
    • Connection status can be monitored through Status and Drain methods
  • JetStream Context Creation
    • Obtains JetStream context through nc.JetStream()
    • JetStream context is the entry point for all stream operations
  • Connection Lifecycle
    • Connection established during application initialization
    • Connection closed during application shutdown (nc.Drain())
    • Supports graceful shutdown, ensures all pending messages are processed
  • Stream Creation Process
    • Checks if stream exists, creates if it doesn't
    • Configures stream name, topic list (Subjects), retention policy
    • Supports setting maximum message age, maximum bytes, maximum message size
    • Supports debug mode (memory storage)
  • Stream Configuration Options
    • Name: Stream name, typically service name
    • Subjects: Topic list, supports wildcards (e.g., aggregateType.>)
    • Retention: Retention policy (LimitsPolicy or InterestPolicy)
    • MaxAge: Maximum message retention time
    • MaxBytes: Maximum stream bytes
    • MaxMsgSize: Maximum single message size
    • Storage: Storage type (FileStorage or MemoryStorage)
  • Consumer Creation
    • Creates consumer through CreateOrUpdateConsumer
    • Configures consumer name, filter topic, acknowledgment mode, etc.
    • Supports persistent consumer (Durable) and ephemeral consumer
  • Consumer Configuration
    • Durable: Consumer name, empty for ephemeral consumer
    • FilterSubject: Filter topic, supports wildcards
    • AckPolicy: Acknowledgment mode (AckExplicit, AckAll, AckNone)
    • DeliverPolicy: Delivery policy (DeliverAll, DeliverLast, DeliverNew)
    • ReplayPolicy: Replay policy (ReplayInstant, ReplayOriginal)
    • MaxDeliver: Maximum delivery attempts
  • Consumer Lifecycle
    • Created during subscriber Start
    • Actively deleted or automatically cleaned up during Stop/Close
    • Ephemeral consumer automatically cleaned up after disconnection
  • Publishing Process
    • Constructs complete topic (aggregateType.aggregateID.eventType)
    • Serializes event to JSON format
    • Calls js.Publish to publish message
    • Waits for server acknowledgment (depends on configuration)
  • Publishing Options
    • Supports setting message headers
    • Supports asynchronous publishing
    • Supports batch publishing
  • Subscription Process
    • Creates consumer and configures filter topic
    • Calls consumer.Consume to start consumption
    • Callback function processes messages
    • Explicit acknowledgment after successful processing
  • Message Processing
    • Deserializes message payload to BaseEvent
    • Converts BaseEvent to strongly-typed domain event
    • Calls domain event handler
    • Ack or Nak based on processing result
  • Connection Errors

    • Automatically reconnects on connection failure
    • Triggers error callback after exceeding maximum reconnection attempts
    • Suggests setting reasonable reconnection parameters
  • Publish Errors

    • Returns error on stream non-existence
    • Returns error on message size exceeding limit
    • Returns error on timeout
  • Subscription Errors

    • Returns error on consumer creation failure
    • Returns error on message processing failure (triggers Nak retry)
    • Handles panic in callback function
  • Retry Mechanisms

    • Connection layer: Automatic reconnection
    • Subscription layer: Nak triggers message redelivery
    • Application layer: Exponential backoff retry
  • NATS Configuration

    • URL: NATS server address
    • MaxReconnects: Maximum reconnection attempts
    • ReconnectWait: Reconnection wait time
    • Timeout: Connection timeout
  • JetStream Configuration

    • Stream name and topic prefix
    • Retention policy and storage type
    • Capacity limits (MaxAge, MaxBytes, MaxMsgSize)
  • Consumer Configuration

    • Acknowledgment mode (suggests AckExplicit)
    • Delivery policy (suggests DeliverAll)
    • Replay policy (suggests ReplayInstant)
    • Maximum delivery attempts
  • Connection Management

    • Use singleton pattern to manage NATS connection
    • Set reasonable reconnection parameters
    • Use Drain for graceful shutdown
  • Stream Design

    • Topic design follows aggregateType.> pattern
    • Reasonably set retention policy and capacity limits
    • Use file storage in production environment
  • Consumer Design

    • Use explicit acknowledgment mode
    • Set reasonable maximum delivery attempts
    • Handle message idempotency
  • Error Handling

    • Handle all possible error cases
    • Set up monitoring and alarm mechanisms
    • Record detailed error logs
  • Connection Failure

    • Check NATS server status
    • Check network connectivity
    • View reconnection logs
  • Stream Creation Failure

    • Check if stream name conflicts
    • Check if topic format is correct
    • Check if permissions are sufficient
  • Consumer Creation Failure

    • Check if stream exists
    • Check if filter topic is correct
    • Check if consumer name conflicts
  • Message Publishing Failure

    • Check if stream exists
    • Check if message size exceeds limit
    • Check if topic format is correct
  • Message Subscription Failure

    • Check if consumer exists
    • Check if filter topic matches
    • Check if handler is registered correctly

NATS integration provides high-performance, high-reliability messaging infrastructure through JetStream. Reasonable configuration of streams and consumers, correct handling of connection lifecycle and error cases are key to ensuring system stability. It is recommended to follow best practices in production environments and set up complete monitoring and alarm mechanisms.

Appendix

Configuration Example

// NATS connection configuration
natsConfig := &config.NATSConfig{
URL: "nats://localhost:4222",
MaxReconnects: 10,
ReconnectWait: time.Second,
Timeout: 5 * time.Second,
}

// JetStream stream configuration
streamConfig := &config.StreamConfig{
Name: "my-service",
Subjects: []string{"aggregateType.>"},
Retention: jetstream.LimitsPolicy,
MaxAge: 7 * 24 * time.Hour,
MaxBytes: 1024 * 1024 * 1024, // 1GB
MaxMsgSize: 1024 * 1024, // 1MB
Storage: jetstream.FileStorage,
}

// Consumer configuration
consumerConfig := &config.ConsumerConfig{
Durable: "my-consumer",
FilterSubject: "aggregateType.*.eventType",
AckPolicy: jetstream.AckExplicitPolicy,
DeliverPolicy: jetstream.DeliverAllPolicy,
ReplayPolicy: jetstream.ReplayInstantPolicy,
MaxDeliver: 5,
}

Key Metrics

  • Connection Status: Connected, Disconnected, Reconnecting, Closed
  • Stream Statistics: Message count, byte size, consumer count
  • Consumer Statistics: Pending message count, redelivery count, acknowledgment rate
  • Performance Metrics: Publishing latency, consumption latency, throughput