mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體
文章目錄
- mongos、nanomsg、zeroMQ簡述和go-mongos使用執行個體
-
- 1. mongos、nanomsg簡述
- 2. zeroMQ、nanomsg和可擴充協定
-
-
- PAIR(雙向通信)
- REQREP(用戶端請求、伺服器回複)
- PIPELINE(單向資料流)
- BUS(多對多通信)
- PUBSUB(主題廣播)
- SURVEY(向小組提問)
-
- 3. mongos及執行個體
1. mongos、nanomsg簡述
來自:https://cloud.tencent.com/developer/article/1096304
nanomsg是一個消息協定SP ("Scalable Protocols"可擴充協定)的c語言實作,而mangos用golang實作了SP (“Scalable Protocols”)。
消息協定不同于通常我們說的消息隊列,是指一個簡單的傳輸會話協定。
mangos重點也是替代直接手寫TCP,實作各種場合的通訊範式。
推薦:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/
那麼mangos、nanomsg有何優點麼?
主要是:簡單、抽象合理、相容多種語言、輕量級、學習成本低、比自己造的輪子好用很多。
了解的誤區:mangos/nanomsg并不是消息隊列,也不是RPC架構。
2. zeroMQ、nanomsg和可擴充協定
https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/
可以簡單了解這些網絡架構和協定是對TCP、PGM、IPC、ITC等協定的封裝,提供新的接口便于分布式環境下的通信,而zeroMQ較差的可擴充性将其局限于某一些協定,為了解決擴充性等一些其它的問題出現了Nanomsg,Nanomsg 通過為傳輸和消息傳遞協定提供可插入的接口來解決這個問題。這意味着支援超出标準集 PUB/SUB、REQ/REP 等的新傳輸(例如 WebSockets)和新消息模式。
也許最有趣的是 nanomsg 與 ZeroMQ 的哲學背離。nanomsg 不是作為一個通用的網絡庫,而是打算通過實作所謂的“可擴充性協定”來提供用于建構可擴充和高性能分布式系統的“樂高積木”。這些可擴充協定是通信模式,它們是網絡堆棧傳輸層之上的抽象。這些協定彼此完全分離,是以每個協定都可以展現明确定義的分布式算法。正如 nanomsg 的作者 Martin Sustrik 所說,其目的是通過IETF标準化協定規範。(zeroMQ可以了解成網絡庫,而NanoMsg更像按照應用場景定義新的可擴充協定,具體選擇那一種則需要看應用場景,就分布式場景下NanoMsg的可擴充性可能更适合分布式複雜多變場景)
Nanomsg 目前定義了六種不同的可擴充性協定:PAIR、REQREP、PIPELINE、BUS、PUBSUB 和 SURVEY。
PAIR(雙向通信)
PAIR 在兩個端點之間實作簡單的一對一、雙向通信。兩個節點可以互相發送消息。
REQREP(用戶端請求、伺服器回複)
REQREP 協定定義了一種用于建構無狀态服務來處理使用者請求的模式。用戶端發送請求,伺服器接收請求,進行一些處理,然後傳回響應。
PIPELINE(單向資料流)
PIPELINE 提供單向資料流,這對于建立負載平衡的處理管道非常有用。生産者節點送出分布在消費者節點之間的工作。
BUS(多對多通信)
BUS 允許從每個對等點發送的消息傳遞到組中的每個其他對等點。
PUBSUB(主題廣播)
PUBSUB 允許釋出者向零個或多個訂閱者多點傳播消息。訂閱者可以連接配接到多個釋出者,可以訂閱特定的主題,允許他們隻接收與他們相關的消息。
SURVEY(向小組提問)
最後一個可擴充性協定是 SURVEY。SURVEY 模式與 PUBSUB 的相似之處在于來自一個節點的消息被廣播到整個組,但不同的是組中的每個節點都 響應該消息。這開辟了各種各樣的應用程式,因為它使您可以快速輕松地一次性查詢大量系統的狀态。調查受訪者必須在調查員配置的時間視窗内做出回應。
這裡面文章最後也提到了關于c還是c++還開發zeroMQ的說明,了解這些内容感覺也是蠻有意思的。
3. mongos及執行個體
關于zeroMQ、manomsg的分析我們不再繼續,感興趣的可以在上面的連結中繼續檢視(https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/),感覺分析的還是比較全面的。我們項目中使用了mongos用于邊緣網關和攝像頭的通信以及提供了人臉分析結果接口(使用的PUBSUB的通信方式,邊緣網關中部分程序通過onvif協定拉取攝像頭的流資料後通過ffmpeg和gstreamer進行處理來檢測人臉并分析對應特征資料,之後通過pub的方式提供資料,而另一些人臉抓拍處理上報或者關聯業務處理的程序則是通過mongos來sub人臉屬性資訊的,這種方式很像MQTT的協定處理方式,但是我們不需要專門搭建中間件,僅僅隻是用于程序間通信,是以Nanomsg基于場景分類的可擴充協定是非常棒的,非常适合不同的通信場景)。
可以看這裡:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/
這裡給了PUBSUB和SURVEY的執行個體,當然,也可以在mongos的GitHub(https://github.com/nanomsg/mangos-v1/tree/v1.4.0/examples)上去檢視,目前看似乎國内使用的還不是很多或者說中文資料不是很多,但從場景來看還是有很多場景可以使用的。
建立檔案test.go:
// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// pubsub implements a publish/subscribe example. server is a listening
// pub socket, and clients are dialing sub sockets.
//
// To use:
//
// $ go build .
// $ url=tcp://127.0.0.1:40899
// $ ./pubsub server $url server & server=$! && sleep 1
// $ ./pubsub client $url client0 & client0=$!
// $ ./pubsub client $url client1 & client1=$!
// $ ./pubsub client $url client2 & client2=$!
// $ sleep 5
// $ kill $server $client0 $client1 $client2
//
package main
import (
"fmt"
"os"
"time"
"nanomsg.org/go-mangos"
"nanomsg.org/go-mangos/protocol/pub"
"nanomsg.org/go-mangos/protocol/sub"
"nanomsg.org/go-mangos/transport/ipc"
"nanomsg.org/go-mangos/transport/tcp"
)
func die(format string, v ...interface{}) {
fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
os.Exit(1)
}
func date() string {
return time.Now().Format(time.ANSIC)
}
func server(url string) {
var sock mangos.Socket
var err error
if sock, err = pub.NewSocket(); err != nil {
die("can't get new pub socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on pub socket: %s", err.Error())
}
for {
// Could also use sock.RecvMsg to get header
d := date()
fmt.Printf("SERVER: PUBLISHING DATE %s\n", d)
if err = sock.Send([]byte(d)); err != nil {
die("Failed publishing: %s", err.Error())
}
time.Sleep(time.Second)
}
}
func client(url string, name string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = sub.NewSocket(); err != nil {
die("can't get new sub socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on sub socket: %s", err.Error())
}
// Empty byte array effectively subscribes to everything
err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
if err != nil {
die("cannot subscribe: %s", err.Error())
}
for {
if msg, err = sock.Recv(); err != nil {
die("Cannot recv: %s", err.Error())
}
fmt.Printf("CLIENT(%s): RECEIVED %s\n", name, string(msg))
}
}
func main() {
if len(os.Args) > 2 && os.Args[1] == "server" {
server(os.Args[2])
os.Exit(0)
}
if len(os.Args) > 3 && os.Args[1] == "client" {
client(os.Args[2], os.Args[3])
os.Exit(0)
}
fmt.Fprintf(os.Stderr, "Usage: pubsub server|client <URL> <ARG>\n")
os.Exit(1)
}
編譯檔案(通過module模式建立編譯時使用如下方式,否則可直接編譯運作):
if [ "$1" == "arm" ]
then
export GOARCH="arm"
echo $GOARCH
export GOOS="linux"
echo $GOOS
fi
go build -mod=mod
結果(運作方式在代碼中已經給了提示,通過設定url變量之後傳參運作即可):
這樣就可以建立一個不需要中間件的具備訂閱、釋出功能的服務端和用戶端,在某些嵌入式場景也可以使用一般雲上分布式場景才用到的技術(是不是有點像鴻蒙OS的分布式方案,SURVEY模式是支援服務發現功能的)。