@aloxc
2018-06-21T01:44:55.000000Z
字数 3826
阅读 484
高并发
如题，设计模式中有一种生产者消费者模式，在 Java 中可以用以下几种方式来实现：
/**
 * Producer/consumer demo built on intrinsic locking: {@code synchronized}
 * methods coordinate through {@code wait()} / {@code notifyAll()} on the
 * shared instance.
 *
 * <p>Producers block while the buffer holds {@link #CAPACITY} elements and
 * consumers block while it is empty. Elements are consumed in insertion order.
 */
public class ProduceConsumeTest {
    /** Maximum number of elements the buffer may hold before producers block. */
    private static final int CAPACITY = 10;

    /** Source of the values that producers insert; shared by all instances. */
    static AtomicInteger count = new AtomicInteger();

    /** The bounded buffer; only accessed while holding this instance's monitor. */
    List<Integer> list = new ArrayList<Integer>(CAPACITY);

    /**
     * Appends the next sequence number to the buffer, waiting while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without adding anything, so the caller can
     * observe the interruption instead of blocking forever.
     *
     * @param index identifier of the calling producer, used only in the log line
     */
    public synchronized void put(int index) {
        // Loop (not "if"): the condition must be re-checked after every wake-up,
        // because notifyAll() wakes producers and consumers alike.
        while (list.size() >= CAPACITY) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        int val = count.incrementAndGet();
        list.add(val);
        System.out.println("[" + index + "]添加数据了" + val);
        notifyAll();
    }

    /**
     * Removes and prints the oldest element of the buffer, waiting while the buffer is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without consuming anything.
     *
     * @param index identifier of the calling consumer, used only in the log line
     */
    public synchronized void get(int index) {
        while (list.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        Integer a = list.remove(0);
        System.out.println("[" + index + "]消费数据了" + a);
        notifyAll();
    }

    /**
     * Starts 20 producer threads and 20 consumer threads that share one buffer;
     * each thread performs exactly one {@code put} or {@code get}.
     */
    public static void main(String[] args) {
        ProduceConsumeTest ins = new ProduceConsumeTest();
        for (int i = 0; i < 20; i++) {
            new Thread(new Produce(ins, i)).start();
            new Thread(new Consume(ins, i)).start();
        }
    }

    /** Task that performs a single {@link ProduceConsumeTest#put(int)}. */
    static class Produce implements Runnable {
        private final ProduceConsumeTest ins;
        private final int index;

        public Produce(ProduceConsumeTest ins, int index) {
            this.ins = ins;
            this.index = index;
        }

        @Override
        public void run() {
            ins.put(index);
        }
    }

    /** Task that performs a single {@link ProduceConsumeTest#get(int)}. */
    static class Consume implements Runnable {
        private final int index;
        private final ProduceConsumeTest ins;

        public Consume(ProduceConsumeTest ins, int index) {
            this.index = index;
            this.ins = ins;
        }

        @Override
        public void run() {
            ins.get(index);
        }
    }
}
/**
 * Producer/consumer demo built on an explicit {@link Lock} with two
 * {@link Condition} queues, so producers and consumers are woken separately.
 *
 * <p>Producers block while the buffer holds {@link #CAPACITY} elements and
 * consumers block while it is empty. Elements are consumed in insertion order.
 */
public class ProduceConsumeTest2 {
    /** Maximum number of elements the buffer may hold before producers block. */
    private static final int CAPACITY = 10;

    /** Source of the values that producers insert; shared by all instances. */
    static AtomicInteger count = new AtomicInteger();

    /** Guards {@link #list}. */
    Lock lock = new ReentrantLock();

    /** Producers wait here while the buffer is full; signalled after each removal. */
    Condition full = lock.newCondition();

    /** Consumers wait here while the buffer is empty; signalled after each insertion. */
    Condition empty = lock.newCondition();

    /** The bounded buffer; only accessed while holding {@link #lock}. */
    List<Integer> list = new ArrayList<Integer>(CAPACITY);

    /**
     * Appends the next sequence number to the buffer, waiting while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without adding anything. The lock is always
     * released, whichever way the method exits.
     *
     * @param index identifier of the calling producer, used only in the log line
     */
    public void put(int index) {
        lock.lock();
        try {
            while (list.size() >= CAPACITY) {
                try {
                    full.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            int val = count.incrementAndGet();
            list.add(val);
            System.out.println("[" + index + "]添加数据了" + val);
            empty.signal();
        } finally {
            // Release in finally so an unexpected exception cannot leak the lock.
            lock.unlock();
        }
    }

    /**
     * Removes and prints the oldest element of the buffer, waiting while the buffer is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without consuming anything. The lock is always
     * released, whichever way the method exits.
     *
     * @param index identifier of the calling consumer, used only in the log line
     */
    public void get(int index) {
        lock.lock();
        try {
            while (list.isEmpty()) {
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Integer a = list.remove(0);
            System.out.println("[" + index + "]消费数据了" + a);
            full.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts 20 producer threads and 20 consumer threads that share one buffer;
     * each thread performs exactly one {@code put} or {@code get}.
     */
    public static void main(String[] args) {
        ProduceConsumeTest2 ins = new ProduceConsumeTest2();
        for (int i = 0; i < 20; i++) {
            new Thread(new Produce(ins, i)).start();
            new Thread(new Consume(ins, i)).start();
        }
    }

    /** Task that performs a single {@link ProduceConsumeTest2#put(int)}. */
    static class Produce implements Runnable {
        private final ProduceConsumeTest2 ins;
        private final int index;

        public Produce(ProduceConsumeTest2 ins, int index) {
            this.ins = ins;
            this.index = index;
        }

        @Override
        public void run() {
            ins.put(index);
        }
    }

    /** Task that performs a single {@link ProduceConsumeTest2#get(int)}. */
    static class Consume implements Runnable {
        private final int index;
        private final ProduceConsumeTest2 ins;

        public Consume(ProduceConsumeTest2 ins, int index) {
            this.index = index;
            this.ins = ins;
        }

        @Override
        public void run() {
            ins.get(index);
        }
    }
}
SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
/**
 * Producer/consumer demo built on a fair {@link SynchronousQueue}: every
 * {@code put} hands its element directly to a matching {@code take}.
 */
public class SynchronousQueueTest {
    /** Pause between two messages of the same producer. */
    private static final long PRODUCE_INTERVAL_MILLIS = 3000L;

    /**
     * Starts 10 producers and 10 consumers on a fixed pool sized so that every
     * task gets its own worker thread. The tasks loop until interrupted, so the
     * program keeps running until it is terminated externally.
     */
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true);
        int threadCount = 10;
        // incrementAndGet() gives every message a unique number; a separate
        // increment followed by a read could be interleaved by other producers.
        java.util.concurrent.atomic.AtomicInteger sequence =
                new java.util.concurrent.atomic.AtomicInteger();
        ExecutorService service = Executors.newFixedThreadPool(threadCount * 2);
        for (int i = 0; i < threadCount; i++) {
            service.submit(producer("生产者" + i, queue, sequence));
            service.submit(consumer("消费者" + i, queue));
        }
        // No further tasks will be submitted; already-running tasks are unaffected.
        service.shutdown();
    }

    /**
     * Builds a task that publishes a numbered message every
     * {@link #PRODUCE_INTERVAL_MILLIS} milliseconds until its thread is interrupted.
     *
     * @param name     label put into each message and used as the worker thread's name
     * @param queue    hand-off queue shared with the consumers
     * @param sequence shared message counter
     */
    private static Runnable producer(String name, SynchronousQueue<String> queue,
            java.util.concurrent.atomic.AtomicInteger sequence) {
        return () -> {
            // The pool owns the thread, so name it from inside the task.
            Thread.currentThread().setName(name);
            try {
                while (true) {
                    queue.put(name + "--发出的消息 \t" + sequence.incrementAndGet());
                    Thread.sleep(PRODUCE_INTERVAL_MILLIS);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag and end the task.
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * Builds a task that prints every message it takes from the queue until its
     * thread is interrupted.
     *
     * @param name  used as the worker thread's name
     * @param queue hand-off queue shared with the producers
     */
    private static Runnable consumer(String name, SynchronousQueue<String> queue) {
        return () -> {
            Thread.currentThread().setName(name);
            try {
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}