轻易云数据集成平台的源平台调度者生命周期详解
轻易云数据集成平台为企业提供了一套完整的数据处理流程,包括数据抽取、清洗、转换及转发等关键步骤。本文旨在深入探讨平台中“数据抽取”阶段的核心组件——源平台调度者的生命周期管理及操作流程,确保开发工程师能够充分理解并有效地实施数据集成方案。
数据抽取是数据集成的首步,通常通过“源平台定时调度请求任务”来实现。该过程涉及以下关键步骤:
1 定时任务调度
- 调度者分配与配置: 轻易云平台预设了20个调度者(dispatcher-0至dispatcher-19),通过Linux crontab计划任务实现每分钟一次的调度命令执行。在数据集成方案的配置页面中,用户可指定“调度号”以分配特定调度者,此举是为了在处理数百至数千个集成方案时优化性能,防止单个调度者的任务阻塞。
- 调度命令执行: 每个调度者将执行
cd ./dispatcher && php dispatcher-[0~19] schedule:run
命令,根据分配的“调度号”识别并启动相应的集成方案。
2 集成方案遍历与调度命令生成
- 异步方案识别: 在遍历集成方案时,首先排除非异步方案,这些通常由事件触发或外部系统激活。
- 调度命令创建: 对于需要调度的方案,将创建一个或多个调度命令,例如
dispatch:datahub [task_id] --source --asyn
,以及对应目标平台的调度命令。这些命令将根据crontab延迟执行。
- 补漏命令生成: 如果集成方案配置了补漏措施,则会创建一个特殊的补漏命令,例如
dispatch:datahub [task_id] --source --asyn --omission
,按照配置的补漏crontab执行。
3 调度命令执行与队列管理
- 调度命令激活: 到达指定时间点后,创建的调度命令将被激活,转化为特定格式并放入AsynDispatcher队列池中排队,等待处理。
- 队列池任务消费: AsynDispatcher队列池将对排队的任务进行处理,包括再次确认任务启动条件、检查调度条件是否满足,以及执行具体的调度工作。
4 适配器加载与任务调度
- 适配器加载: 调度过程中首先加载集成方案配置的源平台适配器。
- 任务调度执行: 适配器初始化后,执行
$adapter->dispatch()
方法,负责完成具体的调度任务。包括插入调度日志、生成任务请求参数、将新任务参数写入任务存储、将任务ID插入源任务队列池进行排队,最终插入调度结束日志。
5. 异步队列池任务消费
异步队列池(AsynDispatcher队列池)的任务消费过程是数据抽取阶段的核心。以下步骤详细描述了从任务验证到执行的流程:
5.1 任务验证与条件检查
- 任务状态确认: 消费任务前,首先确认任务是否处于启动状态,并检查是否满足调度条件。这包括验证是否有前置任务正在执行,以确保调度的顺序性和依赖性。
- 调度条件满足后的处理: 一旦确认任务满足所有调度条件,系统将调用
Instance::handleSourceDispatch
方法来执行具体的调度工作。
5.2 适配器操作与任务执行
- 适配器加载与初始化: 系统将加载指定的源平台适配器,并初始化适配器参数。
- 执行适配器调度方法: 初始化后,适配器的
dispatch()
方法被调用,执行包括插入调度开始日志、生成任务请求参数、将请求参数写入异步源任务存储、将任务ID插入源任务队列池、插入调度结束日志等步骤。
6. 数据抽取与处理
适配器完成任务调度后,接下来的步骤涉及数据的实际抽取和处理:
6.1 数据抽取任务的执行
- 任务详情获取: 根据任务ID,从异步源任务存储中获取任务的详细信息,包括请求参数等。
- 适配器连接与执行: 适配器首先尝试连接到源平台,确保连接成功后,执行具体的数据请求操作,通常通过调用适配器的
invoke()
方法完成。
6.2 数据处理与转换
- 响应数据处理: 任务执行后,适配器处理源平台的响应数据,包括检查响应状态、处理成功或失败的响应、以及对成功响应的数据进行进一步处理。
- 数据加工与转换: 在适配器处理响应数据后,可能会触发脚本加工厂的调度方法,对数据进行加工和转换,以满足集成方案的需求。
7. 任务状态管理与日志记录
任务在执行过程中,其状态管理和日志记录是至关重要的:
7.1 任务状态更新
- 任务完成状态标记: 完成数据处理后,任务将被标记为“已完成”状态,确保系统正确跟踪任务的执行结果。
- 错误处理与重排: 如果任务执行失败,系统将执行错误处理流程,包括异常记录和判断是否需要任务重新排队。
7.2 日志记录
- 调度日志: 从调度开始到结束,系统将记录详细的调度日志,包括任务开始、结束时间,以及任何重要的状态变更信息。
- 适配器日志: 适配器操作过程中产生的日志也被记录,包括连接状态、数据处理结果等,以便于问题诊断和性能优化。
8. 源平台事件关联与触发
最后,轻易云数据集成平台支持源平台事件关联,允许一个集成方案的执行触发其他方案的调度:
8.
1 事件关联配置
9. 异步队列池中任务的具体执行过程
9.1 任务的获取与验证
- 获取任务详情: 对于异步队列池中的每个任务,系统首先通过任务ID从
getAsynSourceJobStorage
获取任务的具体详情,包括执行所需的所有参数。
- 任务状态检查: 系统接着检查任务的当前状态。如果任务已经标记为错误或已完成,系统将不会继续执行该任务,并将其从队列中移除。
9.2 适配器连接与任务执行
- 适配器连接: 确认任务有效后,系统将通过调用适配器的
connect()
方法来检测与源平台的连接是否成功。
- 执行任务: 连接成功后,适配器的
invoke()
方法被调用,传入任务的参数,向源平台发送请求并获取响应。这一步是数据抽取过程的核心,适配器内部的SDK负责具体的数据请求与接收。
9.3 响应处理与数据加工
- 响应数据处理: 接收到源平台的响应后,适配器将根据响应状态进行处理。成功的响应将进入数据加工阶段,而失败的响应则会触发错误处理流程。
- 脚本加工厂调度: 对于成功的响应,可能会触发脚本加工厂的调度方法
ScriptFactory::dispatch()
,以执行任务完成后的数据加工,如数据清洗、格式转换等。
9.4 错误处理与任务重排
- 错误处理: 如果适配器在执行过程中遇到错误,将调用
handleError
方法进行错误处理。该过程包括异常记录和判断是否需要将任务重新排队。
- 任务重排: 在某些情况下,任务执行失败可能是由于临时问题造成的,如网络不稳定。此时,系统可能会决定将任务重新放入队列中,以便稍后重试。
9.5 成功响应的后续处理
- 处理响应数据: 对于成功的响应,适配器将处理响应中的数据,可能包括数据解析、格式化等操作,以确保数据能够被后续流程正确处理。
- 数据存储与队列管理: 处理完响应数据后,系统将相关数据存入数据存储,并根据需要更新任务的状态,如标记为已完成。
9.6 任务完成后的事件触发
- 脚本加工厂的再次调用: 在任务成功完成并处理响应数据之后,系统可能会再次调用脚本加工厂的事件,例如
AfterSourceResponseSuccess
,以执行特定的后处理脚本,进一步加工或验证数据。