轻易云数据集成平台设计了一套精细化的数据存储机制,专为支撑高效的数据集成方案而构建。此机制的核心在于为每个租户分配一个独立的MongoDB数据库,进而在该数据库中为每个集成方案创建专属的数据集合(Collection),确保数据的隔离性与安全性。
在数据集成的过程中,处理和存储数据是至关重要的一环。轻易云数据集成平台在接收、处理到存储数据时,遵循一套精细的原理,确保数据准确、高效地集成。以下是该平台数据存储原理的详细说明:
1、数据接收与预处理
当数据集成任务开始,平台首先从源系统接收数据。以JSON格式为例,原始数据可能如下所示:
{
"code": 0,
"message": "",
"trades": [
{
"rec_id": "1",
"shop_no": "xyp2test",
"tid": "115580935277840368-1",
"logistics_type": "12"
},
{
"rec_id": "2",
"shop_no": "115580935277840368-2",
"tid": "test0002",
"logistics_type": "12"
}
]
}
在此阶段,源平台适配器的角色是至关重要的。它负责解析接收到的数据,特别是提取trades
数组中的数据元素。然后,适配器将这些元素逐一写入轻易云数据集成平台的存储系统(简称ADATA)中。在数据写入过程中,可以通过特定配置(如指定id
、number
等字段),明确数据中哪个键值应作为主键或业务编码,以支持后续的数据处理和分析需求。
2、处理复杂的数据结构
在现实世界的数据集成场景中,数据结构往往具有复杂性。考虑到这一点,轻易云数据集成平台能够处理具有多级数据结构的情况。例如,以下JSON示例中展示了一个包含多级数据结构的复杂情形:
{
"code": 0,
"message": "",
"total_count": 1,
"trades": [{
"trade_id": "168498",
"trade_no": "JY201905180001",
"platform_id": "0",
"warehouse_type": "1",
"src_tids": "115580935277840368-1",
"pay_account": "默认账户",
"trade_status": "30",
"goods_list": [{
"rec_id": "2944911",
"trade_id": "168498",
"spec_id": "1",
"num": "2.0000",
"price": "99.0000",
"paid": "198.0000",
"goods_name": "旺店通手持终端",
"goods_id": "1",
"goods_no": "WDTPDA"
}, {
"rec_id": "2944912",
"trade_id": "168498",
"spec_id": "1",
"num": "2.0000",
"price": "99.0000",
"paid": "198.0000",
"goods_name": "旺店通手持终端",
"goods_id": "1",
"goods_no": "WDTPDA"
}]
}]
}
在默认情况下,平台会遍历trades
数组下的每个对象,将其作为一个独立单元进行存储。然而,为了处理更为复杂的场景,如上例中的goods_list
数组,数据集成平台提供了数据拍扁功能。
通过这一功能,平台能将嵌套的数据结构拍扁,使得原本属于一个单元的goods_list
中的每个商品信息被转化为独立的记录行。这样,即便是复杂的多级数据结构也能被有效地存储和处理,以满足不同的数据分析和应用需求。
3: 数据存储结构详解
每个集成方案涉及到的数据存储对象及其格式,具体如下:
核心数据存储(ADATA)
每个集成方案的核心数据被存储在一个名为{{方案id}}_ADATA
的Collection中。此结构不仅存储了业务数据的核心内容,也记录了数据的状态、处理响应等关键信息,为数据集成过程提供了全面的数据支持。
{
"_id": "65228287911dac2fbd2e551e",
"id": "CRK2023100719403_1",
"number": "CRK2023100719403_1",
"content": {},
"status": 0,
"relation_id": null,
"response": null,
"response_at": 0,
"created_at": 0.0,
"source_job_id": "6522777d007a976b6c36d34b",
"target_job_id": null,
"dispatch_begin": 0,
"dispatch_end": 0,
"dispatch_time": 0,
"result": []
}
_id
: 数据记录的唯一标识符,用于内部追踪。
id
: 从业务数据中摘取的主键ID,用于业务层面的唯一性标识。
number
: 从业务数据中摘取的编号,便于检索和识别。
content
: 具体的业务数据内容,存储实际的业务信息。
status
: 数据状态码,表示数据的当前处理状态。其中,0
代表等待处理(AWAIT),1
代表重复的(目标平台响应后的状态,REPEAT),2
代表已完成(FINISHED),3
代表出现错误(ERROR),5
代表处于队列中(QUEUE),6
代表跳过写入调用(可能是因为不满足某些条件,CONTINUE),7
代表不处理(UN_HANDLE)。
relation_id
: 关联的其他数据记录ID,用于跟踪数据间的关系。
response
: 目标平台的响应内容,用于记录目标平台处理后的反馈。
response_at
: 目标平台响应的时间戳,精确记录响应时间。
created_at
: 数据创建的时间戳,精确记录数据入库的时间。
source_job_id
: 关联的源平台任务的唯一标识符,用于追踪数据来源。
target_job_id
: 关联的目标平台任务的唯一标识符,用于追踪数据去向。
dispatch_begin
: 数据分派处理的开始时间戳,用于性能监控。
dispatch_end
: 数据分派处理的结束时间戳,用于性能监控。
dispatch_time
: 数据分派处理所花费的时间,用于性能分析。
result
: 存储处理结果的数组,用于记录每一步操作的输出结果。
运行日志存储(LOG)
为了记录集成方案的执行细节和过程中出现的各种状态,{{方案id}}_LOG
Collection被设计用来存储运行日志。这些日志详细记录了每一步操作的状态和执行时间,对于问题诊断和性能优化至关重要。
{
"_id": "6516600ef94af3187d5efd2c",
"text": "00_debug_建立连接",
"content": {},
"status": 4,
"created_at": 0.0
}
_id
: 日志记录的唯一标识符,用于追踪日志条目。
text
: 日志文本信息,简要描述了日志记录的主要内容,例如“建立连接”等操作信息。
content
: 日志的具体内容,可能包含了执行该日志记录相关操作的详细数据或额外信息。
status
: 日志级别,用于表示日志的重要性和紧急程度。其可能的值包括:0
(RECORD,记录),1
(NOTICE,注意),2
(SUCCESS,成功),3
(ERROR,错误),4
(FATAL,致命错误),5
(DEBUG,调试信息)。
created_at
: 日志创建的时间戳,用于记录日志生成的确切时间。
源平台任务存储(SJOB)
{{方案id}}_SJOB
Collection专门用于存储与源平台相关的任务信息。它包括API请求参数、执行历史记录等,为源平台数据的获取和处理提供了详细记录。
{
"_id": "65228298dba4b8593d3ea4b2",
"status": 2,
"api": "erp.storage.goodsdocout.v2",
"params": {},
"history": [
{
"handle_at": 0.0,
"response": []
}
],
"created_at": 0.0,
"handle_at": 0.0,
"retries": 0,
"active_begin": 0.0,
"active_end": 0.0,
"active_time": 0.0,
"result": [],
"dispatch_begin": 0.0,
"dispatch_end": 0.0,
"dispatch_time": 0.0,
"SDK": {},
"count": 0
}
_id
: 任务的唯一标识符,用于内部追踪和识别。
status
: 任务的执行状态,如2
表示任务已成功完成。
api
: 指定的源平台API接口,用于标识任务调用的具体API。
params
: API请求的参数,包含了执行此任务所需的所有信息。
history
: 任务执行的历史记录,每一项都记录了一次执行的时间(handle_at
)和相应的响应(response
)。
created_at
: 任务创建的时间戳,记录任务被创建的具体时间。
handle_at
: 任务最近一次处理的时间戳,记录最近一次任务执行的时间。
retries
: 任务重试的次数,记录了任务因失败而重新尝试执行的次数。
active_begin
和active_end
: 分别记录任务开始执行和结束执行的时间戳,用于计算任务的执行时长。
active_time
: 任务活跃的时间长度,即从开始执行到执行完成所花费的时间。
result
: 任务执行的结果,记录了执行任务后的输出数据。
dispatch_begin
和dispatch_end
: 分别记录任务分派开始和结束的时间戳,用于性能监控。
dispatch_time
: 任务分派处理所花费的时间,用于分析任务分派的效率。
SDK
: 记录了执行任务时使用的SDK的相关信息,便于追踪和调试。
count
: 任务执行的次数,用于追踪任务被触发的总次数。
目标平台任务存储(TJOB)
最后,我们来看{{方案id}}_TJOB
Collection的JSON结构及其详细的中文说明:
{
"_id": "657bd2c818c839663d80374a",
"status": 2,
"api": "batchSave",
"params": {},
"ids": [
"657bd162d37a346dfa39219e"
],
"data_range": null,
"response": {
"success": [
{
"Id": 241855,
"Number": "QTRK044001",
"DIndex": 0
}
],
"error": []
},
"history": [
],
"created_at": 0.0,
"handle_at": 0.0,
"retries": 0,
"active_begin": 0,
"active_end": 0,
"active_time": 0,
"result": [],
"dispatch_begin": 0.0,
"dispatch_end": 0.0,
"dispatch_time": 0.0,
"SDK": {}
}
_id
: 目标平台任务的唯一标识符,用于追踪和识别任务。
status
: 任务的执行状态,例如2
表示任务已成功完成。
api
: 指定的目标平台API接口,标识任务调用的具体API。
params
: API请求参数,包含执行此任务所需的所有信息。
ids
: 关联的是ADATA中哪些数据ID主键,标识此任务处理的具体数据项。
data_range
: 关联的ADATA中的数据范围,如果启用数据合并,则由该值描述合并的范围。
response
: 目标平台写入后的响应内容,记录了成功和错误的详细信息。
history
: 任务执行的历史记录,每一项都记录了执行时间(handle_at
)和相应的响应(response
)。
created_at
: 任务创建的时间戳,记录任务被创建的具体时间。
handle_at
: 任务最近一次处理的时间戳,记录最近一次任务执行的时间。
retries
: 任务重试的次数,记录任务因失败而重新尝试执行的次数。
active_begin
和active_end
: 分别记录任务开始执行和结束执行的时间戳,用于计算任务执行的时长。
active_time
: 任务活跃的时间长度,即从开始执行到执行完成所花费的时间。
result
: 任务执行的结果,记录了执行任务后的输出数据。
dispatch_begin
和dispatch_end
: 分别记录任务分派开始和结束的时间戳,用于性能监控。
dispatch_time
: 任务分派处理所花费的时间,用于分析任务分派的效率。
SDK
: 记录了执行任务时使用的SDK的相关信息,便于追踪和调试。
此结构允许开发者全面监控目标平台任务的执行情况,确保数据正确地从源平台迁移到目标平台,并在过程中实时追踪任务状态、处理响应及性能指标。