Event Bus & Service Bus

  • 时间:2025-10-27 22:01 作者: 来源: 阅读:4
  • 扫一扫,手机访问
摘要:EventBus eventbus作为kubeedge的edge部分与MQTT进行交互的门户,有必要将eventbus相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的协助。eventbus的具体业务逻辑主要聚焦在启动过程中,本节就侧重分析eventbus启动流程: ● eventbus的struct调用链剖析 ● e

EventBus

eventbus作为kubeedge的edge部分与MQTT进行交互的门户,有必要将eventbus相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的协助。eventbus的具体业务逻辑主要聚焦在启动过程中,本节就侧重分析eventbus启动流程:
●   eventbus的struct调用链剖析
●   eventbus的具体逻辑剖析

eventbus的struct调用链剖析
从eventbus的模块注册函数入手:
kubeedge/edge/pkg/eventbus/event_bus.go

// Register register eventbus
func Register() {
    mode, err := config.CONFIG.GetValue("mqtt.mode").ToInt()
    if err != nil || mode > externalMqttMode || mode < internalMqttMode {
        mode = internalMqttMode
    }
    edgeEventHubModule := eventbus{mqttMode: mode}
    core.Register(&edgeEventHubModule)
}

注册函数中做了2件事:

  1. 从配置文件中获取mqtt.mode,并对其进行判断

mode, err := config.CONFIG.GetValue("mqtt.mode").ToInt()
if err != nil || mode > externalMqttMode || mode < internalMqttMode {
    mode = internalMqttMode
}

mqtt.mode的具体定义如下:

kubeedge/edge/pkg/eventbus/event_bus.go

const (
    internalMqttMode = iota // 0: launch an internal mqtt broker.
    bothMqttMode            // 1: launch an internal and external mqtt broker.
    externalMqttMode        // 2: launch an external mqtt broker.
    ...
)

mqtt.mode定义分internalMqttMode、bothMqttMode和externalMqttMode三种:
●   externalMqttMode 启动内部mqtt代理;
●   bothMqttMode 同时启动内部和外部mqtt代理
●   externalMqttMode 启动外部mqtt代理。

  1. 实例化eventbus并将其注册

edgeEventHubModule := eventbus{mqttMode: mode}
core.Register(&edgeEventHubModule)

进入eventbus定义:

kubeedge/edge/pkg/eventbus/event_bus.go

// eventbus struct
type eventbus struct {
    context  *context.Context
    mqttMode int
}

eventbus的具体逻辑剖析

从eventbus的启动函数切入分析具体逻辑:

kubeedge/edge/pkg/eventbus/event_bus.go

func (eb *eventbus) Start(c *context.Context) {
// no need to call TopicInit now, we have fixed topic
eb.context = c

    nodeID := config.CONFIG.GetConfigurationByKey("edgehub.controller.node-id")
    ...

    mqttBus.NodeID = nodeID.(string)
    mqttBus.ModuleContext = c

    if eb.mqttMode >= bothMqttMode {
    // launch an external mqtt server
        externalMqttURL := config.CONFIG.GetConfigurationByKey("mqtt.server")
        ...
        hub := &mqttBus.Client{
            MQTTUrl: externalMqttURL.(string),
        }
        mqttBus.MQTTHub = hub
        hub.InitSubClient()
        hub.InitPubClient()
    }

    if eb.mqttMode <= bothMqttMode {
        internalMqttURL := config.CONFIG.GetConfigurationByKey("mqtt.internal-server")
        ...
        qos := config.CONFIG.GetConfigurationByKey("mqtt.qos")
        ...
        retain := config.CONFIG.GetConfigurationByKey("mqtt.retain")
        ...

        sessionQueueSize := config.CONFIG.GetConfigurationByKey("mqtt.session-queue-size")
        ...

        if qos.(int) < int(packet.QOSAtMostOnce) || qos.(int) > int(packet.QOSExactlyOnce) || sessionQueueSize.(int) <= 0 {
            klog.Errorf("mqtt.qos must be one of [0,1,2] or mqtt.session-queue-size must > 0")
            os.Exit(1)
        }
        // launch an internal mqtt server only
        mqttServer = mqttBus.NewMqttServer(sessionQueueSize.(int), internalMqttURL.(string), retain.(bool), qos.(int))
        mqttServer.InitInternalTopics()
        err := mqttServer.Run()
        ...
    }

    eb.pubCloudMsgToEdge()
}

eventbus的启动函数做了如下3件事:

  1. 处理eventbus模块的公共配置

eb.context = c

nodeID := config.CONFIG.GetConfigurationByKey("edgehub.controller.node-id")
...

mqttBus.NodeID = nodeID.(string)
mqttBus.ModuleContext = c

接收并存储与edgecore其他模块通信的管道,从配置文件中获取所在节点的唯一标识。

  1. 根据不同mqttMode启动与mqtt交互的不同实例
    config:     &config.GetConfig().CtrConfig

进入config.GetConfig()函数定义:

kubeedge/edge/pkg/edgehub/config/config.go

if eb.mqttMode >= bothMqttMode {
    ...
        }

if eb.mqttMode <= bothMqttMode {

    ...
}

当 eb.mqttMode >= bothMqttMode将mqtt代理启动在eventbus之外,eventbus作为独立启动的mqtt代理的客户端与其交互;当eb.mqttMode <= bothMqttMode时,在eventbus内启动一个mqtt代理,负责与终端设备交互。对于两种情况的具体逻辑感兴趣的同学可以在本文的基础上自行分析。

  1. 将云部分的指令和事件下发到与eventbus相连的设备
    eb.pubCloudMsgToEdge()

servicebus

ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,
与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
代码逻辑
servicebus的功能比较简单,根据接收到的消息调用本地服务的HTTP端口

func (sb *servicebus) Start(c *beehiveContext.Context) {
    // no need to call TopicInit now, we have fixed topic
    var ctx context.Context
    sb.context = c
    ctx, sb.cancel = context.WithCancel(context.Background())
    var htc = new(http.Client)
    htc.Timeout = time.Second * 10

    var uc = new(util.URLClient)
    uc.Client = htc

    //Get message from channel
    for {
        select {
        case <-ctx.Done():
            klog.Warning("ServiceBus stop")
            return
        default:

        }
        msg, err := sb.context.Receive("servicebus")
        if err != nil {
            klog.Warningf("servicebus receive msg error %v", err)
            continue
        }
        go func() {
            klog.Infof("ServiceBus receive msg")
            source := msg.GetSource()
            if source != sourceType {
                return
            }
            resource := msg.GetResource()
            r := strings.Split(resource, ":")
            if len(r) != 2 {
                m := "the format of resource " + resource + " is incorrect"
                klog.Warningf(m)
                code := http.StatusBadRequest
                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
                    sb.context.SendToGroup(modules.HubGroup, response)
                }
                return
            }
            content, err := json.Marshal(msg.GetContent())
            if err != nil {
                klog.Errorf("marshall message content failed %v", err)
                m := "error to marshal request msg content"
                code := http.StatusBadRequest
                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
                    sb.context.SendToGroup(modules.HubGroup, response)
                }
                return
            }
            var httpRequest util.HTTPRequest
            if err := json.Unmarshal(content, &httpRequest); err != nil {
                m := "error to parse http request"
                code := http.StatusBadRequest
                klog.Errorf(m, err)
                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
                    sb.context.SendToGroup(modules.HubGroup, response)
                }
                return
            }
            operation := msg.GetOperation()
            targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]
            resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)
            if err != nil {
                m := "error to call service"
                code := http.StatusNotFound
                klog.Errorf(m, err)
                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
                    sb.context.SendToGroup(modules.HubGroup, response)
                }
                return
            }
            resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)
            resBody, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                if err.Error() == "http: request body too large" {
                    err = fmt.Errorf("response body too large")
                }
                m := "error to receive response, err: " + err.Error()
                code := http.StatusInternalServerError
                klog.Errorf(m, err)
                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
                    sb.context.SendToGroup(modules.HubGroup, response)
                }
                return
            }

            response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}
            responseMsg := model.NewMessage(msg.GetID())
            responseMsg.Content = response
            responseMsg.SetRoute("servicebus", modules.UserGroup)
            sb.context.SendToGroup(modules.HubGroup, *responseMsg)
        }()
    }
}

根据代码分为以下步骤:
●   拿到回传的beehiveContext,初始化http连接,设置超时十秒
●   初始化URLClient
●   从beehiveContext 里面接收接收servicebus的消息,获撤销息来源,来源必须是router_rest
●   然后获撤销息包含的resource,根据代码可以看出resource是一个以冒号分割的字符串,根据后续代码可以知道,实际上就是 port:url
●   获取对应的消息内容,该内容是一个json,最终被反序列化为
type HTTPRequest struct {
Header http.Header json:"header"
Body   []byte      json:"body"
}
●   获取动作 - 即http的 get/post/put...
●   发出请求接收返回,判断返回数据量大小,最大为 5*1e6个字节,超过就报错
●   以接收到的消息ID作为父ID通过返回数据给edgehub

  • 全部评论(0)
最新发布的资讯信息
【系统环境|】Spring Boot3 中实现按模板导出 Word 文档合同的技术指南(2025-10-30 16:04)
【系统环境|】openPangu-Ultra-MoE-718B-V1.1今日正式开源,部署指南来啦!(2025-10-30 16:03)
【系统环境|】Ubuntu + vLLM + DeepSeek 本地部署完全指南(2025-10-30 16:03)
【系统环境|】如何用公众号AI编辑器实现一键排版?一份完整的5步指南(2025-10-30 16:02)
【系统环境|】Spring Boot 与 Nacos 完美整合指南(2025-10-30 16:01)
【系统环境|】Rust MCP开发指南:让AI与应用对话的桥梁(2025-10-30 16:00)
【系统环境|】MCP Server 开发实战指南(2025-10-30 15:59)
【系统环境|】入门指南:使用 Playwright MCP Server 为你的 AI Agent 赋予能力(2025-10-30 15:58)
【系统环境|】一个IT女搬砖工的情人节爱心礼物指南及衍伸 v16.02.14(2025-10-30 15:57)
【系统环境|】百元矿渣显卡淘金全指南(2025-10-30 15:57)
手机二维码手机访问领取大礼包
返回顶部