0%

使用python写一套最小可用的发布/订阅系统

本文循序渐进地实现一套“最小可用(MVP)”的发布 / 订阅(Pub/Sub)系统,用纯 Python、零第三方依赖,重点是:好懂、能跑、能扩展

最小可用实现

设计思路(非常简化)

我们只保留 Pub/Sub 的三要素:

  1. 主题(Topic)
  2. 订阅者(Subscriber):本质是一个回调函数
  3. 消息代理(Broker):负责管理订阅关系和消息分发

👉 发布者和订阅者互相不知道对方,只和 Broker 打交道。


最小可用实现

1️⃣ 消息代理(Broker)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Broker:
def __init__(self):
# topic -> list of subscribers
self.subscribers = {}

def subscribe(self, topic, callback):
"""订阅某个主题"""
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)

def publish(self, topic, message):
"""向主题发布消息"""
if topic not in self.subscribers:
return
for callback in self.subscribers[topic]:
callback(message)

2️⃣ 订阅者(Subscriber)

1
2
3
4
5
def subscriber_a(message):
print(f"[A] 收到消息: {message}")

def subscriber_b(message):
print(f"[B] 收到消息: {message}")

3️⃣ 发布者(Publisher)

1
2
3
def publisher(broker):
broker.publish("news", "Hello, Pub/Sub!")
broker.publish("news", "Python 真香")

4️⃣ 运行示例

1
2
3
4
5
6
7
8
9
if __name__ == "__main__":
broker = Broker()

# 订阅主题
broker.subscribe("news", subscriber_a)
broker.subscribe("news", subscriber_b)

# 发布消息
publisher(broker)

运行结果

1
2
3
4
[A] 收到消息: Hello, Pub/Sub!
[B] 收到消息: Hello, Pub/Sub!
[A] 收到消息: Python 真香
[B] 收到消息: Python 真香

🎉 这已经是一个完整、可工作的发布/订阅系统了。


这是“最小可用”,但你可以这样升级

如果你后面想继续玩大一点,可以按需加:

🔹 异步(asyncio)

1
async def publish(...)

🔹 取消订阅

1
unsubscribe(topic, callback)

🔹 多进程 / 网络版

  • queue.Queue
  • multiprocessing.Queue
  • socket / ZeroMQ / Redis

🔹 消息持久化

  • 文件
  • SQLite
  • Redis Stream

一句话总结

Pub/Sub 的本质:
用一个中介,把“谁发消息”和“谁收消息”彻底解耦。

加上生产者

先说清楚:加了「Topic 的生产者」意味着什么?

之前的版本是:

  • 任意发布者 👉 想往哪个 topic 发就往哪个发
  • Broker 不关心 在发

现在你想要的是:

某个 topic 只能由指定的生产者(Producer)来生产消息

也就是说要引入三层关系:

1
Producer  -->  Topic  -->  Subscriber

最小改动方案

核心改动点只有 3 个:

  1. Topic 需要注册生产者
  2. Producer 有身份(name / id)
  3. Broker 在 publish 时校验 producer 是否有权限

修改后的最小可用实现

1️⃣ Producer 类

1
2
3
class Producer:
def __init__(self, name):
self.name = name

2️⃣ Broker(关键改动)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Broker:
def __init__(self):
# topic -> list of subscribers
self.subscribers = {}

# topic -> producer_name
self.topic_producers = {}

def register_topic(self, topic, producer):
"""注册 topic 及其唯一生产者"""
self.topic_producers[topic] = producer.name

def subscribe(self, topic, callback):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)

def publish(self, topic, message, producer):
"""只有注册过的 producer 才能发布"""
if topic not in self.topic_producers:
raise ValueError(f"Topic {topic} 未注册")

if self.topic_producers[topic] != producer.name:
raise PermissionError(
f"Producer {producer.name} 无权向 {topic} 发布消息"
)

for callback in self.subscribers.get(topic, []):
callback(message)

3️⃣ Subscriber(不变)

1
2
3
4
5
def subscriber_a(msg):
print(f"[A] 收到: {msg}")

def subscriber_b(msg):
print(f"[B] 收到: {msg}")

4️⃣ 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if __name__ == "__main__":
broker = Broker()

producer_news = Producer("news_producer")
producer_ads = Producer("ads_producer")

# 注册 topic 及其生产者
broker.register_topic("news", producer_news)
broker.register_topic("ads", producer_ads)

# 订阅
broker.subscribe("news", subscriber_a)
broker.subscribe("news", subscriber_b)

# 合法发布
broker.publish("news", "今日头条", producer_news)

# 非法发布(会抛异常)
broker.publish("news", "广告插播", producer_ads)

这样设计有什么好处?

权限边界清晰
Topic 生命周期可控
更接近真实系统(Kafka / Pulsar)

在 Kafka 里对应的是:

这里 Kafka
Producer Producer
Topic 注册 Topic 创建
publish 校验 ACL / 权限

如果你想再“像点工业级”

可以继续加(但别一次全加 😄):

🔹 一个 topic 多个 producer

1
topic -> set(producer_name)

🔹 Producer 只能 publish,不能乱 register

1
register_topic 只允许 admin 调用

🔹 Topic schema(消息格式)

1
def publish(self, topic, message: dict)

一句话总结

引入 Topic 的生产者,本质是:
把“谁能往哪发消息”这件事,变成 Broker 的职责。

拆成 Producer / Broker / Consumer 三进程

这一步就真正像一个“能跑的分布式 Pub/Sub”了 👍
我们直接来一套 三进程 + TCP Socket最小可用版本

  • 🧠 Broker:独立进程,负责 topic、转发
  • ✍️ Producer:独立进程,只负责发消息
  • 👂 Consumer:独立进程,订阅并接收消息

目标

一台机器上,开 3 个终端,就能看到完整的 Pub/Sub 流程


整体架构

1
2
3
4
Producer  --->|
|----> Consumer A
|----> Consumer B
Broker <----|

通信方式:TCP + JSON(一行一条消息)


通信协议(极简)

所有消息都是一行 JSON:

1
2
{"type": "subscribe", "topic": "news"}
{"type": "publish", "topic": "news", "message": "hello"}

Broker 进程(broker.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import socket
import threading
import json

HOST = "127.0.0.1"
PORT = 9000

class Broker:
def __init__(self):
self.subscribers = {} # topic -> list of sockets
self.lock = threading.Lock()

def handle_client(self, conn):
with conn:
while True:
data = conn.recv(1024)
if not data:
break

msg = json.loads(data.decode())
msg_type = msg["type"]

if msg_type == "subscribe":
topic = msg["topic"]
with self.lock:
self.subscribers.setdefault(topic, []).append(conn)
print(f"订阅: {topic}")

elif msg_type == "publish":
topic = msg["topic"]
message = msg["message"]
print(f"发布: {topic} -> {message}")

with self.lock:
for sub in self.subscribers.get(topic, []):
sub.sendall(
(json.dumps({
"topic": topic,
"message": message
}) + "\n").encode()
)

def start(self):
server = socket.socket()
server.bind((HOST, PORT))
server.listen()
print("Broker 启动")

while True:
conn, _ = server.accept()
threading.Thread(
target=self.handle_client,
args=(conn,),
daemon=True
).start()

if __name__ == "__main__":
Broker().start()

Producer 进程(producer.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import socket
import json
import time

HOST = "127.0.0.1"
PORT = 9000

sock = socket.socket()
sock.connect((HOST, PORT))

for i in range(5):
msg = {
"type": "publish",
"topic": "news",
"message": f"第 {i} 条消息"
}
sock.sendall(json.dumps(msg).encode())
time.sleep(1)

sock.close()

Consumer 进程(consumer.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import socket
import json

HOST = "127.0.0.1"
PORT = 9000

sock = socket.socket()
sock.connect((HOST, PORT))

# 订阅
subscribe_msg = {
"type": "subscribe",
"topic": "news"
}
sock.sendall(json.dumps(subscribe_msg).encode())

print("等待消息...")

while True:
data = sock.recv(1024)
if not data:
break
msg = json.loads(data.decode())
print(f"收到: {msg['message']}")

如何运行(关键)

打开 3 个终端

1️⃣ 启动 Broker

1
python broker.py

2️⃣ 启动 Consumer

1
python consumer.py

(你可以开多个 consumer)

3️⃣ 启动 Producer

1
python producer.py

运行效果

1
2
3
4
5
Broker 启动
订阅: news
发布: news -> 第 0 条消息
发布: news -> 第 1 条消息
...

Consumer:

1
2
3
等待消息...
收到: 第 0 条消息
收到: 第 1 条消息

🎉 这就是一个真正的三进程 Pub/Sub 系统了


这个版本“刻意没做”的事(但你现在已经能加了)

✅ 鉴权 / Producer 权限

✅ Topic 注册

✅ 消息持久化

✅ 断线重连

✅ asyncio / epoll

✅ 多 Broker(集群)


一句话总结

拆成三进程后,Pub/Sub 的本质就变成:
Broker 是“路由器”,Producer/Consumer 都是客户端。

上面为什么用socket协议

完全可以不用 socket。
用 socket 只是因为:它是最小、最通用、最接近底层的选择。


为什么示例里用 socket?

1️⃣ socket 是所有网络协议的地基

几乎所有你听过的协议,最后都落在 socket 上:

协议 底层
HTTP TCP socket
gRPC HTTP/2 → TCP socket
WebSocket TCP socket
MQTT TCP socket

用 socket 的好处是:

  • 零依赖
  • 最少抽象
  • 你能完全控制通信模型

👉 非常适合 教学 / 理解原理 / MVP


2️⃣ Pub/Sub 的核心≠协议

Pub/Sub 真正关心的是:

  • 谁订阅了什么
  • 消息如何路由
  • 什么时候投递

用什么协议只是“运输方式”


可以换成什么?(按抽象层次从低到高)


✅ 方案 1:HTTP(最常见替代)

思路

  • Producer:POST /publish
  • Consumer:POST /subscribe
  • Broker:HTTP Server

示例结构

1
2
3
4
5
POST /publish
{
"topic": "news",
"message": "hello"
}

优点

  • 简单、熟
  • 防火墙友好
  • curl 就能调

缺点(关键)

HTTP 是短连接
❌ Broker 无法主动推消息给 Consumer

所以你必须改成:

  • Consumer 轮询
  • 长轮询

👉 已经不是“纯粹的 Pub/Sub”了。


✅ 方案 2:WebSocket(非常适合)

这是 socket 的升级版 👍

为什么合适?

  • 长连接
  • 全双工
  • 基于 TCP
  • 语义清晰

架构几乎不变

1
Producer  --WS--> Broker <--WS-- Consumer

Python 实现

1
pip install websockets
1
await websocket.send(json.dumps(msg))

优点

  • 比裸 socket 易用
  • 浏览器 / 前端友好
  • 非常适合实时消息

缺点

  • 有第三方依赖
  • 抽象略高(教学稍弱)

✅ 方案 3:gRPC(工业味儿最浓)

特点

  • 强类型(proto)
  • 双向流(stream)
  • 高性能
  • 标准化
1
rpc Subscribe(Topic) returns (stream Message);

优点

  • 可维护性强
  • 接口即文档
  • 适合团队协作

缺点

  • 学习成本高
  • 对“最小可用”来说偏重

✅ 方案 4:消息中间件(Redis / MQTT / ZeroMQ)

这一步已经是:

“我不想自己造轮子了”

举例

技术 适合
Redis Pub/Sub 内网、小系统
MQTT IoT
ZeroMQ 高性能通信
Kafka 高吞吐、持久化

👉 这时候你已经不是在“换协议”,而是在换系统形态


那为什么不一开始就用 HTTP / WebSocket?

因为你当时的目标是:

理解 Pub/Sub 本质 + 三进程解耦

socket 有三个“教学级”优势:

  1. 看得见连接
  2. 看得见数据流
  3. 没有框架魔法

怎么选?给你一个实用对照表

场景 推荐
学原理 socket
浏览器 / 实时 UI WebSocket
内部 API gRPC
快速 demo HTTP
不想维护 Redis / MQTT

一句话总结(重点)

Pub/Sub ≠ 协议
socket 只是“最裸的运输层”,不是唯一选择。

kafka为什么不用websocket

这个问题已经直戳 Kafka 设计核心了 🔥
我们直接站在 Kafka 架构师的视角来回答:

Kafka 为什么不用 WebSocket?
它到底用的是什么?


先给结论(非常重要)

Kafka 用的是:
👉 自定义的二进制 TCP 协议(基于 socket)
而不是 WebSocket / HTTP。

而且这不是“历史包袱”,是非常主动、非常理性的选择


Kafka 的真实目标是什么?(这是关键)

Kafka 从一开始就不是为「实时推送」设计的,而是为:

  • 超高吞吐
  • 顺序日志
  • 磁盘友好
  • 批量传输
  • 可回放

服务的对象是:

数据管道,而不是交互式客户端


为什么 Kafka 不用 WebSocket?(逐条拆)


1️⃣ Kafka 的通信模型 ≠ 消息模型

WebSocket 的模型是:

1
“来一条消息 → 马上推给你”

Kafka 的模型是:

1
“给我 offset X 之后的一大坨数据”

Consumer 并不是等 Broker 推:

Consumer 是主动 pull 数据的

1
2
Consumer ---> Fetch(offset, max_bytes)
Broker ---> DataBatch

👉 这和 WebSocket 的推送语义是冲突的。


2️⃣ Kafka 是“批量 IO”,不是“消息 IO”

Kafka 的基本单位不是 message,而是:

RecordBatch

  • 多条消息打包
  • 顺序写磁盘
  • 顺序读磁盘
  • 顺序发网络

而 WebSocket 的 frame:

  • 偏向“小消息”
  • 有额外 framing
  • 有 mask / opcode

👉 对 Kafka 来说,全是额外开销。


3️⃣ Kafka 需要极致控制 IO 行为

Kafka 做了很多 WebSocket 做不到的事:

  • 零拷贝(sendfile
  • page cache 直读
  • 批量压缩(lz4 / zstd)
  • 精确控制 buffer

WebSocket 库通常:

  • 封装 IO
  • 隐藏 buffer
  • 不允许零拷贝

👉 这是性能杀手。


4️⃣ Kafka 的客户端 ≠ 浏览器

WebSocket 的最大价值之一是:

浏览器可用

而 Kafka 的客户端是:

  • Java 服务
  • Flink / Spark
  • 日志采集 agent

👉 Kafka 完全不需要 WebSocket 的“通用性”。


5️⃣ Kafka 的协议是“状态机 + offset”

Kafka 协议里大量内容是:

1
2
3
4
- FetchRequest
- ProduceRequest
- OffsetCommit
- GroupCoordinator

这是一个:

强状态、强一致性的协议

WebSocket 只是传输层,不提供这些语义,你还是得全自己实现。


那 Kafka 到底“用的是什么”?

Kafka 使用的是:

自定义二进制协议 + TCP socket

特点:

  • 固定字段长度
  • 网络字节序
  • 无文本编码
  • 可版本演进

示例(概念级)

1
| size | api_key | api_version | correlation_id | payload |

优点:

  • 紧凑
  • 可 forward compatibility
  • 精确控制性能

Kafka 为什么选择“pull”,而不是“push”?

这是核心哲学。

Push 模型(WebSocket)

❌ Consumer 慢 → Broker 堆积
❌ Broker 要感知 Consumer 状态
❌ 背压复杂

Pull 模型(Kafka)

✅ Consumer 自己控制节奏
✅ 自然背压
✅ Broker 极度简单

Kafka 把复杂性推给 Consumer。