本文介绍利用requests和pandas将API接口返回的数据分别导入Oracle和MySQL数据库以便使用。
导入Oracle代码如下:
代码语言:python
import requests
import pandas as pd
import sqlalchemy as SAC
from sqlalchemy import create_engine
from datetime import datetime,timedelta
import retrying
import time
import cx_Oracle
import sqlalchemy.dialects.oracle as OType
import logging
import logging.handlers
# Oracle connection descriptor; host, port and service name are placeholders.
conStr = cx_Oracle.makedsn('XXX', 'XXX', service_name='XXX')
# SQLAlchemy URLs have the form "dialect+driver://user:password@dsn".  The "+"
# between dialect and driver and the "+" concatenating the DSN were both
# missing, which made this line a syntax error.
engine = create_engine('oracle+cx_oracle://XXX:XXX@' + conStr)

# File logger writing to "log", rolled over on a daily schedule (when='d').
# NOTE(review): the logger is instantiated directly rather than obtained via
# logging.getLogger(), so it is not registered in the logging hierarchy.
logger = logging.Logger(__name__)
handler = logging.handlers.TimedRotatingFileHandler('log', when='d')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
@retrying.retry()
def retryGet(url, timeout=60):
    """Fetch *url* and return its decoded JSON body, retrying on any failure.

    The function is wrapped in ``retrying.retry()`` with no arguments, so an
    exception raised here (connection error, timeout, undecodable JSON, or the
    explicit error below) causes the whole call to be attempted again.

    Args:
        url: Address to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up on one
            attempt.  Without a timeout a stalled connection would block
            forever and the retry logic would never get a chance to run.

    Returns:
        The decoded JSON payload.

    Raises:
        ValueError: If the payload decodes to a bare integer (presumably an
            error code from the API -- confirm against the API docs).
    """
    # Fixed pause before every attempt; acts as a crude rate limit.
    time.sleep(10)
    res = requests.get(url, timeout=timeout)
    data = res.json()
    print(type(data))
    if isinstance(data, int):
        # Show the raw body so the failed attempt can be diagnosed.
        print(res.content)
        raise ValueError('unexpected integer response from %s: %r' % (url, data))
    return data
def getPS():
    """Download the PS list and return it as a DataFrame.

    The "token_id" and "error_web" columns are removed from the result.
    """
    GetPSListUrl = 'XXX'
    records = retryGet(GetPSListUrl)
    frame = pd.DataFrame(records)
    return frame.drop(columns=["token_id", "error_web"])
def savePS_oracle(df_table):
    """Append *df_table* to the Oracle table T_GDI_INFO_PS_BASE.

    The frame is modified in place: column names are upper-cased and every
    column mapped to an Oracle DATE below is converted with ``pd.to_datetime``.
    """
    VARCHAR2 = OType.VARCHAR2
    NUMBER = OType.NUMBER
    DATE = OType.DATE
    column_types = {
        "PS_CODE": VARCHAR2(80),
        "PS_NAME": VARCHAR2(400),
        "REGION_CODE": VARCHAR2(40),
        "REGION_NAME": VARCHAR2(40),
        "FULLREGION_NAME": VARCHAR2(80),
        "ADDRESS": VARCHAR2(300),
        "LONGITUDE": NUMBER(9, 6),
        "LATITUDE": NUMBER(9, 6),
        "MANUFACTURE_DATE": DATE(),
        "ENVIRONMENT_PRINCIPAL": VARCHAR2(80),
        "CREDIT_CODE": VARCHAR2(100),
        "LINK_INFO": VARCHAR2(80),
        "BOILER_NUM": NUMBER(1),
        "BURN_ABILITY": VARCHAR2(300),
        "ELECTRIC_POWER": VARCHAR2(80),
        "DATA_STATUS": NUMBER(1),
        "AFGROUP_CODE": VARCHAR2(80),
        "CREATE_TIME": DATE(),
        "UPDATE_TIME": DATE(),
        "CORPORATION_NAME": VARCHAR2(20),
    }
    # The type mapping is keyed by upper-case names, so align the frame first.
    df_table.rename(columns=str.upper, inplace=True)
    for name, sql_type in column_types.items():
        if isinstance(sql_type, OType.DATE):
            df_table[name] = pd.to_datetime(df_table[name])
    df_table.to_sql(
        'T_GDI_INFO_PS_BASE',
        engine,
        index=False,
        dtype=column_types,
        if_exists='append',
    )
def getBurn():
    """Fetch the burn list for every PS stored in Oracle.

    Reads the rows of T_GDI_INFO_PS_BASE, requests the burn list for each
    row's ``ps_code`` and concatenates all responses into one DataFrame.
    Empty strings in the result are replaced with ``None``.

    Returns:
        DataFrame holding the records of every response.
    """
    GetBurnListUrl = 'XXX'
    df_PS = pd.read_sql_table('T_GDI_INFO_PS_BASE'.lower(), con=engine)
    burnList0 = []
    for _, ps in df_PS.iterrows():
        url = GetBurnListUrl.format(pscode=ps.ps_code)
        # Accumulate across rows.  The original overwrote a differently named
        # variable and then built the frame from an undefined `burnList0`.
        # Assumes each response is a list of records -- TODO confirm.
        burnList0.extend(retryGet(url))
    df_Burn = pd.DataFrame(burnList0)
    df_Burn.replace({'': None}, inplace=True)
    return df_Burn
def saveBurn_oracle(df_table):
    """Append *df_table* to the Oracle table T_GDI_INFO_MP_BURN.

    The frame is modified in place: column names are upper-cased and every
    column mapped to an Oracle DATE below is converted with ``pd.to_datetime``.
    """
    VARCHAR2 = OType.VARCHAR2
    NUMBER = OType.NUMBER
    DATE = OType.DATE
    column_types = {
        "PS_CODE": VARCHAR2(80),
        "MP_CODE": VARCHAR2(80),
        "MP_NAME": VARCHAR2(80),
        "BOILER_NAME": VARCHAR2(400),
        "BOILER_TYPE": VARCHAR2(80),
        "CREATER_TIME": DATE(),
        "BURN_ABILITY": VARCHAR2(80),
        "DATA_STATUS": NUMBER(1),
        "OPEN_TIME": DATE(),
        "CREATE_TIME": DATE(),
        "UPDATE_TIME": DATE(),
        "STOPOPEN_TIME": VARCHAR2(80),
        "STOPOPEN_STATUS": VARCHAR2(80),
        "STOPOPEN_REASON": VARCHAR2(80),
    }
    # The type mapping is keyed by upper-case names, so align the frame first.
    df_table.rename(columns=str.upper, inplace=True)
    date_columns = [
        name for name, sql_type in column_types.items()
        if isinstance(sql_type, OType.DATE)
    ]
    for name in date_columns:
        df_table[name] = pd.to_datetime(df_table[name])
    df_table.to_sql(
        'T_GDI_INFO_MP_BURN',
        engine,
        index=False,
        dtype=column_types,
        if_exists='append',
    )
def getMonitor(date):
    """Fetch monitor data for *date* for every row of T_GDI_INFO_MP_BURN.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    GetMonitorDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('T_GDI_INFO_MP_BURN'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        url = GetMonitorDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        dataList = retryGet(url)
        print(dataList)
        # Accumulate across rows; the original rebound the list on every
        # iteration, so only the last response survived.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(dataList)
    df_Monitor = pd.DataFrame(dataList0)
    return df_Monitor
def saveMonitor_oracle(df_table):
    """Append *df_table* to the Oracle table T_GDI_DATA_DAY.

    The frame is modified in place: column names are upper-cased and every
    column mapped to an Oracle DATE below is converted with ``pd.to_datetime``.
    """
    VARCHAR2 = OType.VARCHAR2
    NUMBER = OType.NUMBER
    DATE = OType.DATE
    column_types = {
        "PS_CODE": VARCHAR2(80),
        "MP_CODE": VARCHAR2(80),
        "MONITOR_TIME": DATE(),
        "POLLUTANT_CODE": VARCHAR2(20),
        "POLLUTANT_NAME": VARCHAR2(30),
        "DAY": VARCHAR2(20),
        "STANDARD_VALUE": VARCHAR2(20),
        "STRENGTH": NUMBER(20, 6),
        "STATUS": NUMBER(1),
        "DATA_STATUS": NUMBER(1),
        "CREATE_TIME": DATE(),
        "UPDATE_TIME": DATE(),
        "REMARK": VARCHAR2(300),
    }
    # The type mapping is keyed by upper-case names, so align the frame first.
    df_table.rename(columns=str.upper, inplace=True)
    for name, sql_type in column_types.items():
        if not isinstance(sql_type, OType.DATE):
            continue
        df_table[name] = pd.to_datetime(df_table[name])
    df_table.to_sql(
        'T_GDI_DATA_DAY',
        engine,
        index=False,
        dtype=column_types,
        if_exists='append',
    )
def getGK(date):
    """Fetch GK data for *date* for every row of T_GDI_INFO_MP_BURN.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    GetGkDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('T_GDI_INFO_MP_BURN'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        # Extra pause on top of the delay retryGet applies itself.
        time.sleep(10)
        url = GetGkDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        dataList = retryGet(url)
        print(dataList)
        # Accumulate across rows.  The original rebound the list each time and
        # then hit a stray `break`, so only the first row was ever queried.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(dataList)
    df_Gk = pd.DataFrame(dataList0)
    return df_Gk
def saveGk_oracle(df_table):
    """Append *df_table* to the Oracle table T_GDI_DATA_GK.

    The frame is modified in place: column names are upper-cased and every
    column mapped to an Oracle DATE below is converted with ``pd.to_datetime``.
    """
    VARCHAR2 = OType.VARCHAR2
    NUMBER = OType.NUMBER
    DATE = OType.DATE
    column_types = {
        "PS_CODE": VARCHAR2(80),
        "MP_CODE": VARCHAR2(80),
        "MONITOR_TIME": DATE(),
        "GK_NAME": VARCHAR2(20),
        "DAY": VARCHAR2(20),
        "STATUS": NUMBER(1),
        "DATA_STATUS": NUMBER(1),
        "CREATE_TIME": DATE(),
        "UPDATE_TIME": DATE(),
        "REMARK": VARCHAR2(300),
    }
    # The type mapping is keyed by upper-case names, so align the frame first.
    df_table.rename(columns=str.upper, inplace=True)
    date_columns = [
        name for name, sql_type in column_types.items()
        if isinstance(sql_type, OType.DATE)
    ]
    for name in date_columns:
        df_table[name] = pd.to_datetime(df_table[name])
    df_table.to_sql(
        'T_GDI_DATA_GK',
        engine,
        index=False,
        dtype=column_types,
        if_exists='append',
    )
def getLW(date):
    """Fetch LW data for *date* for every row of T_GDI_INFO_MP_BURN.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    # Local renamed from the copy-pasted `GetGkDataListUrl`.
    GetLwDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('T_GDI_INFO_MP_BURN'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        url = GetLwDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        print(url)
        # Accumulate across rows; the original rebound the list on every
        # iteration, so only the last response survived.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(retryGet(url))
    df_Lw = pd.DataFrame(dataList0)
    print(df_Lw)
    return df_Lw
def saveLw_oracle(df_table):
    """Append *df_table* to the Oracle table T_GDI_DATA_LW2.

    The frame is modified in place: column names are upper-cased and every
    column mapped to an Oracle DATE below is converted with ``pd.to_datetime``.
    """
    VARCHAR2 = OType.VARCHAR2
    NUMBER = OType.NUMBER
    DATE = OType.DATE
    column_types = {
        "PS_CODE": VARCHAR2(80),
        "MP_CODE": VARCHAR2(80),
        "MONITOR_TIME": DATE(),
        "STRENGTH": NUMBER(20, 6),
        "STANDARD_VALUE": NUMBER(20, 6),
        "STATUS": NUMBER(1),
        "DATA_STATUS": NUMBER(1),
        "CREATE_TIME": DATE(),
        "UPDATE_TIME": DATE(),
        "TX_INFO": VARCHAR2(300),
        "ABNORMAL_REASON": VARCHAR2(300),
        "FAULT_INFO": VARCHAR2(300),
        "CEMS_INFO": VARCHAR2(300),
        "DAY": VARCHAR2(20),
        "H_NAME": NUMBER(20, 6),
    }
    # The type mapping is keyed by upper-case names, so align the frame first.
    df_table.rename(columns=str.upper, inplace=True)
    for name, sql_type in column_types.items():
        if isinstance(sql_type, OType.DATE):
            df_table[name] = pd.to_datetime(df_table[name])
    df_table.to_sql(
        'T_GDI_DATA_LW2',
        engine,
        index=False,
        dtype=column_types,
        if_exists='append',
    )
导入MySQL代码如下:
代码语言:python
import requests
import pandas as pd
import sqlalchemy as SAC
from sqlalchemy import create_engine
from datetime import datetime,timedelta
import retrying
import time
# SQLAlchemy URLs have the form "dialect+driver://..."; the "+" between
# "mysql" and "pymysql" was missing, so the URL could not be parsed.
engine = create_engine('mysql+pymysql://XXX:XXX@XXX/garbage_power?charset=utf8mb4')
def getPS():
    """Download the PS list with a single GET and return it as a DataFrame.

    The "token_id" and "error_web" columns are removed from the result.
    """
    GetPSListUrl = 'XXX'
    payload = requests.get(GetPSListUrl).json()
    frame = pd.DataFrame(payload)
    return frame.drop(columns=["token_id", "error_web"])
def savePS_mysql(df_table=None):
    """Write the PS frame to the MySQL table ``psinfo``.

    Every column is stored as CHAR(200) unless a more specific type is listed
    in the overrides below.

    Args:
        df_table: Frame to store.  The original signature took no argument and
            read a module-level ``df_PS`` that this file never defines; the
            parameter matches the other ``save*_mysql`` functions, and the old
            global is still used as a fallback when it is omitted.
    """
    if df_table is None:
        # Legacy call style: savePS_mysql() with a global df_PS in scope.
        df_table = df_PS
    dtype = {col: SAC.CHAR(200) for col in df_table.columns}
    dtype0 = {
        'longitude': SAC.FLOAT(),
        'latitude': SAC.FLOAT(),
        "data_status": SAC.INTEGER(),
        "create_time": SAC.DATETIME(),
        "boiler_num": SAC.INTEGER(),
        "manufacture_date": SAC.DATE(),
        "update_time": SAC.DATETIME(),
    }
    dtype.update(dtype0)
    df_table.to_sql('psinfo', engine, index=False, dtype=dtype)
def getBurn():
    """Fetch the burn list for every PS stored in MySQL.

    Reads the rows of ``psinfo``, requests the burn list for each row's
    ``ps_code`` and concatenates all responses into one DataFrame.
    Empty strings in the result are replaced with ``None``.

    Returns:
        DataFrame holding the records of every response.
    """
    GetBurnListUrl = 'XXX'
    df_PS = pd.read_sql_table('psinfo', con=engine)
    burnList0 = []
    for _, ps in df_PS.iterrows():
        url = GetBurnListUrl.format(pscode=ps.ps_code)
        # Accumulate across rows.  The original overwrote a differently named
        # variable and then built the frame from an undefined `burnList0`.
        # Assumes each response is a list of records -- TODO confirm.
        burnList0.extend(retryGet(url))
    df_Burn = pd.DataFrame(burnList0)
    df_Burn.replace({'': None}, inplace=True)
    return df_Burn
def saveBurn_mysql(df_table):
    """Write *df_table* to the MySQL table ``burninfo``.

    Every column is stored as CHAR(200) unless a more specific type is listed
    in the overrides below.
    """
    overrides = {
        "open_time": SAC.DATETIME(),
        "create_time": SAC.DATETIME(),
        "data_status": SAC.INTEGER(),
        "creater_time": SAC.DATE(),
        "update_time": SAC.DATETIME(),
    }
    column_types = {
        **{name: SAC.CHAR(200) for name in df_table.columns},
        **overrides,
    }
    df_table.to_sql('burninfo', engine, index=False, dtype=column_types)
@retrying.retry()
def retryGet(url, timeout=60):
    """Fetch *url* and return its decoded JSON body, retrying on any failure.

    The function is wrapped in ``retrying.retry()`` with no arguments, so an
    exception raised here (connection error, timeout, undecodable JSON, or the
    explicit error below) causes the whole call to be attempted again.

    Args:
        url: Address to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up on one
            attempt.  Without a timeout a stalled connection would block
            forever and the retry logic would never get a chance to run.

    Returns:
        The decoded JSON payload.

    Raises:
        ValueError: If the payload decodes to a bare integer (presumably an
            error code from the API -- confirm against the API docs).
    """
    # Fixed pause before every attempt; acts as a crude rate limit.
    time.sleep(10)
    res = requests.get(url, timeout=timeout)
    data = res.json()
    print(type(data))
    if isinstance(data, int):
        # Show the raw body so the failed attempt can be diagnosed.
        print(res.content)
        raise ValueError('unexpected integer response from %s: %r' % (url, data))
    return data
def getMonitor(date):
    """Fetch monitor data for *date* for every row of the ``burninfo`` table.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    GetMonitorDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('burninfo'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        url = GetMonitorDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        dataList = retryGet(url)
        print(dataList)
        # Accumulate across rows; the original rebound the list on every
        # iteration, so only the last response survived.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(dataList)
    df_Monitor = pd.DataFrame(dataList0)
    return df_Monitor
def saveMonitor_mysql(df_table):
    """Append *df_table* to the MySQL table ``monitordata``.

    Every column is stored as CHAR(200) unless a more specific type is listed
    in the overrides below.  The frame is printed before it is written.
    """
    overrides = {
        "create_time": SAC.DATETIME(),
        "status": SAC.INTEGER(),
        "strength": SAC.FLOAT(),
        "standard_value": SAC.FLOAT(),
        "monitor_time": SAC.DATETIME(),
        "data_status": SAC.INTEGER(),
        "day": SAC.DATE(),
        "update_time": SAC.DATETIME(),
    }
    column_types = {
        **{name: SAC.CHAR(200) for name in df_table.columns},
        **overrides,
    }
    print(df_table)
    df_table.to_sql(
        'monitordata',
        engine,
        index=False,
        if_exists='append',
        dtype=column_types,
    )
def getGK(date):
    """Fetch GK data for *date* for every row of the ``burninfo`` table.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    GetGkDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('burninfo'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        # Extra pause on top of the delay retryGet applies itself.
        time.sleep(10)
        url = GetGkDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        dataList = retryGet(url)
        print(dataList)
        # Accumulate across rows; the original rebound the list on every
        # iteration, so only the last response survived.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(dataList)
    df_Gk = pd.DataFrame(dataList0)
    return df_Gk
def saveGk_mysql(df_table):
    """Append *df_table* to the MySQL table ``gkdata``.

    Every column is stored as CHAR(200) unless a more specific type is listed
    in the overrides below.  The frame is printed before it is written.

    Args:
        df_table: Frame to store.
    """
    # Derive the column list from the argument; the original read an
    # undefined name `df_Gk` here and raised NameError.
    dtype = {col: SAC.CHAR(200) for col in df_table.columns}
    dtype0 = {
        "create_time": SAC.DATETIME(),
        "monitor_time": SAC.DATETIME(),
        "data_status": SAC.INTEGER(),
        "day": SAC.DATE(),
        "update_time": SAC.DATETIME(),
    }
    dtype.update(dtype0)
    print(df_table)
    df_table.to_sql('gkdata', engine, index=False, if_exists='append', dtype=dtype)
def getLW(date):
    """Fetch LW data for *date* for every row of the ``burninfo`` table.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` into the URL.

    Returns:
        DataFrame holding the records of every response.
    """
    # Local renamed from the copy-pasted `GetGkDataListUrl`.
    GetLwDataListUrl = 'XXX'
    df_Burn = pd.read_sql_table('burninfo'.lower(), con=engine)
    day = date.strftime('%Y%m%d')
    dataList0 = []
    for _, ps in df_Burn.iterrows():
        # Extra pause on top of the delay retryGet applies itself.
        time.sleep(10)
        url = GetLwDataListUrl.format(pscode=ps.ps_code, outputcode=ps.mp_code, date=day)
        print(url)
        # Accumulate across rows.  The original rebound the list each time and
        # then hit a stray `break`, so only the first row was ever queried.
        # Assumes each response is a list of records -- TODO confirm.
        dataList0.extend(retryGet(url))
    df_Lw = pd.DataFrame(dataList0)
    print(df_Lw)
    return df_Lw
def saveLw_mysql(df_table):
    """Append *df_table* to the MySQL table ``lwdata``.

    Every column is stored as CHAR(200) unless a more specific type is listed
    in the overrides below.

    Args:
        df_table: Frame to store.
    """
    # Use the argument throughout; the original read and wrote an undefined
    # name `df_Lw`, ignoring the frame it was given.
    dtype = {col: SAC.CHAR(200) for col in df_table.columns}
    dtype0 = {
        "create_time": SAC.DATETIME(),
        "monitor_time": SAC.DATETIME(),
        "data_status": SAC.INTEGER(),
        "status": SAC.INTEGER(),
        "h_name": SAC.FLOAT(),
        "day": SAC.DATE(),
        "standard_value": SAC.FLOAT(),
        "strength": SAC.FLOAT(),
    }
    dtype.update(dtype0)
    df_table.to_sql('lwdata', engine, index=False, if_exists='append', dtype=dtype)