前言
在上一篇文章我們從上帝視角鳥瞰了一下Influxdb的元件結構,在這篇文章中,我們會開始深入代碼,從代碼的流程中幫助大家切割Influxdb的核心元件。希望在看本篇文章的時候,大家能夠将Influxdb的代碼下載下傳的本地,對照進行檢視。
git clone -b v1.5.0 [email protected]:influxdata/influxdb.git
代碼主幹流程
Influxdb的源碼倉庫中包含了influx、influx_inspect、influx_stress、influx_tsm、influxd、store等多個子項目,對于本系列而言,更多的側重在Influxd也就是Influxdb的主要存儲Server。
首先我們進入到cmd/influxd/main.go的入口檔案,進行代碼跟蹤,進入到
run
指令的代碼分支流程下。
//cmd/influxd/run/command.go 133行
s, err := NewServer(config, buildInfo)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile
if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}
cmd.Server = s
// Begin monitoring the server's error channel.
go cmd.monitorServerErrors()
可以看到Influxdb中Server的建構,并最終調用了Server的Open方法啟動Server,這個Server對象是Influxdb的邏輯封裝。我們來看下Server包含的内容。
//cmd/influxd/run/server.go 158行
s.Monitor = monitor.New(s, c.Monitor)
s.config.registerDiagnostics(s.Monitor)
if err := s.MetaClient.Open(); err != nil {
return nil, err
}
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
s.TSDBStore.EngineOptions.Config = c.Data
// Copy TSDB configuration.
s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index
// Create the Subscriber service
s.Subscriber = subscriber.NewService(c.Subscriber)
// Initialize points writer.
s.PointsWriter = coordinator.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
s.PointsWriter.TSDBStore = s.TSDBStore
// Initialize query executor.
s.QueryExecutor = query.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: s.MetaClient,
TaskManager: s.QueryExecutor.TaskManager,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
ShardMapper: &coordinator.LocalShardMapper{
MetaClient: s.MetaClient,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
},
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
}
s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries
// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
s.Monitor.Commit = s.buildInfo.Commit
s.Monitor.Branch = s.buildInfo.Branch
s.Monitor.BuildTime = s.buildInfo.Time
s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
Server的核心代碼主要就是執行個體化内部元件,主要包含Monitor、Subscriber、PointsWriter和QueryExecutor。在上篇文章中已經簡單的介紹過這幾個元件的作用,在此先不過多贅述,我們再來看下Server的Open方法。
//cmd/influxd/run/server.go 371行
s.appendMonitorService()
s.appendPrecreatorService(s.config.Precreator)
s.appendSnapshotterService()
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
s.appendStorageService(s.config.Storage)
s.appendRetentionPolicyService(s.config.Retention)
for _, i := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(i); err != nil {
return err
}
}
for _, i := range s.config.CollectdInputs {
s.appendCollectdService(i)
}
for _, i := range s.config.OpenTSDBInputs {
if err := s.appendOpenTSDBService(i); err != nil {
return err
}
}
for _, i := range s.config.UDPInputs {
s.appendUDPService(i)
}
s.Subscriber.MetaClient = s.MetaClient
s.PointsWriter.MetaClient = s.MetaClient
s.Monitor.MetaClient = s.MetaClient
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
// Configure logging for all services and clients.
if s.config.Meta.LoggingEnabled {
s.MetaClient.WithLogger(s.Logger)
}
s.TSDBStore.WithLogger(s.Logger)
if s.config.Data.QueryLogEnabled {
s.QueryExecutor.WithLogger(s.Logger)
}
s.PointsWriter.WithLogger(s.Logger)
s.Subscriber.WithLogger(s.Logger)
for _, svc := range s.Services {
svc.WithLogger(s.Logger)
}
s.SnapshotterService.WithLogger(s.Logger)
s.Monitor.WithLogger(s.Logger)
// Open TSDB store.
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
// Open the subcriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}
// Open the points writer service
if err := s.PointsWriter.Open(); err != nil {
return fmt.Errorf("open points writer: %s", err)
}
s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())
for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
}
}
在Server的Open中先進行了Service的注冊,然後将執行個體化的内部元件進行啟動并設定相應的Logger等配置,最後依次啟動注冊在Server上的服務。
我們可以挑選appendPrecreatorService作為範例進行分析,來看下一個Service的注冊過程。
//cmd/influxd/run/server.go 320行
func (s *Server) appendPrecreatorService(c precreator.Config) error {
if !c.Enabled {
return nil
}
srv := precreator.NewService(c)
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
return nil
}
在本例中講解的是PrecreatorService這個Service的注冊過程,首先執行個體化一個PrecreatorService,然後設定其使用的MetaClient,最後将PrecreatorService的執行個體注冊到Server上,在Influxdb中Service是一個接口類型,需要實作如下三個方法,分别負責統一的日志接入,啟動和停止。
//cmd/influxd/run/server.go 558行
// Service represents a service attached to the server.
type Service interface {
WithLogger(log *zap.Logger)
Open() error
Close() error
}
這樣Influxdb的代碼基本已經切割清晰了,首先是一個Server負責執行個體化幾個全局的内部元件單例,然後生成上層業務包裝的Service,最後再依次啟動。
最後
在接下來的文章中,我們會深入到每個Service中,依次講解他們的功能與原理。如果有錯誤的地方麻煩大家多多指正。