首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >python: Broadcast Pattern

python: Broadcast Pattern

作者头像
geovindu
发布2026-06-18 12:41:21
发布2026-06-18 12:41:21
120
举报

项目结构:

代码语言:javascript
复制
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:11
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : exceptions.py
 
 
class BroadcastError(Exception):
    """
    广播基础异常
    """
    pass
 
class SubscriberError(BroadcastError):
    """
    订阅者异常
    """
    pass
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:15
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : broadcast.py
 
from typing import List
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
from BroadcastPattern.core.exceptions import SubscriberError
 
class BroadcastEngine(object):
    """
    广播核心引擎(单例、线程安全)
    """
    _instance = None
 
    def __new__(cls):
        # 单例模式:全局唯一广播中心
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._subscribers: List[Subscriber] = []
        return cls._instance
 
    def subscribe(self, subscriber: Subscriber) -> None:
        """
 
        :param subscriber:
        :return:
        """
        if not subscriber:
            raise SubscriberError("订阅者不能为空")
        if subscriber not in self._subscribers:
            self._subscribers.append(subscriber)
 
    def unsubscribe(self, subscriber: Subscriber) -> None:
        """
 
        :param subscriber:
        :return:
        """
        if subscriber in self._subscribers:
            self._subscribers.remove(subscriber)
 
    def broadcast(self, message: JewelryMessage) -> None:
        """
        向所有订阅者同时广播消息"
        :param message:
        :return:
        """
        print(f"\n📢 【广播引擎】开始全局广播:{message.title}\n")
        for sub in self._subscribers:
            try:
                sub.on_receive(message)
            except Exception as e:
                print(f"⚠️ {sub.name} 处理消息失败:{str(e)}")
代码语言:javascript
复制
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:13
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : models.py
from dataclasses import dataclass
 
 
@dataclass(frozen=True)
class JewelryMessage:
    """
    珠宝行业标准广播消息(不可变、类型安全) 实体
    """
    title: str
    content: str
    product: str
    material: str
    batch: str
    standard: str
    warehouse_location: str
    marketing_content: str
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:12
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : subscriber.py
 
 
from abc import ABC, abstractmethod
from BroadcastPattern.message.models import JewelryMessage
 
 
class Subscriber(ABC):
    """
    订阅者接口(所有业务系统必须实现)
    """
    @property
    @abstractmethod
    def name(self) -> str:
        """
        系统名称
        :return:
        """
        pass
 
    @abstractmethod
    def on_receive(self, message: JewelryMessage) -> None:
        """
        接收广播消息
        :param message:
        :return:
        """
        pass
代码语言:javascript
复制
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:17
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : procurement.py
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class ProcurementSystem(Subscriber):
    """
    采购系统
    """
    @property
    def name(self) -> str:
        return "原料采购系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"📦【{self.name}】已同步原料溯源:{message.material}")
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:18
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : production.py
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class ProductionSystem(Subscriber):
    """
    生产系统
    """
    @property
    def name(self) -> str:
        return "生产加工系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"⚙️【{self.name}】已排产:{message.product} 批次 {message.batch}")
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:19
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : quality.py
 
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class QCSystem(Subscriber):
    """
    质检系统
    """
    @property
    def name(self) -> str:
        return "质量检测系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"🔍【{self.name}】已加载质检标准:{message.standard}")
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:20
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : warehouse.py
 
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class WarehouseSystem(Subscriber):
    """
    仓储系统
    """
    @property
    def name(self) -> str:
        return "仓储管理系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"📦【{self.name}】已预留仓位:{message.warehouse_location}")
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:21
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : sales.py
 
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class StoreSalesSystem(Subscriber):
    """
     门店销售系统
    """
    @property
    def name(self) -> str:
        return "全国门店销售系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"🏬【{self.name}】已上架新品:{message.product}")
 
 
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:22
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : marketing.py
 
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
 
class MemberMarketingSystem(Subscriber):
    """
    会员营销系统
    """
    @property
    def name(self) -> str:
        return "会员营销系统"
 
    def on_receive(self, message: JewelryMessage) -> None:
        print(f"🎯【{self.name}】已推送:{message.marketing_content}") 

调用:

代码语言:javascript
复制
# encoding: utf-8
# 版权所有  2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author    : geovindu,Geovin Du 涂聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/6/7 23:23
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : BroadcastBll.py
from BroadcastPattern.core.broadcast import BroadcastEngine
from BroadcastPattern.message.models import JewelryMessage
from BroadcastPattern.business.procurement import ProcurementSystem
from BroadcastPattern.business.production import ProductionSystem
from BroadcastPattern.business.quality import QCSystem
from BroadcastPattern.business.warehouse import WarehouseSystem
from BroadcastPattern.business.sales import StoreSalesSystem
from BroadcastPattern.business.marketing import MemberMarketingSystem
 
 
class BroadcastBll(object):
    """
 
    """
    def demo(self):
        """
 
        :return:
        """
        # 1. 初始化全局广播引擎
        engine = BroadcastEngine()
 
        # 2. 初始化所有珠宝业务系统
        systems = [
            ProcurementSystem(),
            ProductionSystem(),
            QCSystem(),
            WarehouseSystem(),
            StoreSalesSystem(),
            MemberMarketingSystem()
        ]
 
        # 3. 全部订阅广播
        for sys in systems:
            engine.subscribe(sys)
            print(f"✅ 已订阅:{sys.name}")
 
        # 4. 构造行业标准消息
        message = JewelryMessage(
            title="2025春季冰种翡翠手镯全国上市",
            content="天然A货翡翠,统一标准、统一定价、同步发售",
            product="冰种翡翠手镯",
            material="缅甸天然翡翠",
            batch="JC20250415-001",
            standard="GB/T 16552-2017 珠宝玉石鉴定",
            warehouse_location="广州总部仓-A03-07",
            marketing_content="VIP会员专享9折+免费刻字"
        )
 
        # 5. 执行广播
        engine.broadcast(message)
 
        print("\n🎉 企业级广播完成:全业务链同步成功")

输出:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档