Pub/Sub Mechanism
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendix
Introduction
This document systematically describes the pub/sub mechanism in this repository, covering the following aspects:
- Publisher and subscriber design patterns and responsibility boundaries
- Topic naming and routing strategies, message distribution algorithms
- Publish acknowledgment and subscribe acknowledgment processes, receipt handling
- Configuration items and performance tuning parameters
- Custom publisher and subscriber implementation example paths
- Message loss and duplicate processing strategies
- Best practices and common pitfalls
Project Structure
The pub/sub modules live mainly under pkg/messaging and are organized into three layers:
- Interface and Abstraction Layer: Defines publish/subscribe, event reading and other interfaces and generic constraints
- Adapter Layer: NATS JetStream-based publisher, subscriber and event reader implementations
- Business Integration Layer: Event publisher (publishes aggregate events to event bus), event subscriber (subscribes from event bus and distributes)
Core Components
- StreamPublisher: Unified event publishing interface, supports single event and batch event publishing
- StreamSubscriber: Unified event subscription interface, supports Start/Stop/Close lifecycle
- StreamReader: Event stream reading and replay interface, supports replay by aggregate ID, version, offset
- EventPublisher: Publishes aggregate uncommitted events to event bus (generic event model)
- EventSubscriber[T]: Subscribes from event bus and filters by topic, deserializes to strongly-typed events and hands over to handler for processing
- JetStreamPublisher: NATS JetStream-based event publisher, automatically ensures stream exists, topic is aggregateType.aggregateID.eventType
- JetStreamSubscriber[T]: NATS JetStream-based event subscriber with explicit acknowledgment, graceful shutdown and a durable consumer
- JetStreamReader: NATS JetStream-based event reader, supports ephemeral consumer, batch pulling, replay by version/offset
Architecture Overview
The overall pub/sub flow is as follows:
- Aggregate produces events → Event publisher publishes events to event bus (or directly publishes to JetStream)
- Subscriber registers handler → Subscribes to event topic → Consumer pulls messages from JetStream → Deserializes to strongly-typed events → Calls domain handler
- Event reader is used for offline replay and aggregate reconstruction
Detailed Component Analysis
Publisher Design and Responsibilities
- EventPublisher
  - Responsibility: Reads events from the event store, constructs the full topic (serviceName.aggregateType.eventType), serializes domain events and publishes them to the event bus
  - Key Points: Topic naming, Base64-encoded payload, events published one at a time
- JetStreamPublisher
  - Responsibility: Ensures the JetStream stream exists, then publishes events under the topic aggregateType.aggregateID.eventType
  - Key Points: Stream Subjects configuration, Retention policy, explicit acknowledgment, batch publishing
Subscriber Design and Responsibilities
- EventSubscriber[T]
  - Responsibility: Validates parameters during initialization, encodes the topic from serviceName/aggregateType/eventType and registers it with the event bus; in the callback, decodes the Base64 payload, deserializes it into a strongly-typed event and calls the handler
  - Key Points: Topic encoding, serviceName/aggregateType filtering, handler registration and concurrency safety
- JetStreamSubscriber[T]
  - Responsibility: Creates a durable consumer and processes messages in a Consume loop; acknowledges explicitly; shuts down gracefully (context cancellation, WaitGroup wait)
  - Key Points: Explicit AckPolicy, FilterSubject for precise subscription, instant ReplayPolicy, panic recovery, state lock
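The subscriber-side steps described above (decode the Base64 payload, deserialize it into a strongly-typed event, call the handler, recover from panics) can be sketched with the standard library alone. This is a minimal sketch, not the repository's implementation: `wrapHandler`, `encodePayload`, `OrderCreated` and the choice of JSON as the serialization format are all assumptions made for illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// OrderCreated is a hypothetical strongly-typed event used only for this sketch.
type OrderCreated struct {
	OrderID string `json:"order_id"`
	Amount  int    `json:"amount"`
}

// encodePayload (hypothetical) serializes v as JSON and Base64-encodes it,
// mirroring what a publisher would do before handing the event to the bus.
func encodePayload(v any) (string, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return "", fmt.Errorf("marshal event: %w", err)
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

// wrapHandler (hypothetical) turns a typed handler into a raw-payload handler.
// It decodes Base64, unmarshals JSON into T, and converts a panic inside the
// handler into an error so the caller can decide how to acknowledge.
func wrapHandler[T any](handler func(T) error) func(payload string) error {
	return func(payload string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked: %v", r)
			}
		}()
		raw, decodeErr := base64.StdEncoding.DecodeString(payload)
		if decodeErr != nil {
			return fmt.Errorf("decode base64: %w", decodeErr)
		}
		var event T
		if jsonErr := json.Unmarshal(raw, &event); jsonErr != nil {
			return fmt.Errorf("unmarshal event: %w", jsonErr)
		}
		return handler(event)
	}
}

func main() {
	payload, err := encodePayload(OrderCreated{OrderID: "o-1", Amount: 42})
	if err != nil {
		panic(err)
	}
	handle := wrapHandler(func(e OrderCreated) error {
		fmt.Printf("order %s created, amount %d\n", e.OrderID, e.Amount)
		return nil
	})
	if err := handle(payload); err != nil {
		fmt.Println("handler failed:", err)
	}
}
```

Returning the recovered panic as an error keeps the consume loop alive and lets the acknowledgment logic treat it like any other processing failure.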
Topic Naming and Routing Strategy
- Event Bus (EventPublisher/EventSubscriber)
  - Topic Naming: serviceName.aggregateType.eventType
  - Subscription Filtering: Filters by serviceName and aggregateType; the event payload is Base64-encoded, then decoded and deserialized on arrival
- JetStream (JetStreamPublisher/JetStreamSubscriber)
  - Topic Naming: aggregateType.aggregateID.eventType
  - Subscription Filtering: FilterSubject aggregateType.*.eventType; consumer configured with explicit AckPolicy, DeliverAllPolicy and ReplayInstantPolicy
  - Event Reading: An ephemeral consumer filters by aggregate ID and supports replay by version/offset
Message Distribution Algorithm
- Event Bus Distribution: The subscriber generates the topic at registration; when an event arrives it is matched by topic, a wrapper handler filters and deserializes it, and the user handler is then called
- JetStream Distribution: The consumer subscribes by FilterSubject; the Consume callback deserializes the BaseEvent, converts it to a strongly-typed event, calls the handler and finally acknowledges explicitly
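To make the two naming schemes and the FilterSubject routing concrete, the sketch below builds both topic forms and checks a subject against a NATS-style filter, where `*` matches exactly one token and `>` matches one or more trailing tokens. The helper names `jetStreamSubject`, `eventBusTopic` and `matchSubject` are assumptions for illustration; in a real deployment the NATS server performs the matching.

```go
package main

import (
	"fmt"
	"strings"
)

// jetStreamSubject builds aggregateType.aggregateID.eventType.
func jetStreamSubject(aggregateType, aggregateID, eventType string) string {
	return strings.Join([]string{aggregateType, aggregateID, eventType}, ".")
}

// eventBusTopic builds serviceName.aggregateType.eventType.
func eventBusTopic(serviceName, aggregateType, eventType string) string {
	return strings.Join([]string{serviceName, aggregateType, eventType}, ".")
}

// matchSubject reports whether subject matches a NATS-style filter.
// "*" matches exactly one token; ">" must be the last filter token and
// matches one or more remaining tokens.
func matchSubject(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) || s[i] == "" {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	subject := jetStreamSubject("order", "42", "Created")
	fmt.Println(subject)
	fmt.Println(eventBusTopic("billing", "order", "Created"))

	// A subscriber interested in every order's Created event.
	fmt.Println(matchSubject("order.*.Created", subject))
	// A stream that captures every subject of the order aggregate type.
	fmt.Println(matchSubject("order.>", subject))
	// A different event type is not delivered to the Created filter.
	fmt.Println(matchSubject("order.*.Created", "order.42.Cancelled"))
}
```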
Publish and Subscribe Acknowledgment
- Publish Acknowledgment
  - JetStreamPublisher: Publishes with jetstream.Publish; the reliability guarantee comes from NATS JetStream and depends on server configuration
  - Event Bus: EventPublisher publishes generic events; reliability depends on the event bus implementation
- Subscribe Acknowledgment
  - JetStreamSubscriber: Explicit AckPolicy; sends a NAK on processing failure, which triggers a retry; waits for in-flight processing to complete during graceful shutdown
  - Event Bus: EventSubscriber manages subscriptions through the event bus Sub/Unsub; the bus confirms after successful processing
- Receipt Handling
  - JetStream: Explicit Ack after successful message processing; Nak on processing failure, which triggers a retry; quick Ack and return when the context is cancelled
  - Event Bus: Confirmation semantics are determined by the specific event bus implementation
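The JetStream receipt rules above can be expressed as one small decision function. The sketch below is illustrative only: `ackable` is a hypothetical two-method subset of a JetStream message (the real `jetstream.Msg` offers `Ack` and `Nak` methods of this shape alongside others), and `settle`, `fakeMsg` and `runScenarios` are names invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ackable is a hypothetical subset of a JetStream message.
type ackable interface {
	Ack() error
	Nak() error
}

// fakeMsg records which acknowledgment was sent, for demonstration.
type fakeMsg struct{ outcome string }

func (m *fakeMsg) Ack() error { m.outcome = "ack"; return nil }
func (m *fakeMsg) Nak() error { m.outcome = "nak"; return nil }

// settle applies the rules from the text: Ack quickly if the context is
// already cancelled, Nak when the handler fails, Ack when it succeeds.
func settle(ctx context.Context, msg ackable, handle func(context.Context) error) error {
	if ctx.Err() != nil {
		// Shutting down: acknowledge and return without processing.
		return msg.Ack()
	}
	if err := handle(ctx); err != nil {
		// Nak asks the server to redeliver, which triggers the retry.
		if nakErr := msg.Nak(); nakErr != nil {
			return fmt.Errorf("nak failed: %v (handler error: %w)", nakErr, err)
		}
		return err
	}
	return msg.Ack()
}

// runScenarios exercises the three paths and returns what each one sent.
func runScenarios() []string {
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()
	succeed := func(context.Context) error { return nil }
	fail := func(context.Context) error { return errors.New("handler failed") }

	scenarios := []struct {
		ctx    context.Context
		handle func(context.Context) error
	}{
		{context.Background(), succeed}, // success
		{context.Background(), fail},    // processing failure
		{cancelled, fail},               // context cancelled before processing
	}
	var out []string
	for _, s := range scenarios {
		msg := &fakeMsg{}
		_ = settle(s.ctx, msg, s.handle)
		out = append(out, msg.outcome)
	}
	return out
}

func main() {
	fmt.Println(runScenarios()) // prints [ack nak ack]
}
```

Because a Nak leads to redelivery, any handler passed to a function like `settle` should be safe to run more than once for the same message.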
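Configuration Items and Performance Tuning Parameters
- JetStreamPublisher Options
  - WithTopics: Sets the list of aggregate types, each used as a stream topic prefix
  - WithMaxAge/WithMaxSize/WithMaxMsgSize: Limits on message age, stream size and message size
  - Debug: Debug switch that uses in-memory storage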
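Options named like these are commonly built with Go's functional options pattern. The sketch below shows one way that could look; the option signatures, the `publisherConfig` struct and the `newConfig` helper are assumptions, since the source only lists the option names. The `subjects` method illustrates how an aggregate type list can become `aggregateType.>` stream subjects.

```go
package main

import (
	"fmt"
	"time"
)

// publisherConfig is a hypothetical settings struct filled in by options.
type publisherConfig struct {
	topics     []string      // aggregate types used as subject prefixes
	maxAge     time.Duration // how long messages are retained
	maxSize    int64         // stream size limit in bytes
	maxMsgSize int32         // per-message size limit in bytes
	debug      bool          // use in-memory storage when true
}

// Option mutates the configuration; signatures are assumed for this sketch.
type Option func(*publisherConfig)

func WithTopics(aggregateTypes ...string) Option {
	return func(c *publisherConfig) { c.topics = append(c.topics, aggregateTypes...) }
}

func WithMaxAge(d time.Duration) Option {
	return func(c *publisherConfig) { c.maxAge = d }
}

func WithMaxSize(bytes int64) Option {
	return func(c *publisherConfig) { c.maxSize = bytes }
}

func WithMaxMsgSize(bytes int32) Option {
	return func(c *publisherConfig) { c.maxMsgSize = bytes }
}

func Debug(enabled bool) Option {
	return func(c *publisherConfig) { c.debug = enabled }
}

// newConfig applies the options in order, so later options win.
func newConfig(opts ...Option) publisherConfig {
	var cfg publisherConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// subjects maps each aggregate type to the wildcard subject aggregateType.>
func (c publisherConfig) subjects() []string {
	out := make([]string, 0, len(c.topics))
	for _, t := range c.topics {
		out = append(out, t+".>")
	}
	return out
}

func main() {
	cfg := newConfig(
		WithTopics("order", "user"),
		WithMaxAge(24*time.Hour),
		WithMaxSize(1<<30),
		WithMaxMsgSize(1<<20),
		Debug(true),
	)
	fmt.Println(cfg.subjects(), cfg.maxAge, cfg.debug)
}
```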
- JetStreamSubscriber
  - Consumer Configuration: Durable, explicit AckPolicy, FilterSubject, DeliverAllPolicy, ReplayInstantPolicy
  - Graceful Shutdown: Stop(ctx) honors a timeout; a WaitGroup waits for in-flight processing to complete
- Event Reader
  - Ephemeral Consumer: Avoids the control plane pressure that persistent consumers create
  - Batch Pulling: FetchNoWait(1000); events are sorted by version before being applied to the aggregate
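The "sort by version, then apply" step of the event reader can be sketched independently of JetStream. In the example below, `versionedEvent`, `counter` and `replay` are hypothetical names, and the batch is a plain slice standing in for whatever a batch fetch returned; skipping versions the aggregate has already seen is an assumption that makes the replay safe to repeat.

```go
package main

import (
	"fmt"
	"sort"
)

// versionedEvent is a hypothetical event carrying an aggregate version.
type versionedEvent struct {
	AggregateID string
	Version     int
	Delta       int
}

// counter is a toy aggregate rebuilt from its events.
type counter struct {
	ID      string
	Version int
	Total   int
}

// replay keeps only events for this aggregate, sorts them by version, and
// applies those newer than the aggregate's current version.
func replay(agg *counter, batch []versionedEvent) {
	events := make([]versionedEvent, 0, len(batch))
	for _, e := range batch {
		if e.AggregateID == agg.ID {
			events = append(events, e)
		}
	}
	sort.SliceStable(events, func(i, j int) bool {
		return events[i].Version < events[j].Version
	})
	for _, e := range events {
		if e.Version <= agg.Version {
			continue // already applied; replaying the batch again is harmless
		}
		agg.Total += e.Delta
		agg.Version = e.Version
	}
}

func main() {
	// A batch that arrived out of order and includes another aggregate.
	batch := []versionedEvent{
		{AggregateID: "a", Version: 3, Delta: 30},
		{AggregateID: "a", Version: 1, Delta: 10},
		{AggregateID: "b", Version: 1, Delta: 99},
		{AggregateID: "a", Version: 2, Delta: 20},
	}
	agg := &counter{ID: "a"}
	replay(agg, batch)
	replay(agg, batch) // second pass changes nothing
	fmt.Println(agg.Version, agg.Total) // prints 3 60
}
```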
Custom Publisher and Subscriber Implementation Example Paths
- Custom Publisher
  - Implement the StreamPublisher interface: See JetStreamPublisher's Publish/PublishEvents and the stream creation logic in ensureStream
  - Example paths: JetStreamPublisher, JetStreamPublisher.ensureStream, JetStreamPublisher.Publish
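As a starting point for a custom publisher, the sketch below implements a publisher against an in-memory map instead of JetStream. The `StreamPublisher` method signatures, the `Event` struct and `memoryPublisher` are assumptions made for illustration; the repository's actual interface may differ, so treat this as the general shape (ensure the stream, build the subject, publish single and batch) rather than a drop-in implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event is a hypothetical minimal event envelope.
type Event struct {
	AggregateType string
	AggregateID   string
	EventType     string
	Payload       []byte
}

// StreamPublisher mirrors the interface named in the text; the method
// signatures here are assumed.
type StreamPublisher interface {
	Publish(ctx context.Context, event Event) error
	PublishEvents(ctx context.Context, events []Event) error
}

// memoryPublisher stores payloads per subject instead of calling JetStream.
type memoryPublisher struct {
	mu        sync.Mutex
	published map[string][][]byte
}

// Compile-time check that memoryPublisher satisfies the interface.
var _ StreamPublisher = (*memoryPublisher)(nil)

// ensureStream lazily creates the backing store; callers hold p.mu.
func (p *memoryPublisher) ensureStream() {
	if p.published == nil {
		p.published = make(map[string][][]byte)
	}
}

func (p *memoryPublisher) Publish(ctx context.Context, e Event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if e.AggregateType == "" || e.AggregateID == "" || e.EventType == "" {
		return errors.New("aggregate type, aggregate ID and event type are required")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.ensureStream()
	subject := e.AggregateType + "." + e.AggregateID + "." + e.EventType
	p.published[subject] = append(p.published[subject], e.Payload)
	return nil
}

// PublishEvents publishes in order and stops at the first failure.
func (p *memoryPublisher) PublishEvents(ctx context.Context, events []Event) error {
	for _, e := range events {
		if err := p.Publish(ctx, e); err != nil {
			return err
		}
	}
	return nil
}

// count returns how many payloads were published under a subject.
func (p *memoryPublisher) count(subject string) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.published[subject])
}

// publishDemo publishes one event plus a batch and reports per-subject counts.
func publishDemo() (created, paid int, err error) {
	p := &memoryPublisher{}
	ctx := context.Background()
	if err = p.Publish(ctx, Event{"order", "42", "Created", []byte("{}")}); err != nil {
		return 0, 0, err
	}
	batch := []Event{{"order", "42", "Paid", nil}, {"order", "42", "Created", nil}}
	if err = p.PublishEvents(ctx, batch); err != nil {
		return 0, 0, err
	}
	return p.count("order.42.Created"), p.count("order.42.Paid"), nil
}

func main() {
	fmt.Println(publishDemo())
}
```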
- Custom Subscriber
  - Implement the StreamSubscriber interface: See JetStreamSubscriber's Start/Stop/Close and runConsumer/handleMessage
  - Example paths: JetStreamSubscriber.Start, JetStreamSubscriber.Stop, JetStreamSubscriber.handleMessage
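For a custom subscriber, the part that is easiest to get wrong is the lifecycle: guarding state with a lock, cancelling the consume loop through a context, and bounding the shutdown wait. The sketch below shows that pattern with an in-memory channel in place of a JetStream consumer. `memorySubscriber` and its method signatures are assumptions, not the repository's StreamSubscriber definition.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// memorySubscriber consumes from a channel instead of a JetStream consumer.
type memorySubscriber struct {
	mu      sync.Mutex // guards running and cancel
	running bool
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	in      chan string
	handle  func(string)
}

func newMemorySubscriber(handle func(string)) *memorySubscriber {
	return &memorySubscriber{in: make(chan string, 16), handle: handle}
}

// Start launches the consume loop; starting twice is an error.
func (s *memorySubscriber) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.running {
		return errors.New("subscriber already running")
	}
	runCtx, cancel := context.WithCancel(ctx)
	s.cancel = cancel
	s.running = true
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for {
			select {
			case <-runCtx.Done():
				return
			case msg := <-s.in:
				s.handle(msg)
			}
		}
	}()
	return nil
}

// Stop cancels the loop and waits for in-flight work until ctx expires.
func (s *memorySubscriber) Stop(ctx context.Context) error {
	s.mu.Lock()
	if !s.running {
		s.mu.Unlock()
		return nil
	}
	s.running = false
	s.cancel()
	s.mu.Unlock()

	done := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("graceful shutdown timed out: %w", ctx.Err())
	}
}

// Close stops the subscriber without a deadline.
func (s *memorySubscriber) Close() error { return s.Stop(context.Background()) }

// runDemo starts the subscriber, delivers one message and stops it.
func runDemo() (string, error) {
	got := make(chan string, 1)
	sub := newMemorySubscriber(func(m string) { got <- m })
	if err := sub.Start(context.Background()); err != nil {
		return "", err
	}
	sub.in <- "order.42.Created"
	handled := <-got
	stopCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return handled, sub.Stop(stopCtx)
}

func main() {
	fmt.Println(runDemo())
}
```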
- Event Bus Bridge
  - EventPublisher: See EventPublisher, EventPublisher.publishEvent
  - EventSubscriber[T]: See EventSubscriber[T], EventSubscriber[T].AddServiceHandler
Message Loss and Duplicate Handling Strategies
- Loss Handling
  - JetStream: Uses an explicit AckPolicy; a NAK on processing failure triggers a retry; in-flight processing is allowed to complete during graceful shutdown
  - Event Bus: Depends on the reliability semantics of the event bus implementation
- Duplicate Handling
  - Subscribers should be designed to be idempotent; the event reader supports replay by version, which makes idempotent application easier
  - Event Bus Bridge: EventPublisher can rely on the event store's ordering before publishing to reduce the chance of duplicates
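Since a Nak or a missed Ack leads to redelivery, handlers need to tolerate seeing the same event twice. One common approach is to wrap the handler with a deduplication check keyed by event ID, sketched below. The `event` type and `idempotent` helper are hypothetical, and the in-memory set is an assumption that only works within one process; a production system would typically persist the processed IDs or compare aggregate versions instead.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// event is a hypothetical event with a unique ID.
type event struct {
	ID     string
	Amount int
}

// idempotent wraps a handler so that an event whose ID was already processed
// successfully is skipped. Failed attempts are not recorded, so a retry runs
// the handler again.
func idempotent(handle func(event) error) func(event) error {
	var mu sync.Mutex
	seen := make(map[string]bool)
	return func(e event) error {
		// Holding the lock while handling serializes processing, which keeps
		// the sketch simple at the cost of throughput.
		mu.Lock()
		defer mu.Unlock()
		if seen[e.ID] {
			return nil
		}
		if err := handle(e); err != nil {
			return err
		}
		seen[e.ID] = true
		return nil
	}
}

func main() {
	total := 0
	failOnce := true
	handle := idempotent(func(e event) error {
		if failOnce {
			failOnce = false
			return errors.New("transient failure")
		}
		total += e.Amount
		return nil
	})

	e := event{ID: "evt-1", Amount: 50}
	fmt.Println(handle(e)) // first delivery fails and would be Nak'd
	fmt.Println(handle(e)) // redelivery succeeds
	fmt.Println(handle(e)) // duplicate delivery is skipped
	fmt.Println(total)     // prints 50
}
```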
Best Practices and Common Pitfalls
- Best Practices
  - Maintain a single JetStreamPublisher instance per service; the stream name is the service name and the topic is aggregateType.>
  - Subscribers use explicit acknowledgment; processing logic should be short and idempotent
  - Use an ephemeral consumer for event reading to avoid the control plane pressure of persistent consumers
  - Apply one topic naming specification when bridging the event bus, which helps cross-service decoupling
- Common Pitfalls
  - Running multiple subscribers in parallel, which causes out-of-order event processing and inconsistency
  - Forgetting to Ack, which causes message accumulation and repeated retries
  - Inconsistent topic naming, which prevents events from being routed correctly
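Dependency Analysis
Dependency relationships between pub/sub components are as follows: the JetStream adapters (JetStreamPublisher, JetStreamSubscriber[T], JetStreamReader) implement the StreamPublisher, StreamSubscriber and StreamReader interfaces on top of NATS JetStream, while EventPublisher and EventSubscriber[T] depend on the event bus.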
Performance Considerations
- Consumer Batching: The event reader pulls in batches with FetchNoWait(1000), which reduces control plane overhead
- Ephemeral Consumer: Avoids the control plane pressure of persistent consumers
- Acknowledgment Strategy: Acknowledge explicitly and promptly to avoid unnecessary redelivery, and set a reasonable retry strategy
- Topic Design: Using aggregateType.> as the topic prefix makes extension and isolation easier
Troubleshooting Guide
- Publish Failure
  - Check that the JetStream stream exists and is configured correctly
  - Inspect the log output to locate serialization or publish errors
- Subscriber Receives Nothing
  - Confirm that the topic naming matches FilterSubject
  - Check that the consumer was created successfully and is running
- Processing Failure Retry
  - Check the Nak and retry strategy; ensure the handler is idempotent
- Graceful Shutdown Timeout
  - Check whether the WaitGroup is blocked; confirm that context cancellation propagates
Conclusion
This pub/sub system provides reliable, scalable message publishing and subscription through a unified interface abstraction and a JetStream adapter. Combined with the event bus bridge and the event reader, it covers event publishing, subscription, replay and aggregate reconstruction in an event-driven architecture. Consistent topic naming, explicit acknowledgment and idempotent processing are key to keeping the system stable.
Appendix
- Test Case References
  - Event Bus Bridge Communication Test: publisher_subscriber_test.go
  - JetStream Multi-aggregate and Pub/Sub Test: js_test.go