Concurrent Scheduler
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendix
Introduction
This document describes the design and implementation of the concurrent scheduler: concurrent task management, task pool control, resource limits, load balancing, the task scheduling flow, execution monitoring, and error handling and retry mechanisms. It also provides performance optimization suggestions, resource usage monitoring, and fault recovery strategies. Particular attention is given to how SetMaxConcurrentTasks works and how it is configured, with practical usage examples and best practices.
Project Structure
The concurrent scheduler lives in the pkg/tasks directory and implements multiple execution modes behind the unified TaskScheduler interface:
- Concurrent execution mode: concurrent_scheduler.go
- Sequential execution mode: sequential_scheduler.go
- Hybrid execution mode: hybrid_scheduler.go
- Task building and wrapping: builder.go, wrapper.go
- Core types and constants: core.go
- Tests and examples: tasks_test.go
- Application startup and integration: app.go
Core Components
- TaskScheduler interface: the unified scheduler abstraction, covering scheduling, cancellation, status queries, listing, start, stop, and shutdown.
- Task interface: the task contract, covering ID, type, scheduled time, handler, completion/cancellation callbacks, periodicity, and timeout.
- TaskInfo: task metadata, including status, scheduled time, creation/update time, retry count and maximum retries, next retry time, and TTL.
- TaskBuilder: a task builder offering chained configuration (ID, type, handler, callbacks, schedule, timeout, retry, priority).
- SchedulerWrapper: the scheduler wrapper and sole external entry point, responsible for task submission, configuration and mode selection, concurrency settings, status queries, and listing.
- concurrentScheduler (concurrent scheduler): executes multiple tasks concurrently, with a built-in active task counter and condition variable; supports cleanup of expired tasks, processing of timed/retry tasks, timeouts, and retries.
- sequentialScheduler (sequential scheduler): executes tasks serially to preserve their order, with a built-in queue and notification channel; supports timed/periodic tasks and exponential backoff retry.
- HybridScheduler (hybrid scheduler): chooses concurrent or sequential execution based on task type or scheduling characteristics, behind a single external interface.
Architecture Overview
The concurrent scheduler follows a layered design of interface abstraction, multiple implementations, and a wrapper entry point:
- Interface layer: TaskScheduler, Task, and TaskInfo define the unified contracts.
- Implementation layer: concurrentScheduler, sequentialScheduler, and HybridScheduler implement different execution strategies.
- Tool layer: TaskBuilder and SchedulerWrapper provide easy-to-use entry points for building and submitting tasks.
- Lifecycle: Start/Stop/Close provide consistent startup and graceful shutdown semantics; the concurrent scheduler implements concurrency slot control and task monitoring with goroutines and a condition variable.
Detailed Component Analysis
Concurrent Scheduler (concurrentScheduler)
- Concurrency Control: concurrency slots are controlled by the active task count activeTasks and the condition variable activeTasksCond; a task checks for a free slot (waiting if necessary) before it executes, and on completion the count is decremented and waiters are notified.
- Task Monitoring: runTaskMonitor periodically scans timed/retry tasks and triggers their execution; cleanupExpiredTasks removes expired completed, cancelled, and dead-letter tasks.
- Task Pool and Limits: maxConcurrent caps concurrency; maxTaskCount caps the total number of tracked tasks, and removeOldestCompletedTask evicts the oldest completed task when that cap is reached.
- Task Execution: executeTask sets the running status and handles task timeouts, error classification (timeout/cancellation vs. failure), retry counting and backoff, dead letter handling, and the completion callback.
- State Management: the TaskInfo state machine covers waiting, running, completed, cancelled, failed, retrying, and dead_letter.
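The activeTasks/activeTasksCond pattern can be sketched with sync.Cond as follows. This is an illustrative sketch rather than the scheduler's code: slotLimiter and runLimited are hypothetical names, and the sketch parks waiters on the condition variable, whereas elsewhere this document describes the wait as polling with a brief sleep.

```go
package main

import "sync"

// slotLimiter mirrors the activeTasks/activeTasksCond idea: a counter of
// running tasks guarded by a mutex, plus a condition variable on that mutex.
type slotLimiter struct {
	mu            sync.Mutex
	cond          *sync.Cond
	activeTasks   int
	maxConcurrent int
}

func newSlotLimiter(limit int) *slotLimiter {
	l := &slotLimiter{maxConcurrent: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until activeTasks < maxConcurrent, then takes a slot.
func (l *slotLimiter) acquire() {
	l.mu.Lock()
	for l.activeTasks >= l.maxConcurrent {
		l.cond.Wait() // releases mu while parked, reacquires it on wake-up
	}
	l.activeTasks++
	l.mu.Unlock()
}

// release frees a slot and wakes one parked goroutine.
func (l *slotLimiter) release() {
	l.mu.Lock()
	l.activeTasks--
	l.mu.Unlock()
	l.cond.Signal()
}

// runLimited runs every job in its own goroutine, never letting more than
// limit of them execute at once, and reports the peak it observed.
func runLimited(limit int, jobs []func()) (peak int) {
	l := newSlotLimiter(limit)
	var wg sync.WaitGroup
	var peakMu sync.Mutex
	for _, job := range jobs {
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			l.acquire()
			defer l.release()
			l.mu.Lock()
			cur := l.activeTasks
			l.mu.Unlock()
			peakMu.Lock()
			if cur > peak {
				peak = cur
			}
			peakMu.Unlock()
			job()
		}(job)
	}
	wg.Wait()
	return peak
}
```

The `for` loop around `cond.Wait()` matters: a woken goroutine must re-check the condition, because another goroutine may have taken the freed slot first.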
SetMaxConcurrentTasks
- Function: dynamically sets the maximum number of concurrent tasks; it only takes effect on the concurrent scheduler and on the concurrent part of the hybrid scheduler.
- Parameter Validation: the value must be greater than 0; otherwise an error is returned.
- Concurrency Safety: a mutex protects the maxConcurrent field.
- Effective Paths:
  - Concurrent scheduler: the wrapper's SetMaxConcurrentTasks calls the concurrent scheduler's SetMaxConcurrentTasks.
  - Hybrid scheduler: the wrapper sets maxConcurrent on the hybrid scheduler's concurrent part.
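A minimal sketch of the validation, locking, and wrapper dispatch described for SetMaxConcurrentTasks. The concurrencySetter interface, the stripped-down scheduler types, and the Broadcast call after changing the limit are assumptions; the wrapper is modeled as silently ignoring the call in sequential mode, matching "no effect" rather than returning an error.

```go
package main

import (
	"errors"
	"sync"
)

// concurrentScheduler is reduced to the fields this sketch needs.
type concurrentScheduler struct {
	mu            sync.Mutex
	cond          *sync.Cond // stands in for activeTasksCond
	maxConcurrent int
}

func newConcurrentScheduler(max int) *concurrentScheduler {
	s := &concurrentScheduler{maxConcurrent: max}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// SetMaxConcurrentTasks rejects non-positive values and updates the limit
// under the mutex.
func (s *concurrentScheduler) SetMaxConcurrentTasks(max int) error {
	if max <= 0 {
		return errors.New("max concurrent tasks must be greater than 0")
	}
	s.mu.Lock()
	s.maxConcurrent = max
	s.mu.Unlock()
	// Assumption: wake all waiters so a raised limit takes effect immediately.
	s.cond.Broadcast()
	return nil
}

// MaxConcurrent reads the current limit under the mutex.
func (s *concurrentScheduler) MaxConcurrent() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.maxConcurrent
}

// hybridScheduler forwards the setting to its concurrent part.
type hybridScheduler struct{ concurrent *concurrentScheduler }

func (h *hybridScheduler) SetMaxConcurrentTasks(max int) error {
	return h.concurrent.SetMaxConcurrentTasks(max)
}

// sequentialScheduler always runs one task at a time and has no setter.
type sequentialScheduler struct{}

// concurrencySetter is a hypothetical interface used for the type assertion.
type concurrencySetter interface {
	SetMaxConcurrentTasks(max int) error
}

// SchedulerWrapper holds whichever scheduler the configured mode selected.
type SchedulerWrapper struct{ scheduler any }

func (w *SchedulerWrapper) SetMaxConcurrentTasks(max int) error {
	if s, ok := w.scheduler.(concurrencySetter); ok {
		return s.SetMaxConcurrentTasks(max)
	}
	return nil // sequential mode: the call has no effect
}
```

Asserting on a small interface rather than on concrete types lets both the concurrent and hybrid schedulers accept the setting through one code path.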
Task Scheduling Flow
- Execution Wait: before executing a task, the concurrent scheduler checks activeTasks against maxConcurrent and, when no slot is free, sleeps briefly and checks again until a slot is released.
- Scheduling Entry: Schedule(task) validates the task, sets the TaskInfo status and timestamps, and stores the task; if the task is immediate or its scheduled time has already passed, it is executed asynchronously.
- Timed/Retry: runTaskMonitor periodically scans for waiting tasks whose scheduled time has arrived and retrying tasks whose NextRetryAt has passed, and triggers executeTask for them.
- Execution Monitoring: executeTask performs the following steps:
  - Sets the running status.
  - Applies the task timeout (context.WithTimeout).
  - On success, marks the task completed; periodic tasks are rescheduled by their interval.
  - On failure, distinguishes timeout/cancellation from business errors; business errors are retried with linear backoff while RetryCount < MaxRetries, and the task enters dead_letter once the limit is exceeded.
  - Triggers the OnComplete callback.
- Expired Cleanup: cleanupExpiredTasks removes completed, cancelled, and dead-letter tasks whose TTL has expired, preventing unbounded memory growth.
Error Handling and Retry
- Error Classification:
  - Timeout/Cancellation: context.Canceled or context.DeadlineExceeded; the task is marked cancelled and is not retried.
  - Business Error: if MaxRetries > 0 and RetryCount < MaxRetries, the task enters retrying with linear backoff (100ms + 100ms × count, capped at 500ms); otherwise it enters dead_letter.
- Retry Backoff: linear growth with a 500ms maximum; the concurrent scheduler does not currently implement exponential backoff, whereas the sequential scheduler does.
- Dead Letter Handling: when the maximum retry count is exceeded or the error is not retryable, the task is marked dead_letter and LastError is retained.
Sequential Scheduler (sequentialScheduler)
- Sequential Execution: an isRunning flag, a pendingTasks queue, and a taskDoneChan notification channel enforce strict serialization.
- Timed/Periodic: runTaskMonitor scans for due tasks and adds them to the queue; executeTask supports exponential backoff retry and rescheduling of periodic tasks.
- Concurrency: SetMaxConcurrentTasks has no effect on the sequential scheduler (concurrency is always 1).
Hybrid Scheduler (HybridScheduler)
- Type Mapping: WithConcurrent/WithSequential map task types to execution modes.
- Selection Strategy: an explicit type mapping takes priority; otherwise timed/periodic tasks default to sequential execution and all other tasks default to concurrent execution.
- Unified Interface: Start/Stop/Close are delegated to both the concurrent and the sequential scheduler; Schedule/Cancel/GetTaskStatus/ListTasks operate across both schedulers and merge their results.
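The hybrid selection strategy reduces to a small decision function. selectMode and the dispatch-only hybridScheduler below are hypothetical; the real HybridScheduler also delegates lifecycle calls and merges status and listing results from both schedulers.

```go
package main

// Mode is the execution mode chosen for a task.
type Mode int

const (
	ModeConcurrent Mode = iota
	ModeSequential
)

// selectMode: an explicit type mapping wins; otherwise timed/periodic tasks
// go sequential and everything else goes concurrent.
func selectMode(typeModes map[string]Mode, taskType string, timedOrPeriodic bool) Mode {
	if m, ok := typeModes[taskType]; ok {
		return m
	}
	if timedOrPeriodic {
		return ModeSequential
	}
	return ModeConcurrent
}

// hybridScheduler routes each task to one of two underlying schedulers,
// represented here by plain callbacks.
type hybridScheduler struct {
	typeModes  map[string]Mode
	concurrent func(id string)
	sequential func(id string)
}

// Schedule dispatches the task and reports which mode was used.
func (h *hybridScheduler) Schedule(id, taskType string, timedOrPeriodic bool) Mode {
	m := selectMode(h.typeModes, taskType, timedOrPeriodic)
	if m == ModeSequential {
		h.sequential(id)
	} else {
		h.concurrent(id)
	}
	return m
}
```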
Scheduler Wrapper (SchedulerWrapper)
- Task Submission: convenience methods such as RunTask, RunTaskWithID, RunTaskAt, RunTaskRecurring, and RunTypedTask.
- Concurrency Setting: SetMaxConcurrentTasks reaches the underlying concurrent scheduler through a type assertion.
- Mode Configuration: WithMaxConcurrent, WithConcurrent, and WithSequential; the default is the hybrid scheduler.
- Default Retry: buildTaskWithRetry injects a default retry count.
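The wrapper's configuration options and default retry injection can be sketched with Go functional options. The option signatures, the defaultMaxConcurrent and defaultMaxRetries values, and the rule that only an unset (zero) MaxRetries is replaced are assumptions, not the wrapper's actual code.

```go
package main

// Mode selects how the wrapper executes tasks.
type Mode int

const (
	ModeHybrid Mode = iota // the documented default
	ModeConcurrent
	ModeSequential
)

// Hypothetical defaults; the real values are not given in this document.
const (
	defaultMaxConcurrent = 10
	defaultMaxRetries    = 3
)

// wrapperConfig collects the settings the wrapper is built with.
type wrapperConfig struct {
	mode          Mode
	maxConcurrent int
	typeModes     map[string]Mode
}

// Option is a functional option applied to wrapperConfig.
type Option func(*wrapperConfig)

func WithMaxConcurrent(n int) Option {
	return func(c *wrapperConfig) { c.maxConcurrent = n }
}

// WithConcurrent maps the given task types to concurrent execution.
func WithConcurrent(taskTypes ...string) Option {
	return func(c *wrapperConfig) {
		for _, t := range taskTypes {
			c.typeModes[t] = ModeConcurrent
		}
	}
}

// WithSequential maps the given task types to sequential execution.
func WithSequential(taskTypes ...string) Option {
	return func(c *wrapperConfig) {
		for _, t := range taskTypes {
			c.typeModes[t] = ModeSequential
		}
	}
}

// newConfig starts from the defaults (hybrid mode) and applies options in order.
func newConfig(opts ...Option) wrapperConfig {
	c := wrapperConfig{mode: ModeHybrid, maxConcurrent: defaultMaxConcurrent, typeModes: map[string]Mode{}}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

// taskSpec is a simplified task description.
type taskSpec struct {
	ID         string
	MaxRetries int
}

// buildTaskWithRetry fills in the default retry count when none was set.
func buildTaskWithRetry(spec taskSpec) taskSpec {
	if spec.MaxRetries == 0 {
		spec.MaxRetries = defaultMaxRetries
	}
	return spec
}
```

One caveat of treating zero as "unset": a caller who wants zero retries needs some other way to say so, which is why the real rule may differ.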
Dependency Analysis
- The concurrent scheduler depends on the Task interface and the TaskInfoProvider interface to read and write task metadata.
- The hybrid scheduler composes the concurrent and sequential schedulers and selects a mode by task type or scheduling characteristics.
- The wrapper composes a scheduler and a builder to provide a unified entry point.
- Tests cover key behaviors such as concurrency limits, retry, timeout, and memory management.
Performance Considerations
- Concurrency Slot Control: the concurrent scheduler waits for a free slot by polling with a brief sleep, which avoids busy waiting; tune maxConcurrent to the CPU/IO profile of the workload, since CPU-bound tasks gain little from running more tasks than there are cores, while IO-bound tasks can usually tolerate a higher limit.
- Task Expired Cleanup: completed, cancelled, and dead-letter tasks are cleaned up periodically to reduce memory usage; taskTTL and maxTaskCount control this behavior.
- Retry Backoff: linear backoff (concurrent) or exponential backoff (sequential) spaces out retries to avoid retry storms; choose the strategy that fits the business scenario.
- Resource Monitoring: record metrics such as active task count, execution duration, retry count, and dead letter ratio at the application layer, and connect them to logs and alerts.
- Load Balancing: the hybrid scheduler routes by task type, so high-throughput tasks can run on the concurrent scheduler and order-sensitive critical tasks on the sequential scheduler, improving overall throughput while preserving consistency.
Troubleshooting Guide
- Task stays in waiting for a long time: check that the scheduled time passed to Schedule is correct and that runTaskMonitor is running.
- Task retries frequently: check MaxRetries and the backoff strategy; inspect LastError and NextRetryAt.
- Timeout/Cancellation: check that the task Handler handles its context correctly, and set a reasonable timeout (returned by GetTimeout).
- Dead letter accumulation: check the number of dead_letter tasks and their causes; consider relaxing the retry policy or introducing a dead letter queue.
- Insufficient concurrency: call SetMaxConcurrentTasks to raise the limit, and watch peak active tasks and system resources.
- Graceful shutdown: Stop/Close wait for active tasks to complete, so make sure task Handlers honor context cancellation.
Conclusion
The concurrent scheduler provides flexible task orchestration through clear interface abstractions and multiple execution mode implementations. It suits high-throughput workloads, and with well-chosen concurrency, retry, and expired-task cleanup settings it can sustain high throughput while remaining stable. The hybrid scheduler adds differentiated handling per task type. In production, combine the scheduler with monitoring and alerting, and keep tuning concurrency parameters and retry strategies.
Appendix
Practical Usage Examples and Best Practices
- Quick Start: construct a SchedulerWrapper with the default configuration (which uses the hybrid scheduler), submit tasks, query their status, and cancel them.
- Set Concurrency: set the concurrency limit with the wrapper's WithMaxConcurrent option or by calling SetMaxConcurrentTasks directly.
- Task Type and Mode: use WithConcurrent/WithSequential to assign execution modes to task types; timed/periodic tasks default to sequential.
- Retry and Timeout: configure with TaskBuilder.WithRetry and WithTimeout (the task exposes the timeout through GetTimeout); keep in mind that timeouts/cancellations are handled differently from business errors.
- Monitoring and Alerting: record active task count, execution duration, retry count, and dead letter ratio, and feed them into the logging and metrics systems.
API Quick Reference
- Concurrent Scheduler:
  - Start(ctx), Stop(), Close(ctx)
  - Schedule(task), Cancel(taskID), GetTaskStatus(taskID), ListTasks()
  - SetMaxConcurrentTasks(max)
- Wrapper:
  - RunTask/RunTaskWithID/RunTaskAt/RunTaskRecurring/RunTypedTask
  - SetMaxConcurrentTasks(max), SetTaskTypeMode(taskType, mode)
  - GetTaskStatus(taskID), ListTasks(), CancelTask(taskID)
- Task Builder:
  - WithID/WithType/WithHandler/WithOnComplete/WithOnCancel
  - WithSchedule/WithRecurring/WithTimeout/WithRetry/WithPriority
  - Build()