Message Bus Architecture
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendix
Introduction
This document systematically describes the design philosophy and implementation principles of StreamHub, covering the overall architecture pattern of the message bus, component interaction relationships, lifecycle management and graceful shutdown mechanisms, startup processes, subscriber registration and event handler binding methods, configuration options and performance parameter tuning, as well as integration patterns with other components. The goal is to help readers quickly understand and correctly use this message bus architecture.
Project Structure
Message bus related code lives mainly in the pkg/messaging directory and is organized around four roles: publisher (JetStreamPublisher), subscriber (JetStreamSubscriber), bus adapter (StreamHub), and application integration (App). The package also provides a memory-based event bus (MemoryEventBus) for testing and demonstration.
Core Components
- StreamHub: Adapter and lifecycle proxy for the JetStream subscription collection. It exposes a uniform Start/Close interface and an AddSub method for registering handlers.
- JetStreamPublisher: Implements the StreamPublisher interface and publishes domain events to a NATS JetStream stream. Supports automatic stream creation, topic naming, and batch publishing.
- JetStreamSubscriber[T]: Implements the StreamSubscriber interface and is generically bound to a single event type. Supports explicit acknowledgment, graceful shutdown, concurrency safety, and error recovery.
- Subscribers Interface: Abstraction over a collection of subscriptions, used to manage handlers for multiple event types in one place.
- DomainEventHandler[T]: Function type for domain event handlers; T is constrained to types that implement entity.DomainEvent.
- App: Application orchestration entry point. Connects to NATS, creates publishers and subscribers, and handles startup and graceful shutdown.
- MemoryEventBus: Pure memory event bus, used for testing and demonstration, provides Pub/Sub/Unsub/Close and other capabilities.
Architecture Overview
StreamHub combines JetStream's publish/subscribe capabilities with application lifecycle management, forming a three-layer architecture: publisher, subscription collection, and application orchestration. The publisher writes events, the subscription collection distributes and processes them, and the application layer handles startup, retries, and graceful shutdown.
Detailed Component Analysis
StreamHub Design and Lifecycle
- Design Philosophy: StreamHub composes (embeds) the Subscribers interface and delegates to the concrete JetStream subscription collection, so Start/Close lifecycle management is handled in one place.
- Lifecycle:
  - Start: Delegates to Subscribers.Start, which starts all subscribers.
  - Close: Delegates to Subscribers.Close, which gracefully shuts down all subscribers.
- Subscription Registration: AddSub registers a handler with Subscribers; the underlying binding is performed by the concrete subscriber.
JetStreamPublisher
- Key Features:
  - Automatically creates the JetStream stream with the topic pattern "aggregateType.>" for each aggregate type, so every aggregate type in the service is covered.
  - When publishing, builds the topic "aggregateType.aggregateID.eventType" and wraps the event in a BaseEvent envelope whose payload holds the strongly-typed event serialized as JSON.
  - Supports the options WithTopics, WithMaxAge, WithMaxSize, WithMaxMsgSize, and Debug.
- Performance and Reliability:
  - Uses an explicit acknowledgment strategy to ensure reliable message delivery.
  - The stream's retention policy and capacity limits are configurable; WithMaxAge and WithMaxSize keep storage costs under control.
JetStreamSubscriber[T]
- Design Points:
  - Each subscriber is generically bound to a single event type, so different event types do not compete for the same consumer or interleave out of order.
  - Durable consumer with explicit acknowledgment (AckExplicit), the DeliverAll deliver policy, and the ReplayInstant replay policy.
  - Concurrency safety: a running flag guarded by a Mutex; a WaitGroup waits for in-flight processing to finish; graceful shutdown uses context cancellation with a timeout.
  - Error recovery: panics are recovered, and failed messages are NAKed and handed back to JetStream for redelivery.
- Processing Flow:
  - Start: Creates the consumer and starts asynchronous consumption with Consume; messages are processed in an internal loop.
  - handleMessage: Deserialize BaseEvent -> strongly-typed event -> call DomainEventHandler -> Ack on success, Nak on failure.
  - Stop/Close: Cancels the consumption context, waits on the WaitGroup, and releases resources.
Subscribers Interface and Event Handler Types
- Subscribers Interface: Exposes a unified AddHandler method so that handlers for different aggregate and event types can be managed centrally.
- Event Handler Type: DomainEventHandler[T DomainEventConstraint], where T is constrained to types that implement entity.DomainEvent.
- Event Stream Reader: StreamReader provides GetEvents, Replay, and related methods, and supports replay from a given version or offset.
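To illustrate centralized handler management with a generic constraint, here is a sketch of a registry keyed by event type. Registry, the package-level AddHandler function, and the EventType method are hypothetical. Go methods cannot take type parameters, which is one reason typed registration is often written as a generic function rather than a method:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// DomainEvent is a hypothetical stand-in for entity.DomainEvent.
type DomainEvent interface {
	EventType() string
}

// Registry is a simplified, hypothetical Subscribers-style collection that
// keeps one type-erased handler per event type.
type Registry struct {
	mu       sync.RWMutex
	handlers map[string]func(payload []byte) error
}

func NewRegistry() *Registry {
	return &Registry{handlers: make(map[string]func([]byte) error)}
}

// AddHandler binds a strongly-typed handler to T's event type. It assumes
// value-receiver event types, so calling EventType on the zero value is safe.
func AddHandler[T DomainEvent](r *Registry, h func(T) error) {
	var zero T
	key := zero.EventType()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[key] = func(payload []byte) error {
		var ev T
		if err := json.Unmarshal(payload, &ev); err != nil {
			return fmt.Errorf("decode %s: %w", key, err)
		}
		return h(ev)
	}
}

// Dispatch routes a payload to the handler registered for eventType.
func (r *Registry) Dispatch(eventType string, payload []byte) error {
	r.mu.RLock()
	h, ok := r.handlers[eventType]
	r.mu.RUnlock()
	if !ok {
		return fmt.Errorf("no handler for %s", eventType)
	}
	return h(payload)
}

// Two hypothetical event types handled by the same registry.
type OrderCreated struct {
	OrderID string `json:"order_id"`
}

func (OrderCreated) EventType() string { return "OrderCreated" }

type PaymentSettled struct {
	Amount int64 `json:"amount"`
}

func (PaymentSettled) EventType() string { return "PaymentSettled" }
```

The closure stored per event type keeps the decode step strongly typed while the map itself stays type-erased.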
App Integration
- App.NewHub: Creates a StreamHub, injects the NATS connection, service name, target service name, and logger, and registers the hub with NeedCleanup so it is closed during graceful shutdown.
- App.StreamPub: Creates a JetStream publisher, using the App configuration to set the maximum retention time and other parameters.
- App.Start: Starts all subprocesses (including subscribers) with built-in exponential backoff retry. On catching a system signal it shuts down gracefully, calling Close on each component in turn with a timeout.
EventPublisher and MemoryEventBus
- Event Publisher (EventPublisher): Publishes events from the event store to the memory event bus for projections and subscribers (complementing JetStream stream publishing).
- MemoryEventBus: Pure in-memory implementation of Pub/Sub/Unsub/Close; calls handlers asynchronously and supports timeout control and statistics.
  - Applicable Scenarios: Unit testing, local development, and rapid verification of the event processing chain.
Dependency Analysis
- Component Coupling:
  - StreamHub and Subscribers are coupled only through an interface, reducing dependence on concrete implementations.
  - JetStreamPublisher and JetStreamSubscriber depend on the NATS/JetStream client; the connection is injected by App.
  - App is the orchestration center and manages lifecycle and resource cleanup in one place.
- External Dependencies:
  - NATS/JetStream: Event transport and persistence.
  - Logger: Records key information throughout the publish/subscribe process.
  - Configuration: NATS parameters (such as MaxAge) determine the stream policy and retention time.
Performance Considerations
- Publisher Side (JetStreamPublisher):
  - Topic Granularity: The "aggregateType.>" pattern simplifies subscription management; if event volume is very high, consider splitting streams by business domain or by service.
  - Batch Publishing: PublishEvents publishes events one at a time; depending on the workload, events can be grouped into batches to reduce network overhead.
  - Stream Policy: The LimitsPolicy retention policy suits event storage; set MaxAge and MaxBytes to reasonable values to control costs.
- Subscriber Side (JetStreamSubscriber[T]):
  - Explicit Acknowledgment: Prevents message loss; when processing fails, the message is NAKed and JetStream redelivers it.
  - Concurrency Model: One consumer per subscriber, avoiding the disorder that would arise if several subscribers processed events for the same aggregate in parallel.
  - Resource Management: A WaitGroup plus a shutdown timeout keeps graceful shutdown from blocking indefinitely.
- Configuration Suggestions:
  - MaxAge: Set according to business retention needs to avoid unbounded growth.
  - MaxSize/MaxMsgSize: Set according to message size and storage budget.
  - Debug: Enables memory storage for observing behavior during development; disable it in production.
Troubleshooting Guide
- Publish Failure:
  - Check the JetStream client creation and stream creation logs to confirm connectivity and permissions.
  - Verify that the topic follows the "aggregateType.aggregateID.eventType" convention.
- Subscribe Failure:
  - Check the consumer creation logs to confirm that the stream exists and the filter topic is correct.
  - Inspect handleMessage deserialization errors and handler return values to locate the cause of failures.
- Graceful Shutdown Stuck:
  - Check the Stop/Close timeout settings and handler duration; optimize processing logic if necessary.
  - Confirm that every WaitGroup.Add is matched by a Done so that Wait returns, avoiding goroutine leaks.
- Application Startup Retry:
  - Check App.Start's exponential backoff retry logs to identify which subprocess fails to start and why.
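For the publish-failure check on topic naming, a small validator can catch malformed topics before they reach JetStream. validateEventTopic is a hypothetical helper; it conservatively rejects the NATS wildcard characters * and > as well as whitespace. It also exposes a common pitfall: an aggregate ID that itself contains a dot (such as an email address) splits into extra tokens and breaks the three-part convention:

```go
package main

import (
	"fmt"
	"strings"
)

// validateEventTopic checks that topic has the form
// "aggregateType.aggregateID.eventType": exactly three non-empty tokens,
// none containing wildcard or whitespace characters.
func validateEventTopic(topic string) error {
	tokens := strings.Split(topic, ".")
	if len(tokens) != 3 {
		return fmt.Errorf("topic %q: want 3 tokens, got %d", topic, len(tokens))
	}
	for i, t := range tokens {
		if t == "" {
			return fmt.Errorf("topic %q: token %d is empty", topic, i)
		}
		if strings.ContainsAny(t, "*> \t\r\n") {
			return fmt.Errorf("topic %q: token %q contains a wildcard or whitespace", topic, t)
		}
	}
	return nil
}
```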
Conclusion
This message bus architecture centers on StreamHub, combining JetStream's reliable publish/subscribe capabilities with application-layer lifecycle management to give a clear separation of responsibilities and good extensibility. Unified interface abstractions and a strict graceful shutdown mechanism are intended to meet production requirements for stability and observability. In production, configure stream policies and subscriber concurrency models deliberately, and continuously monitor key metrics of the publish/subscribe chain.
Appendix
Configuration Options and Parameters
- JetStreamPublisher Options:
  - WithTopics: Specifies all aggregate types within the service, ensuring complete topic coverage.
  - WithMaxAge: Maximum message retention time (configured in days, converted to seconds).
  - WithMaxSize: Maximum stream size in bytes.
  - WithMaxMsgSize: Maximum size of a single message in bytes.
  - Debug: Enables memory storage for debugging.
- App Integration:
  - StreamPub: Creates a publisher based on the App configuration.
  - NewHub: Creates a StreamHub and registers it for graceful shutdown.
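Assuming the WithX options are implemented as Go functional options (the document lists their names but not their signatures), they can be sketched as below. The publisherConfig fields, the StorageType constants, and Debug as a zero-argument option are hypothetical. In a real implementation the resulting values would be copied into the JetStream stream configuration, where -1 means unlimited for the byte limits and a zero MaxAge means no age limit:

```go
package main

import "time"

// StorageType is a hypothetical mirror of JetStream's file/memory choice.
type StorageType int

const (
	FileStorage StorageType = iota
	MemoryStorage
)

// publisherConfig collects stream settings; field names are assumptions.
type publisherConfig struct {
	Topics     []string
	MaxAge     time.Duration
	MaxBytes   int64
	MaxMsgSize int32
	Storage    StorageType
}

// Option mutates a publisherConfig.
type Option func(*publisherConfig)

// WithTopics adds the aggregate types whose "aggregateType.>" topics the stream covers.
func WithTopics(aggregateTypes ...string) Option {
	return func(c *publisherConfig) { c.Topics = append(c.Topics, aggregateTypes...) }
}

// WithMaxAge takes a retention period in days and stores it as a duration.
func WithMaxAge(days int) Option {
	return func(c *publisherConfig) { c.MaxAge = time.Duration(days) * 24 * time.Hour }
}

// WithMaxSize limits the whole stream in bytes.
func WithMaxSize(bytes int64) Option {
	return func(c *publisherConfig) { c.MaxBytes = bytes }
}

// WithMaxMsgSize limits a single message in bytes.
func WithMaxMsgSize(bytes int32) Option {
	return func(c *publisherConfig) { c.MaxMsgSize = bytes }
}

// Debug switches the stream to in-memory storage.
func Debug() Option {
	return func(c *publisherConfig) { c.Storage = MemoryStorage }
}

// newPublisherConfig applies opts over defaults: file storage, no age limit
// (zero MaxAge), and -1 (unlimited) for both byte limits.
func newPublisherConfig(opts ...Option) publisherConfig {
	cfg := publisherConfig{Storage: FileStorage, MaxBytes: -1, MaxMsgSize: -1}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}
```

Functional options keep the constructor signature stable as new settings are added, and callers only mention the limits they care about.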
- Create StreamHub:
  - Obtain a Hub instance through App.NewHub(serviceName), then call AddSub to register handlers.
- Create JetStreamPublisher:
  - Obtain a publisher through App.StreamPub(aggregateType...), then call Publish/PublishEvents to publish events.
- Startup and Shutdown:
  - App.Start starts all subprocesses and then waits for a system signal; once a signal is caught, App.CleanUp calls Close on each registered component in turn.
- Event Publisher (EventPublisher):
  - Publishes events from the event store to the memory event bus for projections and subscribers.