使用Java实现生产者消费者队列
参考: https://www.jianshu.com/p/66e8b5ab27f6
1、使用wait()和notifyAll()实现
/**
 * Producer/consumer demo built on intrinsic locking: {@code synchronized},
 * {@code wait()} and {@code notifyAll()}.
 *
 * <p>Four producer threads and four consumer threads share one task counter that is
 * bounded by {@link #FULL}. Every access to the counter happens while holding
 * {@link #MYLOCK}.
 */
public class TestMultiThread {
    /** Upper bound of the task counter; producers wait while the counter equals it. */
    private static final int FULL = 10;

    /**
     * Number of pending tasks. A primitive {@code int} is used so that {@code ==}
     * compares values rather than boxed object references.
     */
    private static int count = 0;

    /** Monitor shared by every producer and consumer. */
    private static final Class<TestMultiThread> MYLOCK = TestMultiThread.class;

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Produces up to ten tasks. If the thread is interrupted while sleeping or
         * waiting, the interrupt flag is restored and the method returns early.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    synchronized (MYLOCK) {
                        // Re-check the condition in a loop: notifyAll() wakes every waiter.
                        while (count == FULL) {
                            MYLOCK.wait();
                        }
                        ++count;
                        System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                        MYLOCK.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Stop instead of silently carrying on after an interrupt request.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Removes ten tasks, pausing three seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Consumes up to ten tasks. If the thread is interrupted while sleeping or
         * waiting, the interrupt flag is restored and the method returns early.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    synchronized (MYLOCK) {
                        // Re-check the condition in a loop: notifyAll() wakes every waiter.
                        while (count == 0) {
                            MYLOCK.wait();
                        }
                        --count;
                        System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                        MYLOCK.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Stop instead of silently carrying on after an interrupt request.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
2、使用可重入锁ReentrantLock实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Producer/consumer demo built on {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Four producer threads and four consumer threads share one task counter bounded
 * by {@link #FULL}. The counter is only touched while {@link #lock} is held.
 */
public class TestMultiThread {
    /** Upper bound of the task counter; producers wait while the counter equals it. */
    private static final int FULL = 10;

    /**
     * Number of pending tasks. Kept per instance (like the lock that guards it) and
     * as a primitive so that {@code ==} compares values, not boxed references.
     */
    private int count = 0;

    /** Guards {@link #count}. */
    private final Lock lock = new ReentrantLock();

    /** Signalled by consumers after they take a task. */
    private final Condition notFull = lock.newCondition();

    /** Signalled by producers after they add a task. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Produces up to ten tasks. On interruption the interrupt flag is restored
         * and the method returns early.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Acquire the lock outside the try block so that the finally clause
                // only ever unlocks a lock this thread really holds.
                lock.lock();
                try {
                    while (count == FULL) {
                        notFull.await();
                    }
                    ++count;
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /** Removes ten tasks, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Consumes up to ten tasks. On interruption the interrupt flag is restored
         * and the method returns early.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Acquire the lock outside the try block so that the finally clause
                // only ever unlocks a lock this thread really holds.
                lock.lock();
                try {
                    while (count == 0) {
                        notEmpty.await();
                    }
                    --count;
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                    notFull.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
3、使用阻塞队列BlockingQueue实现
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>Five producer threads and five consumer threads exchange items through
 * {@link #blockingQueue}; blocking on a full or empty queue is handled by the queue
 * itself. A separate {@link AtomicInteger} is kept only for the progress message.
 */
public class TestMultiThread {
    /**
     * Counter shown in the progress messages. It is updated after the queue
     * operation, not atomically with it, so the printed value can briefly differ
     * from the real queue size (a consumer may decrement before the matching
     * producer has incremented).
     */
    private final AtomicInteger count = new AtomicInteger(0);

    // Bounded blocking queue holding at most 10 items.
    final static BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);

    /** Monitor that keeps the counter update and its log line together. */
    private final Class<TestMultiThread> MYLOCK = TestMultiThread.class;

    /**
     * Starts five producer threads and five consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 5; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Puts ten items into the queue, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Produces up to ten items. On interruption the interrupt flag is restored
         * and the method returns early instead of skipping an item and looping on.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    blockingQueue.put(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (MYLOCK) {
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count.incrementAndGet() + "个任务");
                }
            }
        }
    }

    /** Takes ten items from the queue, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Consumes up to ten items. On interruption the interrupt flag is restored
         * and the method returns early instead of skipping an item and looping on.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                    blockingQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (MYLOCK) {
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count.decrementAndGet() + "个任务");
                }
            }
        }
    }
}
4、使用信号量Semaphore实现
import java.util.concurrent.Semaphore;
/**
 * Producer/consumer demo built on three {@link Semaphore}s.
 *
 * <p>{@link #notFull} counts free slots, {@link #notEmpty} counts available tasks and
 * {@link #mutex} serialises access to the task counter. A permit is released only
 * after the matching acquire succeeded, so an interrupted thread cannot inflate a
 * semaphore.
 */
public class TestMultiThread {
    /** Number of pending tasks; only touched while {@link #mutex} is held. */
    private int count = 0;

    /** Free slots; starts at the capacity of 10. */
    private final Semaphore notFull = new Semaphore(10);

    /** Available tasks; starts at 0. */
    private final Semaphore notEmpty = new Semaphore(0);

    /** Binary semaphore protecting {@link #count}. */
    private final Semaphore mutex = new Semaphore(1);

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Produces up to ten tasks. If interrupted while sleeping or while waiting
         * for a free slot, the interrupt flag is restored and the method returns
         * without releasing any permit.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    notFull.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // The critical section is short, so the mutex is taken uninterruptibly;
                // that way the slot acquired above never has to be rolled back.
                mutex.acquireUninterruptibly();
                try {
                    ++count;
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                } finally {
                    mutex.release();
                }
                // Publish the new task only after it has actually been counted.
                notEmpty.release();
            }
        }
    }

    /** Removes ten tasks, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Consumes up to ten tasks. If interrupted while sleeping or while waiting
         * for a task, the interrupt flag is restored and the method returns without
         * releasing any permit.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                    notEmpty.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // The critical section is short, so the mutex is taken uninterruptibly;
                // that way the task acquired above never has to be rolled back.
                mutex.acquireUninterruptibly();
                try {
                    --count;
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                } finally {
                    mutex.release();
                }
                // Free the slot only after the task has actually been removed.
                notFull.release();
            }
        }
    }
}
5、使用管道输入输出流PipedInputStream和PipedOutputStream实现
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
 * Producer/consumer demo built on a connected {@link PipedOutputStream} /
 * {@link PipedInputStream} pair.
 *
 * <p>One producer thread writes random numbers into the pipe and one consumer thread
 * reads them back. Each side closes its end of the pipe when it is done.
 */
public class TestMultiThread {
    /**
     * Connects the two pipe ends and starts one producer and one consumer thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        try {
            pos.connect(pis);
            new Thread(testMultiThread.new Producer(pos)).start();
            new Thread(testMultiThread.new Consumer(pis)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes ten random numbers in [0, 100) to the pipe, one every three seconds. */
    class Producer implements Runnable {
        private final PipedOutputStream pos;

        /**
         * @param pos write end of the pipe; closed by {@link #run()} when it finishes
         */
        public Producer(PipedOutputStream pos) {
            this.pos = pos;
        }

        /**
         * Produces up to ten numbers, then closes the write end so the reader sees
         * end-of-stream. Stops early on interruption (flag restored) or on the first
         * I/O error, which is printed.
         */
        @Override
        public void run() {
            // try-with-resources closes the write end on every exit path.
            try (PipedOutputStream out = pos) {
                for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    int num = (int) (Math.random() * 100);
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产了数字:" + num);
                    // write(int) sends the low-order byte; num is below 100 so nothing is lost.
                    out.write(num);
                    out.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads up to ten numbers from the pipe, one every three seconds. */
    class Consumer implements Runnable {
        private final PipedInputStream pis;

        /**
         * @param pis read end of the pipe; closed by {@link #run()} when it finishes
         */
        public Consumer(PipedInputStream pis) {
            this.pis = pis;
        }

        /**
         * Consumes up to ten numbers, then closes the read end. Stops early on
         * interruption (flag restored), on end-of-stream, or on the first I/O error,
         * which is printed.
         */
        @Override
        public void run() {
            // try-with-resources closes the read end on every exit path.
            try (PipedInputStream in = pis) {
                for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    int num = in.read();
                    if (num == -1) {
                        // End of stream: the writer closed the pipe, there is no more data.
                        break;
                    }
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数字:" + num);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


