[关闭]
@aloxc 2018-06-21T01:44:55.000000Z 字数 3826 阅读 484

java实现生产者消费者模式的几种方式

高并发


如题,设计模式中有个生产者消费者模式,在Java中可以用以下几种方式来实现生产者消费者模式:

1.使用synchronized,配合wait()和notify()、notifyAll()方法实现

  /**
   * Producer/consumer demo built on intrinsic locking: {@code synchronized}
   * methods coordinate through {@code wait()} and {@code notifyAll()} on the
   * instance monitor.
   *
   * <p>Producers block while the buffer holds {@link #CAPACITY} items and
   * consumers block while it is empty. Both kinds of thread wait on the same
   * monitor, which is why {@code notifyAll()} is used rather than
   * {@code notify()}.
   */
  public class ProduceConsumeTest {
    /** Maximum number of items the buffer holds before producers block. */
    private static final int CAPACITY = 10;

    /** Generator for produced values; static, so shared by all instances. */
    static AtomicInteger count = new AtomicInteger();

    /** The bounded buffer; only touched while holding this object's monitor. */
    List<Integer> list = new ArrayList<Integer>(CAPACITY);

    /**
     * Produces the next value and appends it to the buffer, blocking while the
     * buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without producing anything.
     *
     * @param index identifier of the calling producer, used only in the output
     */
    public synchronized void put(int index) {
      // Loop (not "if"): the condition must be re-checked after every wake-up.
      while (list.size() >= CAPACITY) {
        try {
          wait();
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: hand it back to the caller and stop.
          Thread.currentThread().interrupt();
          return;
        }
      }
      int val = count.incrementAndGet();
      list.add(val);
      System.out.println("[" + index + "]添加数据了" + val);
      notifyAll();
    }

    /**
     * Removes the oldest value from the buffer, blocking while it is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without consuming anything.
     *
     * @param index identifier of the calling consumer, used only in the output
     */
    public synchronized void get(int index) {
      while (list.isEmpty()) {
        try {
          wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
      Integer a = list.remove(0);
      System.out.println("[" + index + "]消费数据了" + a);
      notifyAll();
    }

    /** Starts 20 producer threads and 20 consumer threads on one shared instance. */
    public static void main(String[] args) {
      ProduceConsumeTest ins = new ProduceConsumeTest();
      for (int i = 0; i < 20; i++) {
        new Thread(new Produce(ins, i)).start();
        new Thread(new Consume(ins, i)).start();
      }
    }

    /** Task that performs a single {@link #put(int)} on the shared instance. */
    static class Produce implements Runnable {
      private final ProduceConsumeTest ins;
      private final int index;

      public Produce(ProduceConsumeTest ins, int index) {
        this.ins = ins;
        this.index = index;
      }

      @Override
      public void run() {
        ins.put(index);
      }
    }

    /** Task that performs a single {@link #get(int)} on the shared instance. */
    static class Consume implements Runnable {
      private final int index;
      private final ProduceConsumeTest ins;

      public Consume(ProduceConsumeTest ins, int index) {
        this.index = index;
        this.ins = ins;
      }

      @Override
      public void run() {
        ins.get(index);
      }
    }
  }

2.使用ReentrantLock的newCondition,配合await()和signal()、signalAll()方法

  /**
   * Producer/consumer demo built on an explicit {@link ReentrantLock} with two
   * {@link Condition} queues, so that producers and consumers wait separately
   * and a single {@code signal()} wakes a thread of the right kind.
   */
  public class ProduceConsumeTest2 {
    /** Maximum number of items the buffer holds before producers block. */
    private static final int CAPACITY = 10;

    /** Generator for produced values; static, so shared by all instances. */
    static AtomicInteger count = new AtomicInteger();

    /** Guards {@link #list} and owns both conditions below. */
    Lock lock = new ReentrantLock();

    /** Producers wait here while the buffer is full; consumers signal it. */
    Condition full = lock.newCondition();

    /** Consumers wait here while the buffer is empty; producers signal it. */
    Condition empty = lock.newCondition();

    /** The bounded buffer; only touched while holding {@link #lock}. */
    List<Integer> list = new ArrayList<Integer>(CAPACITY);

    /**
     * Produces the next value and appends it to the buffer, blocking while the
     * buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without producing anything.
     * The lock is released on every exit path.
     *
     * @param index identifier of the calling producer, used only in the output
     */
    public void put(int index) {
      lock.lock();
      try {
        // Loop (not "if"): the condition must be re-checked after every wake-up.
        while (list.size() >= CAPACITY) {
          try {
            full.await();
          } catch (InterruptedException e) {
            // Do not swallow the interrupt: hand it back to the caller and stop.
            Thread.currentThread().interrupt();
            return;
          }
        }
        int val = count.incrementAndGet();
        list.add(val);
        System.out.println("[" + index + "]添加数据了" + val);
        empty.signal();
      } finally {
        // Always release, otherwise an exception would leave the lock held forever.
        lock.unlock();
      }
    }

    /**
     * Removes the oldest value from the buffer, blocking while it is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without consuming anything.
     * The lock is released on every exit path.
     *
     * @param index identifier of the calling consumer, used only in the output
     */
    public void get(int index) {
      lock.lock();
      try {
        while (list.isEmpty()) {
          try {
            empty.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        Integer a = list.remove(0);
        System.out.println("[" + index + "]消费数据了" + a);
        full.signal();
      } finally {
        lock.unlock();
      }
    }

    /** Starts 20 producer threads and 20 consumer threads on one shared instance. */
    public static void main(String[] args) {
      ProduceConsumeTest2 ins = new ProduceConsumeTest2();
      for (int i = 0; i < 20; i++) {
        new Thread(new Produce(ins, i)).start();
        new Thread(new Consume(ins, i)).start();
      }
    }

    /** Task that performs a single {@link #put(int)} on the shared instance. */
    static class Produce implements Runnable {
      private final ProduceConsumeTest2 ins;
      private final int index;

      public Produce(ProduceConsumeTest2 ins, int index) {
        this.ins = ins;
        this.index = index;
      }

      @Override
      public void run() {
        ins.put(index);
      }
    }

    /** Task that performs a single {@link #get(int)} on the shared instance. */
    static class Consume implements Runnable {
      private final int index;
      private final ProduceConsumeTest2 ins;

      public Consume(ProduceConsumeTest2 ins, int index) {
        this.index = index;
        this.ins = ins;
      }

      @Override
      public void run() {
        ins.get(index);
      }
    }
  }

3.使用阻塞队列,配合阻塞队列的put()和take()方法。其中SynchronousQueue是一个比较特殊的阻塞队列。

SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

  /**
   * Producer/consumer demo built on a fair {@link SynchronousQueue}: every
   * {@code put} blocks until a consumer performs the matching {@code take}.
   */
  public class SynchronousQueueTest {
    /** Number of producers, and also the number of consumers. */
    private static final int THREAD_COUNT = 10;

    /** Pause between two messages of the same producer, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 3000L;

    /**
     * Starts {@link #THREAD_COUNT} producers and as many consumers on a fixed
     * pool. The workers loop until interrupted, so the program runs until it
     * is terminated from outside.
     */
    public static void main(String[] args) {
      SynchronousQueue<String> queue = new SynchronousQueue<>(true);
      // One atomic increment-and-read per message, so that no two producers can
      // observe the same sequence number.
      java.util.concurrent.atomic.AtomicInteger sequence =
          new java.util.concurrent.atomic.AtomicInteger();
      // Every task blocks for its whole lifetime, so each needs its own pool thread.
      ExecutorService service = Executors.newFixedThreadPool(THREAD_COUNT * 2);
      for (int i = 0; i < THREAD_COUNT; i++) {
        service.execute(producer("生产者" + i, queue, sequence));
        service.execute(consumer("消费者" + i, queue));
      }
      // No further tasks will be submitted; the already-running workers are unaffected.
      service.shutdown();
    }

    /** Builds a task that publishes one numbered message every interval until interrupted. */
    private static Runnable producer(
        String name,
        SynchronousQueue<String> queue,
        java.util.concurrent.atomic.AtomicInteger sequence) {
      return () -> {
        // Pool threads have generic names; rename so thread dumps show the role.
        Thread.currentThread().setName(name);
        try {
          while (true) {
            int number = sequence.incrementAndGet();
            queue.put(name + "--发出的消息 \t" + number);
            Thread.sleep(PRODUCE_INTERVAL_MILLIS);
          }
        } catch (InterruptedException e) {
          // Interruption is the stop signal: restore the flag and let the task end.
          Thread.currentThread().interrupt();
        }
      };
    }

    /** Builds a task that prints every message it takes from the queue until interrupted. */
    private static Runnable consumer(String name, SynchronousQueue<String> queue) {
      return () -> {
        Thread.currentThread().setName(name);
        try {
          while (true) {
            System.out.println(queue.take());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      };
    }
  }

4.使用第三方服务或类库,如各种MQ(RocketMQ、Kafka)、Redis的pub/sub、RxJava

x

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注