约 5452 字大约 18 分钟
mooncake-gate
主要功能
客户端提交任务,根据任务的过期时间,判断是否需要进行持久化,然后分发到不同的topic。
主要类
- OpenController 类
submit
方法:接收外部任务请求,生成UUID(如果未提供),并调用submitInner
方法提交任务。submitInner
方法:将任务分发到相应的处理器(如Kafka),并返回提交结果。uuid
方法:生成并返回一个新的UUID。markTaskDone
方法:标记任务为完成状态。
- Dispatcher 接口
- 定义了
dispatch
方法,用于将任务发送到相应的处理器或持久化存储。
- 定义了
- DispatcherImpl 类
- 实现了
Dispatcher
接口,具体实现了任务分发逻辑。 dispatch
方法:根据任务的过期时间,将任务发送到不同的Kafka topic。send
方法:将任务发送到指定的Kafka topic。needStore
方法:判断任务是否需要持久化存储。
- 实现了
提供的接口
POST /submit
:提交延时任务。POST /inner/submit
:内部提交任务接口。GET /uuid
:生成UUID。POST /api/task/done
:标记任务完成
时序图
sequenceDiagram
participant 客户端
participant OpenController
participant UidGenerator
participant Dispatcher
participant Kafka
客户端->>OpenController: POST /submit
OpenController->>UidGenerator: fetchRandomId (如果UUID为空)
UidGenerator-->>OpenController: 返回UUID
OpenController->>OpenController: 创建任务 (createTask)
OpenController->>Dispatcher: dispatch(task)
Dispatcher->>Kafka: 发送到持久化Topic (如果需要)
Dispatcher->>Kafka: 发送到触发持久化Topic
Dispatcher->>Kafka: 发送到触发Topic
Dispatcher-->>OpenController: 返回分发结果
OpenController-->>客户端: ResultDTO<SubmitResultVo>
客户端->>OpenController: GET /uuid
OpenController->>UidGenerator: fetchRandomId
UidGenerator-->>OpenController: 返回UUID
OpenController-->>客户端: ResultDTO<UuidResultVo>
客户端->>OpenController: POST /api/task/done
OpenController->>ConsumerClient: done(appId, uuid)
ConsumerClient-->>OpenController: 返回结果
OpenController-->>客户端: ResultDTO<?>
topic
trigger-topic: course_mooncake_trigger
trigger-persistent-topic: course_mooncake_persistent
triggered-persistent-topic: course_mooncake_triggered_persistent
- persistentTask:用于持久化任务的Topic。
- triggeredPersistentTask:用于触发任务的Topic。
发送到不同Topic的逻辑
- 持久化任务:
- 当有新的任务需要持久化时,任务会被发送到
persistentTask
Topic。 - 这些任务通常是需要在未来某个时间点执行的任务。
- 当有新的任务需要持久化时,任务会被发送到
- 触发任务:
- 当持久化任务的执行时间到达时,任务会被发送到
triggeredPersistentTask
Topic。 - 这些任务是已经到达执行时间,需要立即处理的任务。
- 当持久化任务的执行时间到达时,任务会被发送到
不同Topic的处理逻辑
persistentTask Topic的处理逻辑:
- 消费任务:
PersistentConsumer
会从persistentTask
Topic中消费消息。 - 保存到数据库:消费到的任务会被保存到数据库中,记录任务的详细信息和状态。
- 提交偏移量:任务保存成功后,提交Kafka的消费偏移量,确保消息不会被重复消费。
- 消费任务:
triggeredPersistentTask Topic的处理逻辑:
消费任务:
PersistentConsumer
会从triggeredPersistentTask
Topic中消费消息。保存到数据库:消费到的任务会被保存到数据库中,更新任务的状态为“待执行”。
提交偏移量:任务保存成功后,提交Kafka的消费偏移量,确保消息不会被重复消费
mooncake-producer
- Timer 类
Timer
类负责管理时间轮的拨动和任务的添加。start
方法:启动定时任务,每隔一定时间拨动时间轮。add
方法:将任务添加到时间轮中。
- TimerService 类
TimerService
类负责管理任务的触发和消费。afterPropertiesSet
方法:初始化服务,启动检查点服务和触发器,并启动Kafka消费者。doTrigger
方法:从队列中获取任务并发送到Kafka的执行器Topic。doConsume
方法:从Kafka的触发Topic中消费任务,并将任务添加到检查点服务中。doConsumerInner
方法:具体的消费逻辑,包括分区的分配和撤销处理。createLoad
方法:从Kafka记录中创建任务负载对象。
- TimeWheel 类
TimeWheel
类实现了时间轮的核心逻辑,包括任务的添加和时间的推进。add
方法:将任务添加到时间轮中。pointTo
方法:推进时间轮到指定时间点,并触发过期任务。
- Load 类
Load
类表示一个任务负载,包含任务的相关信息。getExpireTime
方法:获取任务的过期时间。
- Expired 接口
Expired
接口定义了获取过期时间的方法。
- CheckPointService 类
CheckPointService
类负责管理检查点,确保任务的正确处理和提交。start
方法:启动检查点服务。register
方法:注册检查点。deregister
方法:注销检查点。add
方法:添加任务到检查点。remove
方法:从检查点移除任务。doConsume
方法:处理检查点的添加和移除操作。
- CheckPoint 类
CheckPoint
类表示一个检查点,管理任务的提交和标记。add
方法:添加任务到检查点。tryMark
方法:尝试标记任务为已处理。tryUpdateCommitOffset
方法:更新提交偏移量。
- Bucket 类
Bucket
类表示一个任务桶,管理任务的计数和最大偏移量。merge
方法:合并任务到桶中。decrement
方法:减少桶中的任务计数。
提供的接口
TimerService
类提供了任务触发和消费的核心逻辑,通过Kafka进行任务的分发和消费。CheckPointService
类提供了检查点的管理接口,确保任务的正确处理和提交。
sequenceDiagram
participant 客户端
participant TimerService
participant Timer
participant TimeWheel
participant CheckPointService
participant Kafka
客户端->>TimerService: 提交任务
TimerService->>Timer: add(task)
Timer->>TimeWheel: add(task)
TimerService->>TimerService: afterPropertiesSet()
TimerService->>CheckPointService: start()
TimerService->>Kafka: 启动消费者 (doConsume)
TimerService->>Kafka: 启动触发器 (doTrigger)
loop 每隔一定时间
Timer->>TimeWheel: pointTo(currentTime)
end
loop 消费任务
Kafka->>TimerService: 消费任务 (doConsume)
TimerService->>CheckPointService: add(load)
end
loop 触发任务
TimerService->>Queue: take()
Queue-->>TimerService: load
TimerService->>Kafka: 发送任务到执行器 (doTrigger)
Kafka-->>TimerService: 发送成功
TimerService->>CheckPointService: remove(load)
end
系统时序图解释
该系统时序图展示了延时任务调度系统的主要流程,包括任务的提交、时间轮的拨动、任务的消费和触发。以下是主要流程的详细描述:
- 任务提交
- 客户端向
TimerService
提交任务。 TimerService
调用Timer
的add
方法,将任务添加到时间轮中。Timer
调用TimeWheel
的add
方法,将任务添加到具体的时间槽中。
- 客户端向
- 系统初始化
TimerService
调用afterPropertiesSet
方法,初始化服务。TimerService
启动CheckPointService
,调用其start
方法。TimerService
启动 Kafka 消费者,调用doConsume
方法。TimerService
启动触发器,调用doTrigger
方法。
- 时间轮拨动
Timer
每隔一定时间调用TimeWheel
的pointTo
方法,推进时间轮到当前时间点。TimeWheel
检查并触发过期任务。
- 任务消费
- Kafka 消费者从 Kafka 的触发 Topic 中消费任务,调用
TimerService
的doConsume
方法。 TimerService
调用CheckPointService
的add
方法,将任务添加到检查点服务中。
- Kafka 消费者从 Kafka 的触发 Topic 中消费任务,调用
- 任务触发
TimerService
从队列中获取任务,调用doTrigger
方法。TimerService
将任务发送到 Kafka 的执行器 Topic。- Kafka 确认任务发送成功。
TimerService
调用CheckPointService
的remove
方法,从检查点服务中移除任务。
主要流程总结
- 任务提交:客户端提交任务,任务被添加到时间轮中。
- 系统初始化:初始化服务,启动检查点服务、Kafka 消费者和触发器。
- 时间轮拨动:定时拨动时间轮,检查并触发过期任务。
- 任务消费:Kafka 消费者消费任务,将任务添加到检查点服务中。
- 任务触发:从队列中获取任务,发送到执行器,确认发送成功后从检查点服务中移除任务。
- Timer 类
Timer
类负责管理时间轮的拨动和任务的添加。start
方法:启动定时任务,每隔一定时间拨动时间轮。add
方法:将任务添加到时间轮中。
- TimerService 类
TimerService
类负责管理任务的触发和消费。afterPropertiesSet
方法:初始化服务,启动检查点服务和触发器,并启动Kafka消费者。doTrigger
方法:从队列中获取任务并发送到Kafka的执行器Topic。doConsume
方法:从Kafka的触发Topic中消费任务,并将任务添加到检查点服务中。doConsumerInner
方法:具体的消费逻辑,包括分区的分配和撤销处理。createLoad
方法:从Kafka记录中创建任务负载对象。
- TimeWheel 类
TimeWheel
类实现了时间轮的核心逻辑,包括任务的添加和时间的推进。add
方法:将任务添加到时间轮中。pointTo
方法:推进时间轮到指定时间点,并触发过期任务。
- Load 类
Load
类表示一个任务负载,包含任务的相关信息。getExpireTime
方法:获取任务的过期时间。
- Expired 接口
Expired
接口定义了获取过期时间的方法。
- CheckPointService 类
CheckPointService
类负责管理检查点,确保任务的正确处理和提交。start
方法:启动检查点服务。register
方法:注册检查点。deregister
方法:注销检查点。add
方法:添加任务到检查点。remove
方法:从检查点移除任务。doConsume
方法:处理检查点的添加和移除操作。
- CheckPoint 类
CheckPoint
类表示一个检查点,管理任务的提交和标记。add
方法:添加任务到检查点。tryMark
方法:尝试标记任务为已处理。tryUpdateCommitOffset
方法:更新提交偏移量。
- Bucket 类
Bucket
类表示一个任务桶,管理任务的计数和最大偏移量。merge
方法:合并任务到桶中。decrement
方法:减少桶中的任务计数。
mooncake-consumer
主要实现了延时任务的消费、执行和管理。它包括了任务的HTTP请求处理、任务状态管理、任务重试机制、Redis缓存管理、以及与其他服务的通信等功能。
代码分析
- TomcatAccessLogCustomizer 类
- 该类实现了
WebServerFactoryCustomizer<TomcatServletWebServerFactory>
接口,用于自定义 Tomcat 的访问日志配置。 customize
方法:获取 Tomcat 的AccessLogValve
并设置日志保留天数为1天。
- 该类实现了
- Stage 枚举
- 该枚举定义了任务的不同阶段,包括
TODO
、INIT
和DONE
。 - 提供了获取阶段值的方法
getValue
和根据值获取阶段的方法of
。
- 该枚举定义了任务的不同阶段,包括
- Props 类
- 该类用于读取配置文件中的 Redis 配置信息。
- 包含一个内部静态类
Redis
,用于存储 Redis 地址列表。
- InternalController 类
- 该类是一个 Spring MVC 控制器,提供了两个接口用于执行任务和标记任务完成。
executeTask
方法:接收任务并调用CakeExecutor
执行任务。interceptTask
方法:接收任务标识并调用CakeExecutor
标记任务完成。
- GateClient 接口
- 该接口使用 FeignClient 与
mooncake-gate
服务通信,提供了提交任务和获取 UUID 的接口。 - 包含一个内部静态类
Fallback
,用于处理服务调用失败的情况。
- 该接口使用 FeignClient 与
- CoreConfig 类
- 该类是一个 Spring 配置类,提供了 OkHttpClient 和 JedisCluster 的 Bean 配置。
okHttpClient
方法:配置 OkHttpClient。getJedisCluster
方法:配置 JedisCluster。
- TaskHandlers 类
- 该类用于查找并创建任务处理器。
find
方法:根据任务创建HttpTaskHandler
。
- TaskHandler 接口
- 该接口定义了任务处理器的基本方法,包括异步执行和同步执行。
- HttpTaskHandler 类
- 该类实现了
TaskHandler
接口,负责处理 HTTP 任务。 executeAsync
方法:异步执行任务。execute
方法:同步执行任务。httpRequest
方法:创建 HTTP 请求。handleHttpTaskResponse
方法:处理 HTTP 响应。
- 该类实现了
- CakeExecutor 类
- 该类实现了
TaskManager
接口,负责任务的执行和管理。 mooncakeExecutor
方法:Kafka 消费者,接收并处理任务。tryRunTask
方法:尝试执行任务。handleAfterExecution
方法:处理任务执行后的逻辑。executeTask
方法:执行任务。markTaskDone
方法:标记任务完成。
- 该类实现了
- OkHttpService 类
- 该类提供了 OkHttpClient 的管理和配置。
client
方法:根据超时时间获取 OkHttpClient。
提供的接口
InternalController
提供了两个接口:
/execute
:执行任务。/done
:标记任务完成。
GateClient
提供了两个接口:
/inner/submit
:提交任务。/uuid
:获取 UUID。
系统时序图
sequenceDiagram
participant 客户端
participant InternalController
participant CakeExecutor
participant TaskHandlers
participant HttpTaskHandler
participant GateClient
participant Redis
participant Kafka
客户端->>InternalController: 提交任务 (executeTask)
InternalController->>CakeExecutor: executeTask(task)
CakeExecutor->>Redis: tryRunTask(task)
Redis-->>CakeExecutor: 返回结果
CakeExecutor->>TaskHandlers: find(task)
TaskHandlers->>HttpTaskHandler: 创建 HttpTaskHandler
HttpTaskHandler->>CakeExecutor: executeAsync(commitCallback)
HttpTaskHandler->>HttpTaskHandler: 发送 HTTP 请求
HttpTaskHandler->>CakeExecutor: handleAfterExecution(success, task)
CakeExecutor->>Redis: markTaskDone(appId, uuid)
客户端->>InternalController: 标记任务完成 (interceptTask)
InternalController->>CakeExecutor: markTaskDone(appId, uuid)
CakeExecutor->>Redis: markTaskDone(appId, uuid)
Kafka->>CakeExecutor: 消费任务 (mooncakeExecutor)
CakeExecutor->>Redis: tryRunTask(task)
Redis-->>CakeExecutor: 返回结果
CakeExecutor->>TaskHandlers: find(task)
TaskHandlers->>HttpTaskHandler: 创建 HttpTaskHandler
HttpTaskHandler->>CakeExecutor: executeAsync(commitCallback)
HttpTaskHandler->>HttpTaskHandler: 发送 HTTP 请求
HttpTaskHandler->>CakeExecutor: handleAfterExecution(success, task)
CakeExecutor->>Redis: markTaskDone(appId, uuid)
loop 重试任务
CakeExecutor->>GateClient: 提交任务 (retryNextTask)
GateClient-->>CakeExecutor: 返回结果
end
主要流程总结
- 任务提交:客户端提交任务,任务被添加到时间轮中。
- 系统初始化:初始化服务,启动检查点服务、Kafka 消费者和触发器。
- 时间轮拨动:定时拨动时间轮,检查并触发过期任务。
- 任务消费:Kafka 消费者消费任务,将任务添加到检查点服务中。
- 任务触发:从队列中获取任务,发送到执行器,确认发送成功后从检查点服务中移除任务。
- 任务执行:通过 HTTP 请求执行任务,处理任务执行后的逻辑,包括重试机制和任务状态管理。
mooncake-watcher
主要实现了延时任务的持久化、分片管理、任务生命周期管理以及与其他服务的通信。它包括了任务的持久化、任务的分片管理、任务的触发和执行、任务的状态管理、以及与其他服务的通信等功能。
代码分析
- InternalController 类
- 该类是一个 Spring MVC 控制器,提供了三个接口用于分片管理。
flush
方法:刷新所有分片。pop
方法:获取指定分片的任务。createShard
方法:创建或更新分片总数。
- PersistentConsumer 类
- 该类实现了
InitializingBean
接口,用于消费 Kafka 中的持久化任务并将其保存到数据库中。 afterPropertiesSet
方法:启动持久化任务和触发任务的消费线程。persistentTask
方法:消费持久化任务并保存到数据库中。triggeredPersistentTask
方法:消费触发任务并保存到数据库中。
- 该类实现了
- ShardManager 接口
- 该接口定义了分片管理的基本方法,包括获取分片配置、获取分片、填充分片和创建或更新分片总数。
- ShardManagerImpl 类
- 该类实现了
ShardManager
接口,负责分片的管理。 afterPropertiesSet
方法:初始化分片配置。getShard
方法:获取指定分片的任务。fillShards
方法:填充分片。createOrUpdateShardTotal
方法:创建或更新分片总数。
- 该类实现了
- TaskLifeCycleManager 接口
- 该接口定义了任务生命周期管理的基本方法,包括刷新任务到触发器、刷新任务到消费者和清除过期任务。
- TaskLifeCycleManagerImpl 类
- 该类实现了
TaskLifeCycleManager
接口,负责任务的生命周期管理。 flushToTrigger
方法:定期将持久化任务发送到触发器。flushToConsumer
方法:定期将持久化任务发送到消费者。flushToBin
方法:定期清除过期任务。
- 该类实现了
- TaskRepository 接口
- 该接口继承了
JpaRepository
,用于操作数据库中的任务实体。 - 提供了根据状态和过期时间查询任务的方法。
- 该接口继承了
- TaskRepositoryHelper 类
- 该类提供了批量更新任务状态的方法。
- Shard 类
- 该类表示一个分片,包含分片总数和当前分片。
- 提供了分片的有效性检查和分片命中判断的方法。
- ShardConfigValue 类
- 该类表示分片配置的值,包含分片总数和分片键。
- TaskEntity 类
- 该类表示任务实体,包含任务的基本信息和状态。
- 提供了初始化任务和触发任务的方法。
- TaskId 类
- 该类表示任务的唯一标识,包含应用ID和任务ID。
- ConsumerClient 接口
- 该接口使用 FeignClient 与
mooncake-consumer
服务通信,提供了执行任务的接口。 - 包含一个内部静态类
Fallback
,用于处理服务调用失败的情况。
- 该接口使用 FeignClient 与
提供的接口
InternalController
提供了三个接口:
/shard/fill
:刷新所有分片。/shard/pop
:获取指定分片的任务。/shard/create
:创建或更新分片总数。
ConsumerClient
提供了一个接口:
/execute
:执行任务。
系统时序图
sequenceDiagram
participant 客户端
participant InternalController
participant ShardManager
participant PersistentConsumer
participant TaskRepository
participant Kafka
participant TaskLifeCycleManager
participant ConsumerClient
客户端->>InternalController: 刷新分片 (flush)
InternalController->>ShardManager: fillShards(id)
ShardManager->>ShardManager: 填充分片
客户端->>InternalController: 获取分片任务 (pop)
InternalController->>ShardManager: getShard(id)
ShardManager->>InternalController: 返回分片任务
客户端->>InternalController: 创建分片 (createShard)
InternalController->>ShardManager: createOrUpdateShardTotal(k, total)
ShardManager->>ShardManager: 创建或更新分片总数
Kafka->>PersistentConsumer: 消费持久化任务 (persistentTask)
PersistentConsumer->>TaskRepository: 保存任务到数据库
TaskRepository-->>PersistentConsumer: 返回结果
PersistentConsumer->>Kafka: 提交消费偏移量
Kafka->>PersistentConsumer: 消费触发任务 (triggeredPersistentTask)
PersistentConsumer->>TaskRepository: 保存任务到数据库
TaskRepository-->>PersistentConsumer: 返回结果
PersistentConsumer->>Kafka: 提交消费偏移量
TaskLifeCycleManager->>TaskRepository: 查询需要触发的任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>Kafka: 发送任务到触发器 (flushToTrigger)
Kafka-->>TaskLifeCycleManager: 返回结果
TaskLifeCycleManager->>TaskRepository: 查询需要执行的任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>ConsumerClient: 执行任务 (flushToConsumer)
ConsumerClient-->>TaskLifeCycleManager: 返回结果
TaskLifeCycleManager->>TaskRepository: 查询过期任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>TaskRepository: 删除过期任务 (flushToBin)
TaskRepository-->>TaskLifeCycleManager: 返回结果
主要流程总结
- 分片管理:通过
ShardManager
实现分片的创建、更新和填充。 - 任务持久化:通过
PersistentConsumer
消费 Kafka 中的持久化任务并保存到数据库中。 - 任务生命周期管理:通过
TaskLifeCycleManager
定期刷新任务到触发器和消费者,并清除过期任务。 - 任务执行:通过
ConsumerClient
与mooncake-consumer
服务通信,执行任务。
Mooncake 模块分析及主要流程
主要模块
- InternalController:提供分片管理的接口。
- PersistentConsumer:消费 Kafka 中的持久化任务并保存到数据库。
- ShardManager:管理分片,包括获取、填充和创建/更新分片。
- TaskLifeCycleManager:管理任务的生命周期,包括刷新任务到触发器、消费者和清除过期任务。
- TaskRepository:操作数据库中的任务实体。
- ConsumerClient:与
mooncake-consumer
服务通信,执行任务。
主要流程
- 分片管理:
- 创建或更新分片总数。
- 获取指定分片的任务。
- 刷新所有分片。
- 任务持久化:
- 消费 Kafka 中的持久化任务并保存到数据库。
- 消费 Kafka 中的触发任务并保存到数据库。
- 任务生命周期管理:
- 定期将持久化任务发送到触发器。
- 定期将持久化任务发送到消费者。
- 定期清除过期任务。
- 任务执行:
- 通过
ConsumerClient
与mooncake-consumer
服务通信,执行任务。
- 通过
流程图
flowchart TD
A[客户端] -->|刷新分片| B[InternalController]
B -->|fillShards| C[ShardManager]
C -->|填充分片| C
A -->|获取分片任务| B
B -->|getShard| C
C -->|返回分片任务| B
B -->|返回分片任务| A
A -->|创建分片| B
B -->|createOrUpdateShardTotal| C
C -->|创建或更新分片总数| C
D[Kafka] -->|消费持久化任务| E[PersistentConsumer]
E -->|保存任务到数据库| F[TaskRepository]
F -->|返回结果| E
E -->|提交消费偏移量| D
D -->|消费触发任务| E
E -->|保存任务到数据库| F
F -->|返回结果| E
E -->|提交消费偏移量| D
G[TaskLifeCycleManager] -->|查询需要触发的任务| F
F -->|返回任务列表| G
G -->|发送任务到触发器| D
D -->|返回结果| G
G -->|查询需要执行的任务| F
F -->|返回任务列表| G
G -->|执行任务| H[ConsumerClient]
H -->|返回结果| G
G -->|查询过期任务| F
F -->|返回任务列表| G
G -->|删除过期任务| F
F -->|返回结果| G
时序图
sequenceDiagram
participant 客户端
participant InternalController
participant ShardManager
participant PersistentConsumer
participant TaskRepository
participant Kafka
participant TaskLifeCycleManager
participant ConsumerClient
客户端->>InternalController: 刷新分片 (flush)
InternalController->>ShardManager: fillShards(id)
ShardManager->>ShardManager: 填充分片
客户端->>InternalController: 获取分片任务 (pop)
InternalController->>ShardManager: getShard(id)
ShardManager->>InternalController: 返回分片任务
客户端->>InternalController: 创建分片 (createShard)
InternalController->>ShardManager: createOrUpdateShardTotal(k, total)
ShardManager->>ShardManager: 创建或更新分片总数
Kafka->>PersistentConsumer: 消费持久化任务 (persistentTask)
PersistentConsumer->>TaskRepository: 保存任务到数据库
TaskRepository-->>PersistentConsumer: 返回结果
PersistentConsumer->>Kafka: 提交消费偏移量
Kafka->>PersistentConsumer: 消费触发任务 (triggeredPersistentTask)
PersistentConsumer->>TaskRepository: 保存任务到数据库
TaskRepository-->>PersistentConsumer: 返回结果
PersistentConsumer->>Kafka: 提交消费偏移量
TaskLifeCycleManager->>TaskRepository: 查询需要触发的任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>Kafka: 发送任务到触发器 (flushToTrigger)
Kafka-->>TaskLifeCycleManager: 返回结果
TaskLifeCycleManager->>TaskRepository: 查询需要执行的任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>ConsumerClient: 执行任务 (flushToConsumer)
ConsumerClient-->>TaskLifeCycleManager: 返回结果
TaskLifeCycleManager->>TaskRepository: 查询过期任务
TaskRepository-->>TaskLifeCycleManager: 返回任务列表
TaskLifeCycleManager->>TaskRepository: 删除过期任务 (flushToBin)
TaskRepository-->>TaskLifeCycleManager: 返回结果
主要实现细节
- InternalController:
- 提供刷新分片、获取分片任务和创建分片的接口。
- 调用
ShardManager
的相应方法进行分片管理。
- PersistentConsumer:
- 实现
InitializingBean
接口,启动持久化任务和触发任务的消费线程。 - 消费 Kafka 中的持久化任务和触发任务,并保存到数据库中。
- 实现
- ShardManager:
- 管理分片的创建、更新和填充。
- 提供获取指定分片任务的方法。
- TaskLifeCycleManager:
- 定期刷新任务到触发器和消费者。
- 定期清除过期任务。
- TaskRepository:
- 提供操作数据库中任务实体的方法。
- 提供根据状态和过期时间查询任务的方法。
- ConsumerClient:
- 使用 FeignClient 与
mooncake-consumer
服务通信,执行任务。
- 使用 FeignClient 与
通过上述模块的协作,系统实现了延时任务的持久化、分片管理、任务生命周期管理和任务执行,确保任务在指定时间点被正确触发和执行,并提供了任务状态管理和分片管理机制。
你好,今晚19点04分25秒的时候,有一笔支付请求响应超时了,能帮忙看下是什么原因吗。out_trade_no是:317310638640000046083
部分请求参数如下:
<mch_id>1641714449</mch_id> <device_info>WEB</device_info><out_trade_no>317310638640000046083</out_trade_no> <time_start>20241108190424</time_start>