在使用 IPython 集群进行并行计算时,可能会遇到 PicklingError
。这种错误通常与 Python 对象的序列化(即“pickling”)有关。Pickling 是将 Python 对象转换为字节流的过程,以便能够在不同的 Python 进程之间传递对象。在分布式计算环境中,如 IPython 集群,这种对象传递是常见的。
1、问题背景
我正在使用 IPython 的 notebook 使用 zipline,所以我首先创建了一个基于 zipline.TradingAlgorithm 的类。 我将该类发送到 IPython 集群引擎以在并行环境中运行。当我尝试在 IPython 集群上运行我的代码时,我遇到了一个错误。
在单元格 [3] 中,我使用 load_from_yahoo
从雅虎加载了股票数据。然后我创建了一个 AgentList
,其中包含三个 Agent
的实例。Agent
类是一个基于 zipline.TradingAlgorithm
的自定义类。
在单元格 [4] 中,我定义了一个名为 testSystem
的函数,该函数接受一个 agent
和一个 data
作为参数。该函数使用 agent
在 data
上运行 zipline
模拟,并将最终的投资组合价值存储在 agent.valueHistory
中。
在单元格 [5] 中,我使用 lview.apply_async
将 testSystem
函数异步地应用于每个 agent
和 data
。然后我使用 ar.get()
获取每个任务的结果。
在单元格 [6] 中,我绘制了每个 agent
的 valueHistory
。
2、解决方案
PicklingError
是因为 zipline.TradingAlgorithm.run()
方法不能被 pickle。为了解决这个问题,我使用以下代码将 run()
方法从 zipline.TradingAlgorithm
复制到了 Agent
类:
def run(self, data):
return zipline.TradingAlgorithm.run(self, data)
除了将 run()
方法复制到 Agent
类之外,我还在 Agent
类中添加了一个 __getstate__
方法,该方法返回一个包含 Agent
状态的字典。
def __getstate__(self):
state = super().__getstate__()
state['valueHistory'] = self.valueHistory
return state
通过将 run()
方法复制到 Agent
类并添加一个 __getstate__
方法,我能够成功地将 Agent
类发送到 IPython 集群并运行它。
以下是我修改后的代码:
代码语言:javascript复制from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
%%px --local # This insures that the Class and modules exist on each engine
import zipline as zpl
import numpy as np
class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
def initialize(self):
self.valueHistory = None
pass
def handle_data(self, data):
for security in data.keys():
## Just randomly buy/sell/hold for each security
coinflip = np.random.random()
if coinflip < .25:
self.order(security,100)
elif coinflip > .75:
self.order(security,-100)
pass
def run(self, data):
return zipline.TradingAlgorithm.run(self, data)
def __getstate__(self):
state = super().__getstate__()
state['valueHistory'] = self.valueHistory
return state
from zipline.utils.factory import load_from_yahoo
start = '2013-04-01'
end = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)
agentList = []
for i in range(3):
agentList.append(Agent())
def testSystem(agent,data):
results = agent.run(data) #-- This is how the zipline based class is executed
#-- next I'm just storing the final value of the test so I can plot later
agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
return agent
for i in range(10):
tasks = []
for agent in agentList:
#agent = testSystem(agent,data) ## On its own, this works!
#-- To Test, uncomment the above line and comment out the next two
tasks.append(lview.apply_async(testSystem,agent,data))
agentList = [ar.get() for ar in tasks]
for agent in agentList:
plot(agent.valueHistory)
在使用 IPython 集群进行并行计算时,如果遇到 PicklingError
,通常是因为你试图传递不可序列化的对象。解决方法包括确保函数在全局作用域中定义、使用 dill
代替 pickle
、简化数据和代码,以及检查第三方库的兼容性。通过这些方法,你可以有效地避免或解决并行计算中的序列化问题。