在API中是这样来描述Semaphore
的
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire()
,然后再获取该许可。每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore
只对可用许可的号码进行计数,并采取相应的行动。
例如,下面的类使用信号量控制线程并发的数量
代码语言:javascript复制import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphore {
/**
* @param args
*/
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3,true);
for(int i=0;i<10;i ){
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sp.availablePermits());
System.out.println("线程 " Thread.currentThread().getName() "进入,已有" (3-sp.availablePermits()) "并发") ;
try {
Thread.sleep((long) (Math.random()*3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程 " Thread.currentThread().getName() "即将离开 " );
sp.release();
System.out.println("线程 " Thread.currentThread().getName() "离开 ,已有" (3-sp.availablePermits()) "并发");
}
};
pool.execute(runnable);
}
}
}
再例如可以通过信号量来控制线程访问资源:
代码语言:javascript复制import java.util.concurrent.Semaphore;
public class DownloadThread {
private static int in_index = 0;
private static int out_index = 0;
private static int buffer_count = 100;
public static boolean g_downloadComplete;
private static Semaphore g_seFull = new Semaphore(0);
private static Semaphore g_seEmpty = new Semaphore(buffer_count);
public static boolean getBlockFromNet(int in_index) {
int i = 0;
while (i < 10000)
i ;
if (in_index < buffer_count - 1)
return false;
else
return true;
}
public static void writeBlockToDisk(int out_index) {
int i = 0;
while (i < 100000)
i ;
}
/**
* @param args
*/
public static void main(String[] args) {
g_downloadComplete = false;
Thread threadA = new Thread() {
public void run() {
proA();
}
};
Thread threadB = new Thread() {
public void run() {
proB();
}
};
threadB.start();
threadA.start();
}
public static void proA(){
while (g_seFull.availablePermits() < buffer_count) {
try {
g_seEmpty.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
g_downloadComplete = getBlockFromNet(in_index);
in_index = (in_index 1) % buffer_count;
g_seFull.release();
System.out.println("download a block " in_index);
if (g_downloadComplete)
break;
}
}
public static void proB(){
while (g_seEmpty.availablePermits() > 0) {
try {
g_seFull.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
writeBlockToDisk(out_index);
out_index = (out_index 1) % buffer_count;
g_seEmpty.release();
System.out.println("write a block " out_index);
if (g_downloadComplete && out_index == in_index)
break;
}
}
}