天天看點

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

文章目錄

  • mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體
    • 1. mongos、nanomsg簡述
    • 2. zeroMQ、nanomsg和可擴充協定
        • PAIR(雙向通信)
        • REQREP(用戶端請求、伺服器回複)
        • PIPELINE(單向資料流)
        • BUS(多對多通信)
        • PUBSUB(主題廣播)
        • SURVEY(向小組提問)
    • 3. mongos及執行個體

1. mongos、nanomsg簡述

來自:https://cloud.tencent.com/developer/article/1096304

nanomsg是一個消息協定SP ("Scalable Protocols"可擴充協定)的c語言實作,而mangos用golang實作了SP (“Scalable Protocols”)。

消息協定不同于通常我們說的消息隊列,是指一個簡單的傳輸會話協定。

mangos重點也是替代直接手寫TCP,實作各種場合的通訊範式。

推薦:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/

那麼mangos、nanomsg有何優點麼?

主要是:簡單、抽象合理、相容多種語言、輕量級、學習成本低、比自己造的輪子好用很多。

了解的誤區:mangos/nanomsg并不是消息隊列,也不是RPC架構。

2. zeroMQ、nanomsg和可擴充協定

https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/

可以簡單了解這些網絡架構和協定是對TCP、PGM、IPC、ITC等協定的封裝,提供新的接口便于分布式環境下的通信,而zeroMQ較差的可擴充性将其局限于某一些協定,為了解決擴充性等一些其它的問題出現了Nanomsg,Nanomsg 通過為傳輸和消息傳遞協定提供可插入的接口來解決這個問題。這意味着支援超出标準集 PUB/SUB、REQ/REP 等的新傳輸(例如 WebSockets)和新消息模式。

也許最有趣的是 nanomsg 與 ZeroMQ 的哲學背離。nanomsg 不是作為一個通用的網絡庫,而是打算通過實作所謂的“可擴充性協定”來提供用于建構可擴充和高性能分布式系統的“樂高積木”。這些可擴充協定是通信模式,它們是網絡堆棧傳輸層之上的抽象。這些協定彼此完全分離,是以每個協定都可以展現明确定義的分布式算法。正如 nanomsg 的作者 Martin Sustrik 所說,其目的是通過IETF标準化協定規範。(zeroMQ可以了解成網絡庫,而NanoMsg更像按照應用場景定義新的可擴充協定,具體選擇那一種則需要看應用場景,就分布式場景下NanoMsg的可擴充性可能更适合分布式複雜多變場景)

Nanomsg 目前定義了六種不同的可擴充性協定:PAIR、REQREP、PIPELINE、BUS、PUBSUB 和 SURVEY。

PAIR(雙向通信)

PAIR 在兩個端點之間實作簡單的一對一、雙向通信。兩個節點可以互相發送消息。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

REQREP(用戶端請求、伺服器回複)

REQREP 協定定義了一種用于建構無狀态服務來處理使用者請求的模式。用戶端發送請求,伺服器接收請求,進行一些處理,然後傳回響應。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

PIPELINE(單向資料流)

PIPELINE 提供單向資料流,這對于建立負載平衡的處理管道非常有用。生産者節點送出分布在消費者節點之間的工作。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

BUS(多對多通信)

BUS 允許從每個對等點發送的消息傳遞到組中的每個其他對等點。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

PUBSUB(主題廣播)

PUBSUB 允許釋出者向零個或多個訂閱者多點傳播消息。訂閱者可以連接配接到多個釋出者,可以訂閱特定的主題,允許他們隻接收與他們相關的消息。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

SURVEY(向小組提問)

最後一個可擴充性協定是 SURVEY。SURVEY 模式與 PUBSUB 的相似之處在于來自一個節點的消息被廣播到整個組,但不同的是組中的每個節點都 響應該消息。這開辟了各種各樣的應用程式,因為它使您可以快速輕松地一次性查詢大量系統的狀态。調查受訪者必須在調查員配置的時間視窗内做出回應。

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

這裡面文章最後也提到了關于c還是c++還開發zeroMQ的說明,了解這些内容感覺也是蠻有意思的。

3. mongos及執行個體

關于zeroMQ、manomsg的分析我們不再繼續,感興趣的可以在上面的連結中繼續檢視(https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/),感覺分析的還是比較全面的。我們項目中使用了mongos用于邊緣網關和攝像頭的通信以及提供了人臉分析結果接口(使用的PUBSUB的通信方式,邊緣網關中部分程序通過onvif協定拉取攝像頭的流資料後通過ffmpeg和gstreamer進行處理來檢測人臉并分析對應特征資料,之後通過pub的方式提供資料,而另一些人臉抓拍處理上報或者關聯業務處理的程序則是通過mongos來sub人臉屬性資訊的,這種方式很像MQTT的協定處理方式,但是我們不需要專門搭建中間件,僅僅隻是用于程序間通信,是以Nanomsg基于場景分類的可擴充協定是非常棒的,非常适合不同的通信場景)。

可以看這裡:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/

這裡給了PUBSUB和SURVEY的執行個體,當然,也可以在mongos的GitHub(https://github.com/nanomsg/mangos-v1/tree/v1.4.0/examples)上去檢視,目前看似乎國内使用的還不是很多或者說中文資料不是很多,但從場景來看還是有很多場景可以使用的。

建立檔案test.go:

// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// pubsub implements a publish/subscribe example.  server is a listening
// pub socket, and clients are dialing sub sockets.
//
// To use:
//
//   $ go build .
//   $ url=tcp://127.0.0.1:40899
//   $ ./pubsub server $url server & server=$! && sleep 1
//   $ ./pubsub client $url client0 & client0=$!
//   $ ./pubsub client $url client1 & client1=$!
//   $ ./pubsub client $url client2 & client2=$!
//   $ sleep 5
//   $ kill $server $client0 $client1 $client2
//
package main

import (
	"fmt"
	"os"
	"time"

	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/pub"
	"nanomsg.org/go-mangos/protocol/sub"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
)

func die(format string, v ...interface{}) {
	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
	os.Exit(1)
}

func date() string {
	return time.Now().Format(time.ANSIC)
}

func server(url string) {
	var sock mangos.Socket
	var err error
	if sock, err = pub.NewSocket(); err != nil {
		die("can't get new pub socket: %s", err)
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Listen(url); err != nil {
		die("can't listen on pub socket: %s", err.Error())
	}
	for {
		// Could also use sock.RecvMsg to get header
		d := date()
		fmt.Printf("SERVER: PUBLISHING DATE %s\n", d)
		if err = sock.Send([]byte(d)); err != nil {
			die("Failed publishing: %s", err.Error())
		}
		time.Sleep(time.Second)
	}
}

func client(url string, name string) {
	var sock mangos.Socket
	var err error
	var msg []byte

	if sock, err = sub.NewSocket(); err != nil {
		die("can't get new sub socket: %s", err.Error())
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		die("can't dial on sub socket: %s", err.Error())
	}
	// Empty byte array effectively subscribes to everything
	err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
	if err != nil {
		die("cannot subscribe: %s", err.Error())
	}
	for {
		if msg, err = sock.Recv(); err != nil {
			die("Cannot recv: %s", err.Error())
		}
		fmt.Printf("CLIENT(%s): RECEIVED %s\n", name, string(msg))
	}
}

func main() {
	if len(os.Args) > 2 && os.Args[1] == "server" {
		server(os.Args[2])
		os.Exit(0)
	}
	if len(os.Args) > 3 && os.Args[1] == "client" {
		client(os.Args[2], os.Args[3])
		os.Exit(0)
	}
	fmt.Fprintf(os.Stderr, "Usage: pubsub server|client <URL> <ARG>\n")
	os.Exit(1)
}
           

編譯檔案(通過module模式建立編譯時使用如下方式,否則可直接編譯運作):

if [ "$1" == "arm" ]
then
  export GOARCH="arm"
  echo $GOARCH
  export GOOS="linux"
  echo $GOOS
fi
go build -mod=mod
           

結果(運作方式在代碼中已經給了提示,通過設定url變量之後傳參運作即可):

mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體

這樣就可以建立一個不需要中間件的具備訂閱、釋出功能的服務端和用戶端,在某些嵌入式場景也可以使用一般雲上分布式場景才用到的技術(是不是有點像鴻蒙OS的分布式方案,SURVEY模式是支援服務發現功能的)。

繼續閱讀