Skip to main content

Message Passing

Table of Contents

Introduction

This document is for messaging system developers, systematically organizing the Sparrow message passing system: event bus architecture, NATS integration, pub/sub patterns, serialization and routing strategies, reliability guarantees, and support status and extension suggestions for protocols like RabbitMQ and Redis. The document also covers advanced features such as message filtering, deduplication and idempotency practices, batch and stream processing, and provides visual diagrams and best practices.

Project Structure

Message passing related code is mainly distributed in the following modules:

  • Message bus abstraction and implementation: bus, core, publisher, subscriber, jspub, jssub, nats_reader, etc. in the messaging package
  • Event bus (EventBus): memory, NATS, RabbitMQ, Redis implementations under the eventbus package
  • Configuration: NATsConfig, RabbitMQConfig, RedisConfig under the config package
  • Interfaces and Constraints
    • StreamPublisher: Defines publishing capabilities (single and batch)
    • StreamSubscriber: Defines subscription capabilities (Start/Stop)
    • StreamReader: Defines event reading and replay capabilities (GetEvents/Replay/ReplayFromVersion/ReplayFromOffset)
    • DomainEventConstraint: Generic constraint, limiting types that implement entity.DomainEvent
  • Event Bus Adaptation
    • EventPublisher: Publishes domain events from event store to generic event bus (including topic naming and serialization)
    • EventSubscriber[T]: Subscribes to generic event bus, filters by "serviceName.aggregateType.eventType" topic and deserializes to specific domain events
  • NATS Event Stream
    • JetStreamPublisher: Event publishing based on JetStream, automatically ensures stream exists, topic is "aggregateType.aggregateID.eventType"
    • JetStreamSubscriber[T]: Event subscription based on JetStream, explicit acknowledgment, graceful shutdown, ephemeral consumer mode
    • JetStreamReader: Event reading and replay based on JetStream, supports replay by version and offset
  • Bus Hub
    • StreamHub: Encapsulates subscriber collection, provides unified entry points such as AddSub, Start, Close

Sparrow's message passing system is divided into three layers:

  • Event Bus Layer: Generic event bus (memory/NATS/RabbitMQ/Redis) for cross-service broadcasting and subscription
  • Event Stream Layer: Event stream based on NATS JetStream for event storage and replay
  • Application Integration Layer: StreamHub/EventPublisher/EventSubscriber unified access and orchestration

Event Bus Adaptation: EventPublisher and EventSubscriber

  • Topic Naming and Serialization
    • EventPublisher serializes domain events to JSON and Base64 encodes them, constructs generic events, topic format is "serviceName.aggregateType.eventType", publishes to generic event bus
    • EventSubscriber filters through topics (supports three-part service name, aggregate type, event type), performs Base64 decoding and JSON deserialization on Payload, then calls specific handler
  • Handler Registration and Concurrency
    • EventSubscriber uses map to store handlers, protected by read-write lock; supports filtering by service name and wildcard aggregate type matching
  • Error Handling
    • Logs and returns errors for Base64 decoding, JSON deserialization, subscription registration failures, etc.
  • Publish (JetStreamPublisher)
    • Automatically ensures stream exists (topic collection is "aggregateType.>"), publish topic is "aggregateType.aggregateID.eventType"
    • Provides options: max message age, max stream size, max message size, debug mode (memory storage)
  • Subscribe (JetStreamSubscriber[T])
    • Explicit acknowledgment strategy, persistent consumer (Durable), supports delivery from earliest message and instant replay
    • Graceful shutdown: through context cancellation, WaitGroup waits for processing completion, cleanup resources
    • Message processing: deserializes BaseEvent, then deserializes to specific event type by entity tool, calls handler, NAK on failure
  • Read (JetStreamReader)
    • Ephemeral consumer (non-persistent), supports filtering by aggregate ID, version filtering, offset filtering
    • Full pull and sort then apply to aggregate root, facilitating replay and projection reconstruction
  • StreamHub encapsulates subscriber collection as Subscribers interface, provides unified entry points such as AddSub, Start, Close, facilitating centralized management on the application side
  • Works with NewJetStreamBus/StreamPub/StreamReader and other components
  • Component Coupling
    • JetStreamPublisher/JetStreamSubscriber/JetStreamReader depend on NATS JetStream client
    • EventPublisher/EventSubscriber depend on generic event bus interface and entity serialization tools
    • StreamHub depends on Subscribers interface, facilitating replacement of different implementations (memory/event stream/NATS, etc.)
  • External Dependencies
    • NATS (JetStream), RabbitMQ, Redis event bus implementations are located in subdirectories under eventbus
    • Configuration items are injected through config package (NATS/RabbitMQ/Redis)
  • Batch Publishing

    • JetStreamPublisher/Publisher both provide PublishEvents batch interface, reducing network round trips and system call overhead
  • Stream Reading and Replay

    • JetStreamReader uses FetchNoWait for batch pulling, combined with sorting and filtering by version/offset, reducing one-time memory usage
  • Consumer Lifecycle

    • JetStreamSubscriber[T] controls graceful shutdown through WaitGroup and context, avoiding message loss and resource leaks
  • Resource Cleanup

    • JetStreamReader uses ephemeral consumer, automatically cleaned up after consumption, reducing control plane pressure
  • Publish Failure

    • JetStreamPublisher: Check if stream exists, if topic matches, message size limits, serialization errors
    • EventPublisher: Check service name, topic concatenation, JSON serialization and Base64 encoding
  • Subscribe Failure

    • JetStreamSubscriber[T]: Confirm consumer creation, filter topic, explicit acknowledgment, graceful shutdown process
    • EventSubscriber[T]: Confirm topic filter conditions, Base64 decoding and deserialization chain
  • Read Anomaly

    • JetStreamReader: Focus on stream non-existence, ephemeral consumer creation, batch pulling and message acknowledgment
  • Log Localization

    • All components output key fields (such as consumer, subject, stream, error), facilitating quick problem location

Sparrow's message passing system uses unified interface abstraction for event bus and event stream, combined with NATS JetStream to implement high-reliability, replayable event storage and subscription models. Through clear topic naming, strict serialization and deserialization processes, explicit acknowledgment and graceful shutdown mechanisms, the system has a good foundation in maintainability and extensibility. For protocols like RabbitMQ and Redis, extensions can be implemented through eventbus subpackages to meet different deployment and performance needs.

Appendix

Message Filtering, Deduplication and Idempotency

  • Filtering

    • Event Bus: EventSubscriber[T] supports three-part filtering by service name, aggregate type, event type
    • Event Stream: JetStreamSubscriber[T] uses FilterSubject for precise subscription
  • Deduplication

    • Suggest application-layer deduplication based on event ID or aggregate version (e.g., maintaining processed event set in handler)
  • Idempotency

    • Suggest implementing idempotent logic in handler (e.g., deduplicate by event ID, judge whether already applied by version number)
  • Batch Publishing: PublishEvents of JetStreamPublisher/Publisher

  • Stream Reading: FetchNoWait batch pulling and sorting of JetStreamReader

  • Existing Implementations

    • Event Bus: Memory, NATS, RabbitMQ, Redis
    • Event Stream: NATS JetStream
  • Extension Suggestions

    • New Protocol: Add subpackage under eventbus, implement EventBus interface; provide corresponding Publisher/Subscriber/Reader in messaging
    • Configuration: Add corresponding configuration struct in config package, inject during bootstrap phase