@aloxc
2018-06-21T01:44:55.000000Z
字数 3826
阅读 528
高并发
如题,设计模式中有个生产者消费者模式,具体在java中可以用以下一些方式来完成生产者消费者模式
public class ProduceConsumeTest {private final static Log logger = LogFactory.getLog(ProduceConsumeTest.class);static AtomicInteger count = new AtomicInteger();List<Integer> list = new ArrayList<Integer>(1);public synchronized void put(int index){while (list.size() >= 10) {try {wait();} catch (InterruptedException e) {}}int val = count.incrementAndGet();list.add(val);System.out.println("["+index+"]添加数据了" + val);notifyAll();}public synchronized void get(int index){while (list.size() <= 0) {try {wait();} catch (InterruptedException e) {}}Integer a = list.remove(0);;System.out.println("["+index+"]消费数据了" + a);notifyAll();}public static void main(String[] args) {ProduceConsumeTest ins = new ProduceConsumeTest();for (int i = 0; i < 20; i++) {new Thread(new Produce(ins,i)).start();new Thread(new Consume(ins,i)).start();}}static class Produce implements Runnable{private final ProduceConsumeTest ins;private final int index;public Produce(ProduceConsumeTest ins, int index) {this.ins = ins;this.index = index;}@Overridepublic void run() {ins.put(index);}}static class Consume implements Runnable{private final int index;private final ProduceConsumeTest ins;public Consume(ProduceConsumeTest ins,int index) {this.index = index;this.ins = ins;}@Overridepublic void run() {ins.get(index);}}}
public class ProduceConsumeTest2 {private final static Log logger = LogFactory.getLog(ProduceConsumeTest2.class);static AtomicInteger count = new AtomicInteger();Lock lock = new ReentrantLock();Condition full = lock.newCondition();Condition empty = lock.newCondition();List<Integer> list = new ArrayList<Integer>(1);public void put(int index){lock.lock();while (list.size() >= 10) {try {full.await();;} catch (InterruptedException e) {}}int val = count.incrementAndGet();list.add(val);System.out.println("["+index+"]添加数据了" + val);empty.signal();lock.unlock();}public void get(int index){lock.lock();while (list.size() <= 0) {try {empty.await();} catch (InterruptedException e) {}}Integer a = list.remove(0);;System.out.println("["+index+"]消费数据了" + a);full.signal();lock.unlock();}public static void main(String[] args) {ProduceConsumeTest2 ins = new ProduceConsumeTest2();for (int i = 0; i < 20; i++) {new Thread(new Produce(ins,i)).start();new Thread(new Consume(ins,i)).start();}}static class Produce implements Runnable{private final ProduceConsumeTest2 ins;private final int index;public Produce(ProduceConsumeTest2 ins, int index) {this.ins = ins;this.index = index;}@Overridepublic void run() {ins.put(index);}}static class Consume implements Runnable{private final int index;private final ProduceConsumeTest2 ins;public Consume(ProduceConsumeTest2 ins, int index) {this.index = index;this.ins = ins;}@Overridepublic void run() {ins.get(index);}}}
SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
public class SynchronousQueueTest {private final static Log logger = LogFactory.getLog(SynchronousQueueTest.class);public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>(true);int threadCount = 10;Thread[] consumers = new Thread[threadCount];Thread[] producers = new Thread[threadCount];LongAdder adder = new LongAdder();ExecutorService service = Executors.newFixedThreadPool(threadCount * 2 );for (int i = 0; i < threadCount; i++) {consumers[i] = new Thread("消费者" + i){@Overridepublic void run() {super.run();while (true) {try {System.out.println(queue.take());} catch (InterruptedException e) {e.printStackTrace();}}}};producers[i] = new Thread("生产者" + i){@Overridepublic void run() {super.run();while (true) {try {adder.increment();queue.put(Thread.currentThread().getName() + "--发出的消息 \t" + adder.intValue());Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}};service.submit(producers[i]);service.submit(consumers[i]);}}}
