背景
客户已有一个用了多年的珠宝ERP,里面有商品信息及准确的库存,他们原有的订货系统已经满足不了业务发展的需要,将要切换成包含PC端及小程序端,功能更先进及完善的珠宝订货系统-优订货。
需求说明
客户希望ERP的商品及库存信息自动与订货系统对接,以减轻运营的工作量并保持数据同步
实现方案
原本ERP只提供了支持分页的数据查询接口,查询接口支持按产品条码、产品名称、产品创建时间三个字段的搜索。 我分析了一下,基于目前ERP这个接口,要实现类实时的数据同步不可能,因为产品数量很多,而ERP服务器的配置及带宽都是不够的,响应速度比较慢,也支撑不了太频繁的查询,于是与ERP方沟通,让他们增加了“最后更新时间”字段,并在查询接口增加按“最后更新时间”字段区间的查询支持,然后订货系统每15分钟发起对此前每15分钟有变化的产品库存的查询,如果查询到结果则同步数据,如果结果为空,说明这个时间区间内没有产品的信息发生过变化,将这个时间区间标记为已更新,等待下一次更新即可。逻辑见下方流程图。
方案优点
逻辑严谨,两个系统同步数据同步常见的网络错误不会导致数据同步出错,因为每一个时间区间的每一页都必须确保同步成功了才会写更新日志,这样当网络出现故障或一方服务器有问题时,恢复正常后,同步任务就能从此前最后一次更新的记录中恢复,继续更新,而且整个同步过程都有可视化输出,非常清晰明了,容易控制,还有详细的日志可以追踪问题。
流程图
数据库设计
字段说明:
核心代码参考(php)
这个是定时任务的主方法,即入口
代码语言:javascript复制 /**
* [定时任务入口]增量同步商品信息,主要是找到更新到哪个时间段的哪一页了
*/
public function additionalSync(){
Tools::realTimeOutputPrepare();
$page = 1;
$lastRecord = ErpSyncModel::open()->order('updateAt', 'desc')->first();
if($lastRecord){
if($lastRecord['status'] === 1){//说明上次的更新已经完成
$startTime = $lastRecord['endTime'] 1;
}else{//如果未完成则继续更新
$startTime = $lastRecord['startTime'];
$page = $lastRecord['page'];
if(time() - $lastRecord['updateAt'] <= 60){
die('距离上次更新未超过60秒,暂不执行更新');
}
}
}else{//没有记录则从头开始
$startTime = strtotime('-1 days');
}
$endTime = $startTime 15*60;//每15分钟为一个周期
if($endTime > time()){
die('当前已经是最新数据了,请等待下一轮更新');
}
$this->pullData($startTime, $endTime, $page);
}
下面这个是访问ERP接口并实现同步数据并更新同步记录
代码语言:javascript复制/**
* 拉取数据
* @param $startTime
* @param $endTime
* @param $page
*/
public function pullData($startTime, $endTime, $page){
while(true){
$startTimeString = urlencode(date('Y-m-d H:i:s', $startTime));
$endTimeString = urlencode(date('Y-m-d H:i:s', $endTime));
Tools::realTimeOutput('正在获取['.date('Y-m-d H:i:s', $startTime).']至['.date('Y-m-d H:i:s', $endTime).']的数据,当前是第['.$page.']页');
$url = 'https://api.xxx.com?pageId='.$page.'&pageSize='.$this->pageSize.'&startCreateDate='.$startTimeString.'&endCreateDate='.$endTimeString;
$res = Tools::curlGet($url, 30);
if($res['success']){//CURL成功
$responseData = Tools::jsonToArray($res['data']);
if(intval($responseData['status']) === 200){//表示接口返回是成功的
$productList = $responseData['data'];//商品列表数据
$getProductCount = count($productList);
Tools::realTimeOutput('已获取到第['.$page.']页数据,共['.$getProductCount.']条记录');
$upsertData = [
'startTime' => $startTime,
'endTime' => $endTime,
'count' => $getProductCount,
'page' => $page,
'status' => 0//表示这个时间段的已经拉取完了
];
if($getProductCount === 0){
$upsertData['status'] = 1;//如果没有记录了就将记录状态改为完成
ErpSyncModel::open()->upsert($upsertData, $upsertData, 'startTime,endTime');
break;
}else{//如果有记录就更新记录
ErpSyncModel::open()->upsert($upsertData, $upsertData, 'startTime,endTime');
try{
foreach($productList as $item){
$this->importOneByOne($item);
}
if($getProductCount >= $this->pageSize){//说明还有下一页
$page;
continue;
}else{//说明没有下一页了
$upsertData['status'] = 1;
ErpSyncModel::open()->upsert($upsertData, $upsertData, 'startTime,endTime');
break;
}
}catch (Exception $e){
$upsertData['data'] = $productList;
$upsertData['message'] = $e->getMessage();
ErpSyncExceptionModel::open()->add($upsertData);
$page;
}catch(PDOException $pe){
$upsertData['data'] = $productList;
$upsertData['message'] = $pe->getMessage();
ErpSyncExceptionModel::open()->add($upsertData);
$page;
}
}
}else{
Tools::realTimeOutput('CURL未获取到数据,5秒后将重试获取第'.$page.'页');
sleep(5);
}
}else{//如果curl获取失败就睡5秒再试
Tools::realTimeOutput('CURL获取失败,5秒后将重试获取第'.$page.'页');
sleep(5);
}
}