下面是一系列使用信号量实现生产者/消费者场景的类。 “主”进程初始化信号量并等待用户指示活动已全部完成。生产者在循环中随机增加一个信号量值,更新之间的延迟可变。消费者尝试在随机时间从信号量中删除随机数量,也是在循环中。该示例由 5 个类组成:
-
Main
– 初始化环境并等待信号量上的活动完成的类。 -
Counter
– 实现信号量本身的类。它记录它的创建以及由于信号量在等待列表中而发生的任何回调。 -
Producer
– 一个类,其主要方法增加信号量值。增量是一个随机选择的小整数。完成增量后,该方法会在下一个增量之前延迟一小段随机数秒。 -
Consumer
消费者——这是对生产者的补充。此类的主要方法尝试将信号量减少一个随机选择的小整数。它将递减请求添加到其等待列表中,等待时间也是随机选择的秒数。 - Util - 这个类有几个方法被示例的其他类使用。几种方法解决了为所有活动维护公共日志的问题;其他人解决了多个消费者和多个生产者的命名问题。
注意:组成这些类的代码特意写得简单。尽可能地,每个语句只完成一个动作。这应该使用户更容易和更直接地修改示例。
Class: Semaphore.Main
此类建立演示环境。它调用实用程序类来初始化日志和名称索引工具。然后它用初始值 0
初始化公共信号量,并等待用户输入一个字符(通常是 ENTER
键),表明实验已经完成。
一旦它接收到用户输入,它就会报告信号量的当前值,尝试删除它,并终止执行。
代码语言:javascript复制Class Semaphore.Main Extends %RegisteredObject
{
/// 共享信号量的名称
Parameter ME = "Main";
/// 信号量演示的驱动程序
ClassMethod Run()
{
// 初始化日志记录全局变量
d ##class(Semaphore.Util).InitLog()
d ##class(Semaphore.Util).InitIndex()
s msg = ..#ME _ " 开始"
d ..Log(msg)
// 创建和初始化信号量
s inventory = ##class(Semaphore.Counter).%New()
if ('($isobject(inventory))) {
s msg = "%New() of MySem failed"
d ..Log(msg)
q
}
s msg = "信号创建结果: " _ inventory.Init(0)
d ..Log(msg)
// 等待终止响应
s msg = "输入任何字符以终止 Run 方法"
d ..Log(msg)
r *x
// 报告最终值,删除信号量并完成
s msg = "终值 = " _ inventory.GetValue()
d ..Log(msg)
s msg = "信号量删除状态: " _ inventory.Delete()
d ..Log(msg)
s msg = ..#ME _ " 结束"
d ..Log(msg)
q
}
/// 将收到的消息输入到公共日志中
ClassMethod Log(msg As %String) [ Private ]
{
d ##class(Semaphore.Util).Logger(..#ME, msg)
q
}
}
Class: Semaphore.Counter
此类实现示例中使用的信号量。根据需要,它是 %SYSTEM.Semaphore
的子类,并提供方法 WaitCompleted
的实现。为了简单起见,初始化信号量的代码也包含在这个类中。还有一个类方法提供此信号量的名称,以允许设置、生产者和消费者类获取它。
Class Semaphore.Counter Extends %SYSTEM.Semaphore
{
ClassMethod Name() As %String
{
Quit "Counter"
}
/// 直接向日志工具发送消息
Method Log(Msg As %String) [ Private ]
{
d ##class(Semaphore.Util).Logger(..Name(), Msg)
q
}
/// 将信号量 id 值作为十六进制字符串返回
Method MyId() As %String
{
q ("0x" _ $zhex(..SemID))
}
/// 创建实例时调用
Method %OnNew() As %Status
{
s msg = "新信号"
d ..Log(msg)
q $$$OK
}
Method Init(initvalue = 0) As %Status
{
try {
if (..Create(..Name(), initvalue)) {
s msg = "创建 """ _ ..Name()
_ """; 值为 = " _ initvalue
_ "; Id = 0x" _ ..MyId()
d ..Log(msg)
ret 1
} else {
s msg = "信号创建失败: Name = """ _ ..Name() _ """"
d ..Log(msg)
ret 0
}
} catch {
s msg = "捕获信号量故障"
d ..Log(msg)
ret 0
}
}
Method %OnClose() As %Status [ Private ]
{
s msg = "关闭信号量: Id = " _ ..MyId()
d ..Log(msg)
q $$$OK
}
/// 此方法由 WaitMany() 作为回调调用。信号量中存在非零数量或等待超时。
/// 减少的数量作为参数传递给此方法;零,在超时的情况下。
///
/// 调用此方法后,信号量将从等待多列表中删除。
/// 需要显式调用 AddToWaitMany 才能将其放回等待列表。
Method WaitCompleted(amt As %Integer)
{
// 只需报告递减量
s msg = "WaitCompleted: " _ ..MyId() _ "; Amt = " _ amt
d ..Log(msg)
q
}
}
Class: Semaphore.Producer
此类负责获取公共信号量的 OREF
。一旦它拥有了OREF
,它就会尝试将信号量重复增加一个随机选择的小整数,并在每次增量之间暂停一个小的随机选择间隔。每次增加信号量的尝试都会输入到日志中。
Class Semaphore.Producer Extends %RegisteredObject
{
/// 类名
Parameter MeBase = "Producer";
/// 暂停后以少量随机增加信号量
ClassMethod Run() As %Status
{
// 建立名称和访问信号量
s ME = ##class(Semaphore.Util).IndexName(..#MeBase)
s msg = ME _ " 开始"
d ..Logger(ME, msg)
s cell = ##class(Semaphore.Counter).%New()
d cell.Open(##class(Semaphore.Counter).Name())
s msg = "open Id = " _ cell.MyId()
d ..Logger(ME, msg)
// 在随机时间按随机量增加信号量
for addcnt = 1 : 1 : 8 {
s incamt = $random(5) 1
s waitsec = $random(10) 1
s msg = "increment " _ cell.MyId()
_ " = " _ cell.GetValue()
_ " by " _ incamt
_ " wait " _ waitsec _ " sec"
d cell.Increment(incamt)
d ..Logger(ME, msg)
h waitsec
}
// 结束
s msg = ME _ " 结束"
d ..Logger(ME, msg)
q $$$OK
}
/// 将消息传送到日期记录器
ClassMethod Logger(id As %String, msg As %String) [ Private ]
{
d ##class(Semaphore.Util).Logger(id, msg)
q
}
}
Class: Semaphore.Consumer
这个类是对 Semaphore.Producer
的补充。它也获得了公共信号量的 OREF
,并以与 Producer
类似的方式尝试将信号量重复减少随机选择的数量,并在每次尝试之间随机选择暂停。每次尝试的成功或失败都会写入日志。
Class Semaphore.Consumer Extends %RegisteredObject
{
/// 类名
Parameter MeBase = "Consumer";
/// 暂停后将信号量减少少量随机数
ClassMethod Run() As %Status
{
// 建立名称和访问信号量
s ME = ##class(Semaphore.Util).IndexName(..#MeBase)
s msg = ME _ " 开始"
d ..Logger(ME, msg)
s cell = ##class(Semaphore.Counter).%New()
d cell.Open(##class(Semaphore.Counter).Name())
s msg = "Consumer: Open Id = " _ cell.MyId()
d ..Logger(ME, msg)
// 以不同的数量和不同的时间反复递减信号量
for deccnt = 1 : 1 : 15 {
s decamt = $RANDOM(5) 1
s waitsec = $RANDOM(10) 1
s msg = "Decrement " _ cell.MyId()
_ " = " _ cell.GetValue()
_ " by " _ decamt
_ " wait " _ waitsec _ " sec"
// 在这种情况下,我们等待一个信号量,但我们可以一次等待多个信号量减量(最多 200)
d cell.AddToWaitMany(decamt)
d ..Logger(ME, msg)
s result = ##class(%SYSTEM.Semaphore).WaitMany(waitsec)
s msg = $select((result > 0) : "授权", 1 : "超时")
d ..Logger(ME, msg)
}
// 结束
s msg = ME _ " 结束"
d ..Logger(ME, msg)
q $$$OK
}
/// 将消息传送到日志记录器
ClassMethod Logger(id As %String, msg As %String) [ Private ]
{
d ##class(Semaphore.Util).Logger(id, msg)
q
}
}
Class: Semaphore.Util
此类包含解决与此示例相关的两个问题的方法。第一个是保存记录消息所需的结构的初始化,以及归档提交到日志的消息及其后续显示的方法。
第二组方法处理生成编号序列的名称以识别生产者和消费者。这不是严格需要的,因为 $JOB
命令提供的进程 ID
也这样做,但使用更易于阅读的标签更容易。
Class Semaphore.Util Extends %RegisteredObject
{
/// 共享信号量的名称
Parameter ME = "Util";
/// 初始化输出日志
ClassMethod InitLog()
{
// 初始化日志记录全局
k ^SemaphoreLog
s ^SemaphoreLog = 0
q
}
/// 将收到的消息输入到全局以进行日志记录
///
ClassMethod Logger(sender As %String, msg As %String)
{
s inx = $i(^SemaphoreLog)
s ^SemaphoreLog(inx, 0) = $job
s ^SemaphoreLog(inx, 1) = sender
s ^SemaphoreLog(inx, 2) = msg
w "(", ^SemaphoreLog, ") ", msg, !
q
}
/// 显示日志中的消息
ClassMethod ShowLog()
{
s msgcnt = $g(^SemaphoreLog, 0)
w "消息日志:条目 = ", msgcnt, !, !
w "#", ?5, "$Job", ?12, "Sender", ?25, "Message", !
for i = 1 : 1 : msgcnt {
s job = ^SemaphoreLog(i, 0)
s sender = ^SemaphoreLog(i, 1)
s msg = ^SemaphoreLog(i, 2)
w i, ")", ?5, job, ?12, sender, ":", ?25, msg, !
}
q
}
/// 初始化名称索引
ClassMethod InitIndex()
{
k ^SemaphoreNames
q
}
/// 初始化名称索引
ClassMethod IndexName(name As %String) As %String
{
if ($d(^SemaphoreNames(name)) = 0) {
s ^SemaphoreNames(name) = 0
}
s index = $i(^SemaphoreNames(name))
q (name _ "." _ index)
}
}