Message Publish-Subscribe
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Component Details
- Dependency Analysis
- Performance and Optimization
- Troubleshooting Guide
- Conclusion
- Appendix
Introduction
This document describes the message publish-subscribe mechanism. It covers how publishers and subscribers are implemented, the message routing rules, and the filter mechanisms; details the serialization and deserialization flow, the message format, and the version compatibility strategies; outlines implementation ideas for point-to-point, publish-subscribe, and request-response communication; and gives recommendations for message acknowledgment, retry strategies, and dead letter queue handling. It also offers performance optimization tips, batch processing strategies, and memory management suggestions, serving as an implementation guide for developers of publish-subscribe messaging systems.
Project Structure
The messaging subsystem of this repository lives mainly in the pkg/messaging and pkg/eventbus directories and provides two kinds of capability: an event stream (JetStream/NATS) and a generic event bus (in-memory and other implementations):
- Event Stream (JetStream/NATS): Provides high throughput, persistence, and replay, and suits event storage and distribution in an event-driven architecture.
- Generic Event Bus: Provides an in-memory event bus implementation that is convenient for testing and local development.
Core Components
Publisher Interface and Implementation
- StreamPublisher: The unified event stream publishing interface; defines Publish and PublishEvents.
- JetStreamPublisher: An event stream publisher based on NATS JetStream; ensures that the stream exists, maps subjects, and publishes events.
- EventPublisher: Converts domain events to generic events and publishes them to the generic event bus (used for integration with projections and the event store).
Subscriber Interface and Implementation
- StreamSubscriber: The unified event stream subscription interface; defines Start and Stop.
- JetStreamSubscriber: A generic subscriber that supports graceful shutdown, explicit acknowledgment, error handling, and retry.
- EventSubscriber: A generic event subscriber that supports filtering by service name, aggregate type, and event type.
Event Stream Reader
- JetStreamReader: A JetStream-based event reader that supports replay by aggregate ID, version, and offset, as well as filter functions.
Event Bus
- EventBus Interface and MemoryEventBus: The event bus interface and its in-memory implementation, convenient for testing and local development.
Architecture Overview
The architecture diagram shows how publishers, subscribers, event streams, and event buses interact, and the paths that messages take through the system.
Component Details
Publisher: JetStreamPublisher
Role and Responsibilities
- Ensures that the event stream exists (the service name is used as the stream name) and creates the stream and its subjects automatically.
- Serializes domain events to JSON and publishes them to JetStream under the subject aggregateType.aggregateID.eventType.
- Provides Publish for single events and PublishEvents for batch publishing.
Subject Naming and Routing
- Subject Format: aggregateType.aggregateID.eventType, which lets a JetStream consumer filter by subject.
- The set of aggregate types is injected through options so that the stream's subject set covers every aggregate type in the service.
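The naming rule above can be sketched as follows. This is a minimal illustration, not the repository's code: the function names eventSubject and streamSubjects are hypothetical, and covering each aggregate type with a trailing ">" wildcard is an assumption about how the stream's subject set is built.

```go
package main

import (
	"fmt"
	"strings"
)

// eventSubject builds the publish subject aggregateType.aggregateID.eventType.
// NATS uses "." as the token separator, so a token must not be empty and must
// not contain dots, whitespace, or the wildcard characters "*" and ">".
func eventSubject(aggregateType, aggregateID, eventType string) (string, error) {
	tokens := []string{aggregateType, aggregateID, eventType}
	for _, t := range tokens {
		if t == "" || strings.ContainsAny(t, ". \t\r\n*>") {
			return "", fmt.Errorf("invalid subject token %q", t)
		}
	}
	return strings.Join(tokens, "."), nil
}

// streamSubjects returns one wildcard subject per aggregate type, for example
// "Order.>", so that a stream bound to these subjects captures every
// aggregate ID and event type of that aggregate type (assumed layout).
func streamSubjects(aggregateTypes []string) []string {
	subjects := make([]string, 0, len(aggregateTypes))
	for _, t := range aggregateTypes {
		subjects = append(subjects, t+".>")
	}
	return subjects
}

func main() {
	subject, err := eventSubject("Order", "42", "OrderCreated")
	if err != nil {
		panic(err)
	}
	fmt.Println(subject)
	fmt.Println(streamSubjects([]string{"Order", "Customer"}))
}
```

Validating tokens before joining them matters because an aggregate ID that contains a dot would silently shift the token positions that consumers filter on.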
Serialization and Payload
- The event object is serialized to JSON and written to the BaseEvent.Payload field; the payload is Base64-encoded so that binary data can be carried safely.
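A minimal sketch of this encoding round trip is shown below. Only the Payload field of BaseEvent is named in this document; the other fields, the OrderCreated event, and the helper names encodePayload and decodePayload are assumptions made for illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// BaseEvent is a hypothetical envelope. Only Payload is taken from the text;
// the remaining fields are illustrative.
type BaseEvent struct {
	AggregateType string `json:"aggregate_type"`
	AggregateID   string `json:"aggregate_id"`
	EventType     string `json:"event_type"`
	Version       int    `json:"version"`
	Payload       string `json:"payload"` // Base64 of the JSON-encoded domain event
}

// OrderCreated is an illustrative domain event.
type OrderCreated struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encodePayload marshals the domain event to JSON and Base64-encodes the bytes.
func encodePayload(event any) (string, error) {
	raw, err := json.Marshal(event)
	if err != nil {
		return "", fmt.Errorf("marshal event: %w", err)
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

// decodePayload reverses encodePayload into the value that out points to.
func decodePayload(payload string, out any) error {
	raw, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return fmt.Errorf("decode base64: %w", err)
	}
	if err := json.Unmarshal(raw, out); err != nil {
		return fmt.Errorf("unmarshal event: %w", err)
	}
	return nil
}

func main() {
	payload, err := encodePayload(OrderCreated{OrderID: "42", Total: 100})
	if err != nil {
		panic(err)
	}
	envelope := BaseEvent{AggregateType: "Order", AggregateID: "42", EventType: "OrderCreated", Version: 1, Payload: payload}

	var restored OrderCreated
	if err := decodePayload(envelope.Payload, &restored); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", restored)
}
```

In Go, declaring the field as []byte instead of string has the same effect on the wire, because encoding/json encodes byte slices as Base64 strings automatically.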
Error Handling and Logging
- When publishing fails, logs an error and returns it; when stream creation fails, logs a fatal error and terminates.
Subscriber: JetStreamSubscriber
Role and Responsibilities
- A generic subscriber bound to a single event type, with graceful start and stop.
- Explicit acknowledgment (Ack) provides reliability; when processing fails, a NAK triggers a retry.
- Precise lifecycle control: a WaitGroup and context cancellation ensure graceful shutdown.
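The lifecycle control described above can be sketched with the standard library alone. The worker type and the demo helpers are hypothetical stand-ins for the subscriber; the sketch only assumes that Stop takes a context whose deadline bounds the wait.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for a subscriber's consumption loop.
type worker struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// Start launches the loop in a goroutine that the WaitGroup tracks.
func (w *worker) Start(parent context.Context, loop func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(parent)
	w.cancel = cancel
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		loop(ctx)
	}()
}

// Stop cancels the loop and waits for it to return, or gives up when ctx expires.
func (w *worker) Stop(ctx context.Context) error {
	if w.cancel != nil {
		w.cancel()
	}
	done := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("shutdown timed out: %w", ctx.Err())
	}
}

// demo runs one loop and stops it with the given timeout.
func demo(loop func(ctx context.Context), timeout time.Duration) error {
	w := &worker{}
	w.Start(context.Background(), loop)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return w.Stop(ctx)
}

// demoGraceful uses a loop that returns as soon as it is cancelled.
func demoGraceful() error {
	return demo(func(ctx context.Context) { <-ctx.Done() }, time.Second)
}

// demoStuck uses a loop that ignores cancellation, so Stop hits the timeout.
func demoStuck() error {
	return demo(func(context.Context) { select {} }, 50*time.Millisecond)
}

func main() {
	fmt.Println("graceful:", demoGraceful())
	fmt.Println("stuck:", demoStuck())
}
```

Waiting on the WaitGroup in a separate goroutine is what allows Stop to race the wait against the caller's deadline; calling wg.Wait directly would block with no way to time out.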
Consumption and Deserialization
- The consumption callback first deserializes the message to a BaseEvent, then restores the specific event type through the entity layer's deserialization function.
- When message processing fails, logs the error and NAKs the message, leaving the retry to JetStream.
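The callback flow above might look like the following sketch. The message interface is a hypothetical subset of a JetStream message, not the nats.go type, and NAKing on a deserialization failure is an assumption; a real subscriber may prefer to drop or park messages that can never be decoded.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message is a hypothetical subset of a JetStream message: the raw bytes plus
// the two acknowledgment calls that the text mentions.
type message interface {
	Data() []byte
	Ack() error
	Nak() error
}

// BaseEvent is an illustrative envelope.
type BaseEvent struct {
	EventType string `json:"event_type"`
	Payload   string `json:"payload"`
}

var errHandler = errors.New("handler failed")

// nakWith NAKs the message and returns the original cause, noting a failed NAK.
func nakWith(msg message, cause error) error {
	if err := msg.Nak(); err != nil {
		return fmt.Errorf("%w (nak also failed: %v)", cause, err)
	}
	return cause
}

// handleMessage deserializes the envelope, runs the handler, and acks on
// success. Any failure results in a NAK so that the server redelivers.
func handleMessage(msg message, handler func(BaseEvent) error) error {
	var ev BaseEvent
	if err := json.Unmarshal(msg.Data(), &ev); err != nil {
		return nakWith(msg, fmt.Errorf("deserialize envelope: %w", err))
	}
	if err := handler(ev); err != nil {
		return nakWith(msg, fmt.Errorf("handle %s: %w", ev.EventType, err))
	}
	return msg.Ack()
}

// fakeMsg records which acknowledgment was sent, for demonstration only.
type fakeMsg struct {
	data  []byte
	acked bool
	naked bool
}

func (m *fakeMsg) Data() []byte { return m.data }
func (m *fakeMsg) Ack() error   { m.acked = true; return nil }
func (m *fakeMsg) Nak() error   { m.naked = true; return nil }

func main() {
	msg := &fakeMsg{data: []byte(`{"event_type":"OrderCreated","payload":"e30="}`)}
	err := handleMessage(msg, func(BaseEvent) error { return errHandler })
	fmt.Println(err, msg.acked, msg.naked)
}
```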
Filtering and Replay
- The consumer configuration uses FilterSubject to restrict the event type and supports DeliverAllPolicy and ReplayInstantPolicy.
- Supports replay from an offset through OptStartSeq, combined with the event version so that events are applied in order.
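To make the effect of a FilterSubject concrete, the sketch below reimplements the NATS wildcard rules locally: "*" matches exactly one token and ">" matches one or more trailing tokens. The function matchSubject is hypothetical and exists only to illustrate the semantics; the server performs the real matching. A filter such as *.*.OrderCreated for a single event type is an assumed example.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches filter under NATS wildcard
// rules: "*" matches exactly one token, ">" matches one or more trailing tokens.
func matchSubject(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last filter token and needs at least one token to consume.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	subject := "Order.42.OrderCreated"
	for _, filter := range []string{"*.*.OrderCreated", "Order.42.>", "Order.>", "Customer.>", "Order.42"} {
		fmt.Printf("%-18s matches %s: %v\n", filter, subject, matchSubject(filter, subject))
	}
}
```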
Event Stream Reader: JetStreamReader
Role and Responsibilities
- A JetStream-based event reader that supports replay by aggregate ID, version range, and offset.
- Uses ephemeral consumers, which avoids the state complexity of durable consumers.
Reading and Filtering
- Uses FilterSubject to restrict the aggregate type and aggregate ID, and applies filter functions to narrow the events further.
- Supports sorting events by version in ascending order before applying them to the aggregate root, which guarantees the order of application.
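The filter-then-sort-then-apply sequence can be sketched as below. The Event and Account types, the maxVersion filter, and the rule that an aggregate rejects events whose version is not higher than its own are all assumptions for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// Event is an illustrative stored event.
type Event struct {
	AggregateID string
	Version     int
	Type        string
}

// Account is an illustrative aggregate root.
type Account struct {
	ID      string
	Version int
	Applied []string
}

// Apply rejects events that do not move the aggregate's version forward.
func (a *Account) Apply(e Event) error {
	if e.Version <= a.Version {
		return fmt.Errorf("stale event: aggregate at version %d, event has %d", a.Version, e.Version)
	}
	a.Version = e.Version
	a.Applied = append(a.Applied, e.Type)
	return nil
}

// maxVersion builds a filter function that keeps events up to version n.
func maxVersion(n int) func(Event) bool {
	return func(e Event) bool { return e.Version <= n }
}

// replay keeps the events of this aggregate that every filter accepts, sorts
// them by ascending version, and applies them in that order.
func replay(a *Account, events []Event, filters ...func(Event) bool) error {
	selected := make([]Event, 0, len(events))
	for _, e := range events {
		keep := e.AggregateID == a.ID
		for _, f := range filters {
			keep = keep && f(e)
		}
		if keep {
			selected = append(selected, e)
		}
	}
	sort.SliceStable(selected, func(i, j int) bool { return selected[i].Version < selected[j].Version })
	for _, e := range selected {
		if err := a.Apply(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	events := []Event{
		{AggregateID: "acc-1", Version: 3, Type: "Withdrawn"},
		{AggregateID: "acc-2", Version: 1, Type: "Opened"},
		{AggregateID: "acc-1", Version: 1, Type: "Opened"},
		{AggregateID: "acc-1", Version: 2, Type: "Deposited"},
	}
	acc := &Account{ID: "acc-1"}
	if err := replay(acc, events, maxVersion(2)); err != nil {
		panic(err)
	}
	fmt.Println(acc.Version, acc.Applied)
}
```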
Error Handling
- When deserialization or application fails, logs the error and then either skips the event or returns the error.
Generic Event Bus and Event Subscriber
Generic Event Bus (MemoryEventBus)
- Provides Pub, Sub, Unsub, and Close; stores the handler lists in a map internally and supports asynchronous invocation.
- Suited to testing and local development, where it makes event distribution and handler behavior easy to verify.
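A minimal in-memory bus with these four operations could look like the sketch below. It is not the repository's MemoryEventBus: the method signatures, the subscription ID returned by Sub, and the choice to have Close wait for in-flight handlers are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is an illustrative bus message.
type Event struct {
	Topic   string
	Payload string
}

// Handler processes one event.
type Handler func(Event)

// MemoryBus keeps handlers in a map keyed by topic and subscription ID.
type MemoryBus struct {
	mu       sync.RWMutex
	handlers map[string]map[int]Handler
	nextID   int
	closed   bool
	wg       sync.WaitGroup // tracks in-flight asynchronous handler calls
}

func NewMemoryBus() *MemoryBus {
	return &MemoryBus{handlers: make(map[string]map[int]Handler)}
}

// Sub registers a handler and returns an ID that Unsub accepts.
func (b *MemoryBus) Sub(topic string, h Handler) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.handlers[topic] == nil {
		b.handlers[topic] = make(map[int]Handler)
	}
	b.nextID++
	b.handlers[topic][b.nextID] = h
	return b.nextID
}

// Unsub removes a handler; unknown topics and IDs are ignored.
func (b *MemoryBus) Unsub(topic string, id int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.handlers[topic], id)
}

// Pub invokes every handler for the topic in its own goroutine.
func (b *MemoryBus) Pub(e Event) error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return errors.New("bus closed")
	}
	for _, h := range b.handlers[e.Topic] {
		b.wg.Add(1)
		go func(h Handler) {
			defer b.wg.Done()
			h(e)
		}(h)
	}
	return nil
}

// Close rejects further publishes and waits for in-flight handlers to finish.
func (b *MemoryBus) Close() {
	b.mu.Lock()
	b.closed = true
	b.mu.Unlock()
	b.wg.Wait()
}

func main() {
	bus := NewMemoryBus()
	received := make(chan string, 1)
	bus.Sub("orders", func(e Event) { received <- e.Payload })
	if err := bus.Pub(Event{Topic: "orders", Payload: "created"}); err != nil {
		panic(err)
	}
	bus.Close()
	fmt.Println(<-received)
}
```

Because handlers run asynchronously, a test that publishes and then asserts immediately can race with delivery; waiting in Close (or on a channel, as in main) gives the test a deterministic point at which all handlers have run.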
Generic Event Subscriber (EventSubscriber)
- Supports filtering by service name, aggregate type, and event type, using the format serviceName.aggregateType.eventType.
- Base64-decodes and JSON-deserializes generic events received from the event bus, then calls the domain event handler.
Event Publisher
Role and Responsibilities
- Converts domain events from the event store to generic events (carrying the complete subject serviceName.aggregateType.eventType) and publishes them to the generic event bus.
- Supports publishing either all events of an aggregate or only its uncommitted events.
Serialization and Subject Naming
- Serializes domain events to JSON, Base64-encodes the result, and places it in the Payload field of the generic event.
- Subject naming follows serviceName.aggregateType.eventType, which simplifies cross-service event distribution and filtering.
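The conversion step could be sketched as follows. The GenericEvent fields follow the format listed in the appendix (id, event_type, timestamp, payload); carrying the full subject in event_type, the DomainEvent shape, and the function names toGeneric and publishAll are assumptions.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"time"
)

// GenericEvent uses the field names listed in the appendix. Storing the full
// subject in event_type is an assumption.
type GenericEvent struct {
	ID        string    `json:"id"`
	EventType string    `json:"event_type"`
	Timestamp time.Time `json:"timestamp"`
	Payload   string    `json:"payload"`
}

// DomainEvent is a hypothetical shape for an event loaded from the event store.
type DomainEvent struct {
	ID            string
	AggregateType string
	EventType     string
	OccurredAt    time.Time
	Data          any
}

// toGeneric builds the subject serviceName.aggregateType.eventType and wraps
// the JSON-encoded, Base64-encoded event data in a generic event.
func toGeneric(serviceName string, e DomainEvent) (GenericEvent, error) {
	raw, err := json.Marshal(e.Data)
	if err != nil {
		return GenericEvent{}, fmt.Errorf("marshal %s: %w", e.EventType, err)
	}
	return GenericEvent{
		ID:        e.ID,
		EventType: serviceName + "." + e.AggregateType + "." + e.EventType,
		Timestamp: e.OccurredAt,
		Payload:   base64.StdEncoding.EncodeToString(raw),
	}, nil
}

// publishAll converts each event in order and hands it to publish, stopping
// at the first error so that later events are not published out of order.
func publishAll(serviceName string, events []DomainEvent, publish func(GenericEvent) error) error {
	for _, e := range events {
		g, err := toGeneric(serviceName, e)
		if err != nil {
			return err
		}
		if err := publish(g); err != nil {
			return fmt.Errorf("publish %s: %w", g.EventType, err)
		}
	}
	return nil
}

func main() {
	uncommitted := []DomainEvent{
		{ID: "1", AggregateType: "Order", EventType: "OrderCreated", OccurredAt: time.Now(), Data: map[string]string{"order_id": "42"}},
		{ID: "2", AggregateType: "Order", EventType: "OrderPaid", OccurredAt: time.Now(), Data: map[string]string{"order_id": "42"}},
	}
	err := publishAll("sales", uncommitted, func(g GenericEvent) error {
		fmt.Println(g.EventType, g.Payload)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```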
Dependency Analysis
Interface and Implementation
- StreamPublisher and StreamSubscriber are the abstract interfaces for event stream publishing and subscription; JetStreamPublisher and JetStreamSubscriber are their concrete implementations.
- EventBus is the generic event bus interface; MemoryEventBus is its in-memory implementation.
Component Coupling
- JetStreamPublisher depends on the NATS JetStream client and is responsible for stream and subject management.
- JetStreamSubscriber depends on a NATS JetStream consumer and is responsible for consumption and acknowledgment.
- EventPublisher depends on the event store and the generic event bus and is responsible for converting domain events to generic events.
- EventSubscriber depends on the generic event bus and is responsible for filtering generic events and deserializing them to domain events.
Performance and Optimization
Publisher Side Optimization
- Batch Publishing: Use PublishEvents to publish multiple events at once, which reduces network round trips and JetStream call overhead.
- Subject Design: The aggregateType.aggregateID.eventType subject partitions and filters well, which helps consumers process in parallel.
- Payload Compression: If event payloads are large, the business layer can compress them before Base64 encoding to reduce the transmission volume.
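The compression suggestion can be sketched with gzip from the standard library. The choice of gzip and the helper names are assumptions; the text only says to compress before Base64 encoding. A consumer also needs some way to know that a payload is compressed, for example a header or an envelope field, which this sketch leaves out.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
	"strings"
)

// compressPayload gzips the raw bytes and then Base64-encodes the result.
func compressPayload(raw []byte) (string, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return "", fmt.Errorf("gzip write: %w", err)
	}
	// Close flushes buffered data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return "", fmt.Errorf("gzip close: %w", err)
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// decompressPayload reverses compressPayload.
func decompressPayload(payload string) ([]byte, error) {
	compressed, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return nil, fmt.Errorf("decode base64: %w", err)
	}
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, fmt.Errorf("gzip header: %w", err)
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// samplePayload returns a large, repetitive JSON array for demonstration.
func samplePayload() string {
	return "[" + strings.Repeat(`{"sku":"A-1","qty":1},`, 200) + `{"sku":"A-1","qty":1}]`
}

func main() {
	raw := []byte(samplePayload())
	encoded, err := compressPayload(raw)
	if err != nil {
		panic(err)
	}
	restored, err := decompressPayload(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw=%d bytes, compressed+base64=%d bytes, round trip ok=%v\n",
		len(raw), len(encoded), bytes.Equal(raw, restored))
}
```

Compression pays off only for large or repetitive payloads; for small events the gzip header and footer can make the encoded payload longer than the original.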
Subscriber Side Optimization
- Concurrent Processing: Run multiple JetStreamSubscriber instances to consume different event types in parallel and improve throughput.
- Partitioning and Replay: Use OptStartSeq and version-based replay to avoid the load of a full replay.
- Backpressure and Rate Limiting: Add rate limiting and backpressure control in handlers so that traffic peaks do not cause memory and CPU spikes.
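One simple form of backpressure is to bound the number of handler calls that run at once. The sketch below does this with a buffered channel used as a semaphore; it limits concurrency rather than messages per second, and the names limitedHandler and runDemo are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// limitedHandler wraps handle so that at most maxInFlight calls run at once.
// submit blocks while the limit is reached, which pushes backpressure onto
// the caller instead of letting goroutines and memory grow without bound.
func limitedHandler(maxInFlight int, handle func(string)) (submit func(string), wait func()) {
	sem := make(chan struct{}, maxInFlight)
	var wg sync.WaitGroup
	submit = func(msg string) {
		sem <- struct{}{} // blocks when maxInFlight handlers are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			handle(msg)
		}()
	}
	return submit, wg.Wait
}

// runDemo processes n messages and reports how many were handled and the
// highest number of handlers observed running at the same time.
func runDemo(maxInFlight, n int) (processed, peak int) {
	var mu sync.Mutex
	inFlight := 0
	submit, wait := limitedHandler(maxInFlight, func(string) {
		mu.Lock()
		inFlight++
		if inFlight > peak {
			peak = inFlight
		}
		mu.Unlock()

		mu.Lock()
		inFlight--
		processed++
		mu.Unlock()
	})
	for i := 0; i < n; i++ {
		submit(fmt.Sprintf("msg-%d", i))
	}
	wait()
	return processed, peak
}

func main() {
	processed, peak := runDemo(3, 20)
	fmt.Printf("processed=%d, peak concurrency=%d (limit 3)\n", processed, peak)
}
```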
Memory Management
- Reuse object pools or buffers during event deserialization where possible to reduce GC pressure.
- Avoid holding references to large objects in handlers, and release resources promptly once they are no longer needed.
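Buffer reuse during deserialization might be sketched with sync.Pool as below. The helper names and the OrderCreated type are hypothetical, and whether pooling helps in practice depends on payload sizes and allocation rates, so it is worth measuring before adopting.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
)

// bufPool hands out reusable buffers for the Base64-decoded bytes.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// OrderCreated is an illustrative domain event.
type OrderCreated struct {
	OrderID string `json:"order_id"`
}

// decodeWithPool Base64-decodes payload into a pooled buffer and unmarshals
// the JSON from it. The buffer is reset and returned to the pool afterwards;
// json.Unmarshal copies what it needs, so out does not alias the buffer.
func decodeWithPool(payload string, out any) error {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()
		bufPool.Put(buf)
	}()
	dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(payload))
	if _, err := buf.ReadFrom(dec); err != nil {
		return fmt.Errorf("decode base64: %w", err)
	}
	if err := json.Unmarshal(buf.Bytes(), out); err != nil {
		return fmt.Errorf("unmarshal event: %w", err)
	}
	return nil
}

// encodeJSON is a demonstration helper that produces a Base64 JSON payload.
func encodeJSON(v any) string {
	raw, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return base64.StdEncoding.EncodeToString(raw)
}

func main() {
	for _, id := range []string{"42", "7"} {
		var ev OrderCreated
		if err := decodeWithPool(encodeJSON(OrderCreated{OrderID: id}), &ev); err != nil {
			panic(err)
		}
		fmt.Println(ev.OrderID)
	}
}
```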
Event Stream Configuration
- The JetStream stream configuration (maximum message age, maximum stream size, maximum message size) can be tuned to business needs so that unbounded growth does not put pressure on disk and memory.
Troubleshooting Guide
Publishing Failure
- Check the JetStream client connection and permissions; look for the errors "stream creation failed" and "event publishing failed" in the logs.
- Confirm that the subject format and the set of aggregate types are configured correctly.
Subscription Failure or No Messages
- Check the consumer configuration (FilterSubject, Durable, AckPolicy) and whether the JetStream stream exists.
- Confirm that the event payload is correctly serialized and Base64-encoded.
Processing Failure and Retry
- The subscriber NAKs the message when the handler returns an error, which triggers a JetStream retry; look for "event processing failure" and "message acknowledgment failure" in the logs.
- If failures persist, consider a dead letter queue (DLQ) strategy: move messages that fail repeatedly to a dedicated subject for later manual intervention.
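The DLQ strategy can be sketched as a small decision function. Everything here is hypothetical: the action names, the "dlq." subject prefix, and the process helper. The sketch assumes the caller knows how many times the message has been delivered and has a publish function for the dead letter subject.

```go
package main

import (
	"errors"
	"fmt"
)

type action string

const (
	actionAck        action = "ack"         // processing succeeded
	actionRetry      action = "nak"         // ask the server to redeliver
	actionDeadLetter action = "dead-letter" // copied to the DLQ; ack the original
)

var errFail = errors.New("handler failed")

// decide picks the next step after a handler ran. deliveries counts how many
// times the message has been delivered so far, including this attempt.
func decide(handlerErr error, deliveries, maxDeliveries int) action {
	switch {
	case handlerErr == nil:
		return actionAck
	case deliveries >= maxDeliveries:
		return actionDeadLetter
	default:
		return actionRetry
	}
}

// deadLetterSubject maps a subject to its DLQ subject (assumed naming).
func deadLetterSubject(subject string) string { return "dlq." + subject }

// process runs the handler and, once retries are exhausted, republishes the
// message to the DLQ subject. If that publish fails it falls back to a retry
// so that the message is not lost.
func process(subject string, data []byte, deliveries, maxDeliveries int,
	handler func([]byte) error, publish func(subject string, data []byte) error) (action, error) {
	err := handler(data)
	next := decide(err, deliveries, maxDeliveries)
	if next == actionDeadLetter {
		if pubErr := publish(deadLetterSubject(subject), data); pubErr != nil {
			return actionRetry, fmt.Errorf("dead-letter publish: %w", pubErr)
		}
	}
	return next, err
}

func main() {
	publish := func(subject string, data []byte) error {
		fmt.Printf("dead-lettered to %s: %s\n", subject, data)
		return nil
	}
	failing := func([]byte) error { return errFail }
	for attempt := 1; attempt <= 3; attempt++ {
		next, err := process("Order.42.OrderCreated", []byte(`{"order_id":"42"}`), attempt, 3, failing, publish)
		fmt.Printf("attempt %d: %s (%v)\n", attempt, next, err)
	}
}
```

After a message has been copied to the DLQ, the original still needs to be acknowledged or terminated; otherwise the server keeps redelivering it.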
Graceful Shutdown
- Call Stop(ctx) with a timeout context so that in-flight processing either completes or is forced to exit when the timeout expires; check that the WaitGroup waits correctly.
Conclusion
Through a unified interface abstraction and the event stream capabilities of JetStream/NATS, this publish-subscribe system supports a reliable, scalable event-driven architecture. Publishers and subscribers both have clear lifecycle control and error handling, and the event serialization and subject naming conventions keep events discoverable and maintainable across services. Combined with batch processing, parallel consumption, and careful memory management, the system can perform stably under high concurrency. In production, pair it with a dead letter queue and a monitoring and alerting system to further improve observability and operability.
Appendix
Message Format and Version Compatibility
Message Format
- The event payload is serialized to JSON, Base64-encoded, and placed in the Payload field.
- Subject format: aggregateType.aggregateID.eventType; the generic event format contains id, event_type, timestamp, and payload.
Version Compatibility
- The event version field can be used for replay and sorting, which guarantees the order in which events are applied.
- Consider adding a version number field to the event structure to support backward-compatible handling during upgrades.
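One way to use such a version number is to upgrade older payloads while decoding them. The sketch below is hypothetical throughout: the schema_version field name, the two OrderCreated shapes, and the default currency are invented for illustration. Note that this schema version is separate from the per-aggregate event version used for ordering.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderCreatedV1 is the assumed old shape: a whole-unit total, no currency.
type orderCreatedV1 struct {
	Total int `json:"total"`
}

// OrderCreated is the assumed current shape (schema version 2).
type OrderCreated struct {
	SchemaVersion int    `json:"schema_version"`
	TotalCents    int    `json:"total_cents"`
	Currency      string `json:"currency"`
}

// decodeOrderCreated reads the schema version first and upgrades old payloads
// to the current shape, so handlers only ever see version 2.
func decodeOrderCreated(raw []byte) (OrderCreated, error) {
	var head struct {
		SchemaVersion int `json:"schema_version"`
	}
	if err := json.Unmarshal(raw, &head); err != nil {
		return OrderCreated{}, fmt.Errorf("read schema version: %w", err)
	}
	switch head.SchemaVersion {
	case 0, 1: // payloads written before the field existed decode as version 0
		var v1 orderCreatedV1
		if err := json.Unmarshal(raw, &v1); err != nil {
			return OrderCreated{}, fmt.Errorf("decode v1: %w", err)
		}
		// The default currency is an illustrative upgrade rule.
		return OrderCreated{SchemaVersion: 2, TotalCents: v1.Total * 100, Currency: "USD"}, nil
	case 2:
		var v2 OrderCreated
		if err := json.Unmarshal(raw, &v2); err != nil {
			return OrderCreated{}, fmt.Errorf("decode v2: %w", err)
		}
		return v2, nil
	default:
		return OrderCreated{}, fmt.Errorf("unsupported schema version %d", head.SchemaVersion)
	}
}

func main() {
	for _, raw := range []string{
		`{"total":5}`,
		`{"schema_version":2,"total_cents":750,"currency":"EUR"}`,
	} {
		ev, err := decodeOrderCreated([]byte(raw))
		if err != nil {
			panic(err)
		}
		fmt.Printf("%+v\n", ev)
	}
}
```

Rejecting unknown versions explicitly, rather than decoding them on a best-effort basis, keeps an older consumer from misreading events written by a newer publisher.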
Test Cases
JetStream Publish-Subscribe Multi-Aggregate and Multi-Event Type Test
- Start a NATS JetStream container, create the publisher and the bus, register multiple handlers, publish multiple events, and verify that they are received.
Event Stream Reading and Replay Test
- Use JetStreamReader to read events by aggregate ID, version range, and offset, and apply them to the aggregate root.