問題背景
Producer(生產者)和 Consumer(消費者)的速度通常不一樣:
- Web server 接受 request(快)→ 發送 email(慢)
- 感測器資料採集(高頻)→ 資料庫寫入(有 I/O 延遲)
- 爬蟲抓取 URL(快)→ HTML 解析(CPU 密集,慢)
直接讓 Producer 等 Consumer 完成再繼續——Producer 的速度被拖累到 Consumer 的速度。
Producer-Consumer 模式在兩者之間插入一個 Buffer(有界佇列),讓:
- Producer 把任務放進 queue 就返回,不等 Consumer
- Consumer 從 queue 取任務,以自己的速度處理
Bounded Buffer 和 Backpressure
有界佇列(Bounded Queue):queue 有最大容量。當 queue 滿了,Producer 不能再放——這叫做 backpressure(背壓)。
Backpressure 是健康的系統設計:它讓上游知道下游的處理能力,而不是讓 queue 無限成長到記憶體耗盡。
處理 queue 滿的策略:
- Blocking:Producer 等待直到 queue 有空間(同步 backpressure)
- Drop:丟棄新的任務(適合可以接受資料丟失的場景)
- Drop oldest:丟掉最舊的任務(滑動窗口)
- Error:向 Producer 返回錯誤讓它自己決定
Java:BlockingQueue
import java.util.concurrent.*;
class ImageProcessor {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
// Producer
public void submit(String imageUrl) throws InterruptedException {
queue.put(imageUrl); // blocking 直到有空間
}
// Consumer(在獨立 thread 跑)
public void startProcessing() {
Thread consumer = new Thread(() -> {
while (true) {
try {
String url = queue.take(); // blocking 直到有任務
processImage(url);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
consumer.start();
}
}Go:Channel
Go 的 channel 就是 bounded buffer:
func main() {
jobs := make(chan string, 100) // buffered channel,容量 100
// Producers
go func() {
for _, url := range urls {
jobs <- url // 送入 channel(滿了就 block)
}
close(jobs)
}()
// Multiple Consumers
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range jobs { // channel 關閉後自動退出
processImage(url)
}
}()
}
wg.Wait()
}現代分散式版本
在微服務架構裡,Producer-Consumer 的 queue 從 in-process 的 BlockingQueue 擴展成 message broker:
- RabbitMQ:Push 模式,broker 推給 Consumer,有 ACK 機制
- Kafka:Pull 模式,Consumer 自己 poll,有 consumer group 和 offset 管理
- Redis Streams / Bull:輕量版,適合同一服務內的異步任務
原理和本地的 Producer-Consumer 相同,只是 queue 在網路上,帶來了分散式系統的新問題(at-least-once / exactly-once delivery、Consumer crash 的 recovery)。
適用場景
- API → 異步處理(email、通知、圖片處理)
- ETL pipeline(Extract → Transform → Load 的各階段)
- 事件驅動架構(event production → event processing)
- Rate limiting(把高峰流量平滑分散到後端)