本文循序渐进地实现一套“最小可用(MVP)”的发布 / 订阅(Pub/Sub)系统,用纯 Python、零第三方依赖,重点是:好懂、能跑、能扩展。
最小可用实现
设计思路(非常简化)
我们只保留 Pub/Sub 的三要素:
- 主题(Topic)
- 订阅者(Subscriber):本质是一个回调函数
- 消息代理(Broker):负责管理订阅关系和消息分发
👉 发布者和订阅者互相不知道对方,只和 Broker 打交道。
最小可用实现
1️⃣ 消息代理(Broker)
1 | class Broker: |
2️⃣ 订阅者(Subscriber)
1 | def subscriber_a(message): |
3️⃣ 发布者(Publisher)
1 | def publisher(broker): |
4️⃣ 运行示例
1 | if __name__ == "__main__": |
运行结果
1 | [A] 收到消息: Hello, Pub/Sub! |
🎉 这已经是一个完整、可工作的发布/订阅系统了。
这是“最小可用”,但你可以这样升级
如果你后面想继续玩大一点,可以按需加:
🔹 异步(asyncio)
1 | async def publish(...) |
🔹 取消订阅
1 | unsubscribe(topic, callback) |
🔹 多进程 / 网络版
queue.Queuemultiprocessing.Queuesocket / ZeroMQ / Redis
🔹 消息持久化
- 文件
- SQLite
- Redis Stream
一句话总结
Pub/Sub 的本质:
用一个中介,把“谁发消息”和“谁收消息”彻底解耦。
加上生产者
先说清楚:加了「Topic 的生产者」意味着什么?
之前的版本是:
- 任意发布者 👉 想往哪个 topic 发就往哪个发
- Broker 不关心 谁 在发
现在你想要的是:
某个 topic 只能由指定的生产者(Producer)来生产消息
也就是说要引入三层关系:
1 | Producer --> Topic --> Subscriber |
最小改动方案
核心改动点只有 3 个:
- Topic 需要注册生产者
- Producer 有身份(name / id)
- Broker 在 publish 时校验 producer 是否有权限
修改后的最小可用实现
1️⃣ Producer 类
1 | class Producer: |
2️⃣ Broker(关键改动)
1 | class Broker: |
3️⃣ Subscriber(不变)
1 | def subscriber_a(msg): |
4️⃣ 使用示例
1 | if __name__ == "__main__": |
这样设计有什么好处?
✅ 权限边界清晰
✅ Topic 生命周期可控
✅ 更接近真实系统(Kafka / Pulsar)
在 Kafka 里对应的是:
| 这里 | Kafka |
|---|---|
| Producer | Producer |
| Topic 注册 | Topic 创建 |
| publish 校验 | ACL / 权限 |
如果你想再“像点工业级”
可以继续加(但别一次全加 😄):
🔹 一个 topic 多个 producer
1 | topic -> set(producer_name) |
🔹 Producer 只能 publish,不能乱 register
1 | register_topic 只允许 admin 调用 |
🔹 Topic schema(消息格式)
1 | def publish(self, topic, message: dict) |
一句话总结
引入 Topic 的生产者,本质是:
把“谁能往哪发消息”这件事,变成 Broker 的职责。
拆成 Producer / Broker / Consumer 三进程
这一步就真正像一个“能跑的分布式 Pub/Sub”了 👍
我们直接来一套 三进程 + TCP Socket 的 最小可用版本:
- 🧠 Broker:独立进程,负责 topic、转发
- ✍️ Producer:独立进程,只负责发消息
- 👂 Consumer:独立进程,订阅并接收消息
目标:
一台机器上,开 3 个终端,就能看到完整的 Pub/Sub 流程
整体架构
1 | Producer --->| |
通信方式:TCP + JSON(一行一条消息)
通信协议(极简)
所有消息都是一行 JSON:
1 | {"type": "subscribe", "topic": "news"} |
Broker 进程(broker.py)
1 | import socket |
Producer 进程(producer.py)
1 | import socket |
Consumer 进程(consumer.py)
1 | import socket |
如何运行(关键)
打开 3 个终端:
1️⃣ 启动 Broker
1 | python broker.py |
2️⃣ 启动 Consumer
1 | python consumer.py |
(你可以开多个 consumer)
3️⃣ 启动 Producer
1 | python producer.py |
运行效果
1 | Broker 启动 |
Consumer:
1 | 等待消息... |
🎉 这就是一个真正的三进程 Pub/Sub 系统了
这个版本“刻意没做”的事(但你现在已经能加了)
✅ 鉴权 / Producer 权限
✅ Topic 注册
✅ 消息持久化
✅ 断线重连
✅ asyncio / epoll
✅ 多 Broker(集群)
一句话总结
拆成三进程后,Pub/Sub 的本质就变成:
Broker 是“路由器”,Producer/Consumer 都是客户端。
上面为什么用socket协议
完全可以不用 socket。
用 socket 只是因为:它是最小、最通用、最接近底层的选择。
为什么示例里用 socket?
1️⃣ socket 是所有网络协议的地基
几乎所有你听过的协议,最后都落在 socket 上:
| 协议 | 底层 |
|---|---|
| HTTP | TCP socket |
| gRPC | HTTP/2 → TCP socket |
| WebSocket | TCP socket |
| MQTT | TCP socket |
用 socket 的好处是:
- 零依赖
- 最少抽象
- 你能完全控制通信模型
👉 非常适合 教学 / 理解原理 / MVP。
2️⃣ Pub/Sub 的核心≠协议
Pub/Sub 真正关心的是:
- 谁订阅了什么
- 消息如何路由
- 什么时候投递
用什么协议只是“运输方式”。
可以换成什么?(按抽象层次从低到高)
✅ 方案 1:HTTP(最常见替代)
思路
- Producer:
POST /publish - Consumer:
POST /subscribe - Broker:HTTP Server
示例结构
1 | POST /publish |
优点
- 简单、熟
- 防火墙友好
- curl 就能调
缺点(关键)
❌ HTTP 是短连接
❌ Broker 无法主动推消息给 Consumer
所以你必须改成:
- Consumer 轮询
- 或 长轮询
👉 已经不是“纯粹的 Pub/Sub”了。
✅ 方案 2:WebSocket(非常适合)
这是 socket 的升级版 👍
为什么合适?
- 长连接
- 全双工
- 基于 TCP
- 语义清晰
架构几乎不变
1 | Producer --WS--> Broker <--WS-- Consumer |
Python 实现
1 | pip install websockets |
1 | await websocket.send(json.dumps(msg)) |
优点
- 比裸 socket 易用
- 浏览器 / 前端友好
- 非常适合实时消息
缺点
- 有第三方依赖
- 抽象略高(教学稍弱)
✅ 方案 3:gRPC(工业味儿最浓)
特点
- 强类型(proto)
- 双向流(stream)
- 高性能
- 标准化
1 | rpc Subscribe(Topic) returns (stream Message); |
优点
- 可维护性强
- 接口即文档
- 适合团队协作
缺点
- 学习成本高
- 对“最小可用”来说偏重
✅ 方案 4:消息中间件(Redis / MQTT / ZeroMQ)
这一步已经是:
“我不想自己造轮子了”
举例
| 技术 | 适合 |
|---|---|
| Redis Pub/Sub | 内网、小系统 |
| MQTT | IoT |
| ZeroMQ | 高性能通信 |
| Kafka | 高吞吐、持久化 |
👉 这时候你已经不是在“换协议”,而是在换系统形态。
那为什么不一开始就用 HTTP / WebSocket?
因为你当时的目标是:
理解 Pub/Sub 本质 + 三进程解耦
socket 有三个“教学级”优势:
- 看得见连接
- 看得见数据流
- 没有框架魔法
怎么选?给你一个实用对照表
| 场景 | 推荐 |
|---|---|
| 学原理 | socket |
| 浏览器 / 实时 UI | WebSocket |
| 内部 API | gRPC |
| 快速 demo | HTTP |
| 不想维护 | Redis / MQTT |
一句话总结(重点)
Pub/Sub ≠ 协议
socket 只是“最裸的运输层”,不是唯一选择。
kafka为什么不用websocket
这个问题已经直戳 Kafka 设计核心了 🔥
我们直接站在 Kafka 架构师的视角来回答:
Kafka 为什么不用 WebSocket?
它到底用的是什么?
先给结论(非常重要)
Kafka 用的是:
👉 自定义的二进制 TCP 协议(基于 socket)
而不是 WebSocket / HTTP。
而且这不是“历史包袱”,是非常主动、非常理性的选择。
Kafka 的真实目标是什么?(这是关键)
Kafka 从一开始就不是为「实时推送」设计的,而是为:
- 超高吞吐
- 顺序日志
- 磁盘友好
- 批量传输
- 可回放
服务的对象是:
数据管道,而不是交互式客户端
为什么 Kafka 不用 WebSocket?(逐条拆)
1️⃣ Kafka 的通信模型 ≠ 消息模型
WebSocket 的模型是:
1 | “来一条消息 → 马上推给你” |
Kafka 的模型是:
1 | “给我 offset X 之后的一大坨数据” |
Consumer 并不是等 Broker 推:
Consumer 是主动 pull 数据的
1 | Consumer ---> Fetch(offset, max_bytes) |
👉 这和 WebSocket 的推送语义是冲突的。
2️⃣ Kafka 是“批量 IO”,不是“消息 IO”
Kafka 的基本单位不是 message,而是:
RecordBatch
- 多条消息打包
- 顺序写磁盘
- 顺序读磁盘
- 顺序发网络
而 WebSocket 的 frame:
- 偏向“小消息”
- 有额外 framing
- 有 mask / opcode
👉 对 Kafka 来说,全是额外开销。
3️⃣ Kafka 需要极致控制 IO 行为
Kafka 做了很多 WebSocket 做不到的事:
- 零拷贝(
sendfile) - page cache 直读
- 批量压缩(lz4 / zstd)
- 精确控制 buffer
WebSocket 库通常:
- 封装 IO
- 隐藏 buffer
- 不允许零拷贝
👉 这是性能杀手。
4️⃣ Kafka 的客户端 ≠ 浏览器
WebSocket 的最大价值之一是:
浏览器可用
而 Kafka 的客户端是:
- Java 服务
- Flink / Spark
- 日志采集 agent
👉 Kafka 完全不需要 WebSocket 的“通用性”。
5️⃣ Kafka 的协议是“状态机 + offset”
Kafka 协议里大量内容是:
1 | - FetchRequest |
这是一个:
强状态、强一致性的协议
WebSocket 只是传输层,不提供这些语义,你还是得全自己实现。
那 Kafka 到底“用的是什么”?
Kafka 使用的是:
自定义二进制协议 + TCP socket
特点:
- 固定字段长度
- 网络字节序
- 无文本编码
- 可版本演进
示例(概念级)
1 | | size | api_key | api_version | correlation_id | payload | |
优点:
- 快
- 紧凑
- 可 forward compatibility
- 精确控制性能
Kafka 为什么选择“pull”,而不是“push”?
这是核心哲学。
Push 模型(WebSocket)
❌ Consumer 慢 → Broker 堆积
❌ Broker 要感知 Consumer 状态
❌ 背压复杂
Pull 模型(Kafka)
✅ Consumer 自己控制节奏
✅ 自然背压
✅ Broker 极度简单
Kafka 把复杂性推给 Consumer。