eventbus作为kubeedge的edge部分与MQTT进行交互的门户,有必要将eventbus相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的协助。eventbus的具体业务逻辑主要聚焦在启动过程中,本节就侧重分析eventbus启动流程:
● eventbus的struct调用链剖析
● eventbus的具体逻辑剖析
eventbus的struct调用链剖析
从eventbus的模块注册函数入手:
kubeedge/edge/pkg/eventbus/event_bus.go
// Register register eventbus
func Register() {
mode, err := config.CONFIG.GetValue("mqtt.mode").ToInt()
if err != nil || mode > externalMqttMode || mode < internalMqttMode {
mode = internalMqttMode
}
edgeEventHubModule := eventbus{mqttMode: mode}
core.Register(&edgeEventHubModule)
}注册函数中做了2件事:
从配置文件中获取mqtt.mode,并对其进行判断
mode, err := config.CONFIG.GetValue("mqtt.mode").ToInt()
if err != nil || mode > externalMqttMode || mode < internalMqttMode {
mode = internalMqttMode
}mqtt.mode的具体定义如下:
kubeedge/edge/pkg/eventbus/event_bus.go
const ( internalMqttMode = iota // 0: launch an internal mqtt broker. bothMqttMode // 1: launch an internal and external mqtt broker. externalMqttMode // 2: launch an external mqtt broker. ... )
mqtt.mode定义分internalMqttMode、bothMqttMode和externalMqttMode三种:
● externalMqttMode 启动内部mqtt代理;
● bothMqttMode 同时启动内部和外部mqtt代理
● externalMqttMode 启动外部mqtt代理。
实例化eventbus并将其注册
edgeEventHubModule := eventbus{mqttMode: mode}
core.Register(&edgeEventHubModule)进入eventbus定义:
kubeedge/edge/pkg/eventbus/event_bus.go
// eventbus struct
type eventbus struct {
context *context.Context
mqttMode int
}eventbus的具体逻辑剖析
从eventbus的启动函数切入分析具体逻辑:
kubeedge/edge/pkg/eventbus/event_bus.go
func (eb *eventbus) Start(c *context.Context) {
// no need to call TopicInit now, we have fixed topic
eb.context = c
nodeID := config.CONFIG.GetConfigurationByKey("edgehub.controller.node-id")
...
mqttBus.NodeID = nodeID.(string)
mqttBus.ModuleContext = c
if eb.mqttMode >= bothMqttMode {
// launch an external mqtt server
externalMqttURL := config.CONFIG.GetConfigurationByKey("mqtt.server")
...
hub := &mqttBus.Client{
MQTTUrl: externalMqttURL.(string),
}
mqttBus.MQTTHub = hub
hub.InitSubClient()
hub.InitPubClient()
}
if eb.mqttMode <= bothMqttMode {
internalMqttURL := config.CONFIG.GetConfigurationByKey("mqtt.internal-server")
...
qos := config.CONFIG.GetConfigurationByKey("mqtt.qos")
...
retain := config.CONFIG.GetConfigurationByKey("mqtt.retain")
...
sessionQueueSize := config.CONFIG.GetConfigurationByKey("mqtt.session-queue-size")
...
if qos.(int) < int(packet.QOSAtMostOnce) || qos.(int) > int(packet.QOSExactlyOnce) || sessionQueueSize.(int) <= 0 {
klog.Errorf("mqtt.qos must be one of [0,1,2] or mqtt.session-queue-size must > 0")
os.Exit(1)
}
// launch an internal mqtt server only
mqttServer = mqttBus.NewMqttServer(sessionQueueSize.(int), internalMqttURL.(string), retain.(bool), qos.(int))
mqttServer.InitInternalTopics()
err := mqttServer.Run()
...
}
eb.pubCloudMsgToEdge()
}eventbus的启动函数做了如下3件事:
处理eventbus模块的公共配置
eb.context = c
nodeID := config.CONFIG.GetConfigurationByKey("edgehub.controller.node-id")
...
mqttBus.NodeID = nodeID.(string)
mqttBus.ModuleContext = c接收并存储与edgecore其他模块通信的管道,从配置文件中获取所在节点的唯一标识。
根据不同mqttMode启动与mqtt交互的不同实例
config: &config.GetConfig().CtrConfig
进入config.GetConfig()函数定义:
kubeedge/edge/pkg/edgehub/config/config.go
if eb.mqttMode >= bothMqttMode {
...
}
if eb.mqttMode <= bothMqttMode {
...
}当 eb.mqttMode >= bothMqttMode将mqtt代理启动在eventbus之外,eventbus作为独立启动的mqtt代理的客户端与其交互;当eb.mqttMode <= bothMqttMode时,在eventbus内启动一个mqtt代理,负责与终端设备交互。对于两种情况的具体逻辑感兴趣的同学可以在本文的基础上自行分析。
将云部分的指令和事件下发到与eventbus相连的设备
eb.pubCloudMsgToEdge()
ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,
与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
代码逻辑
servicebus的功能比较简单,根据接收到的消息调用本地服务的HTTP端口
func (sb *servicebus) Start(c *beehiveContext.Context) {
// no need to call TopicInit now, we have fixed topic
var ctx context.Context
sb.context = c
ctx, sb.cancel = context.WithCancel(context.Background())
var htc = new(http.Client)
htc.Timeout = time.Second * 10
var uc = new(util.URLClient)
uc.Client = htc
//Get message from channel
for {
select {
case <-ctx.Done():
klog.Warning("ServiceBus stop")
return
default:
}
msg, err := sb.context.Receive("servicebus")
if err != nil {
klog.Warningf("servicebus receive msg error %v", err)
continue
}
go func() {
klog.Infof("ServiceBus receive msg")
source := msg.GetSource()
if source != sourceType {
return
}
resource := msg.GetResource()
r := strings.Split(resource, ":")
if len(r) != 2 {
m := "the format of resource " + resource + " is incorrect"
klog.Warningf(m)
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
content, err := json.Marshal(msg.GetContent())
if err != nil {
klog.Errorf("marshall message content failed %v", err)
m := "error to marshal request msg content"
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
var httpRequest util.HTTPRequest
if err := json.Unmarshal(content, &httpRequest); err != nil {
m := "error to parse http request"
code := http.StatusBadRequest
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
operation := msg.GetOperation()
targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]
resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)
if err != nil {
m := "error to call service"
code := http.StatusNotFound
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err.Error() == "http: request body too large" {
err = fmt.Errorf("response body too large")
}
m := "error to receive response, err: " + err.Error()
code := http.StatusInternalServerError
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}
responseMsg := model.NewMessage(msg.GetID())
responseMsg.Content = response
responseMsg.SetRoute("servicebus", modules.UserGroup)
sb.context.SendToGroup(modules.HubGroup, *responseMsg)
}()
}
}根据代码分为以下步骤:
● 拿到回传的beehiveContext,初始化http连接,设置超时十秒
● 初始化URLClient
● 从beehiveContext 里面接收接收servicebus的消息,获撤销息来源,来源必须是router_rest
● 然后获撤销息包含的resource,根据代码可以看出resource是一个以冒号分割的字符串,根据后续代码可以知道,实际上就是 port:url
● 获取对应的消息内容,该内容是一个json,最终被反序列化为
type HTTPRequest struct {
Header http.Header json:"header"
Body []byte json:"body"
}
● 获取动作 - 即http的 get/post/put...
● 发出请求接收返回,判断返回数据量大小,最大为 5*1e6个字节,超过就报错
● 以接收到的消息ID作为父ID通过返回数据给edgehub
¥435.00
【预售】Reforming the Governance of the IMF and the World
¥1589.00
日本直邮Coach包女士手提包Outlet红色Dempsey抽绳水桶包CG532IMF
¥689.00
COACH 红色女士腰带 C1725IMF8QM
¥1369.00
【按需印刷】The IMF-Supported Programs in Transition Economies
¥1279.00
COACH 蔻驰腰带 女士红色字母皮带 C1725-IMF8Q
¥490.00
预订Reforming the Governance of the IMF and the World Bank