
Learn InfluxDB the Hard Way (4): Services in InfluxDB, Part II





//cmd/influxd/run/server.go 320行
func (s *Server) appendPrecreatorService(c precreator.Config) error {
    if !c.Enabled {
        return nil
    srv := precreator.NewService(c)
    srv.MetaClient = s.MetaClient
    s.Services = append(s.Services, srv)
    return nil

// services/precreator/service.go 12行
// Service manages the shard precreation service.
type Service struct {
    checkInterval time.Duration
    advancePeriod time.Duration

    Logger *zap.Logger

    done chan struct{}
    wg   sync.WaitGroup

    MetaClient interface {
        PrecreateShardGroups(now, cutoff time.Time) error

// NewService returns an instance of the precreation service.
func NewService(c Config) *Service {
    return &Service{
        checkInterval: time.Duration(c.CheckInterval),
        advancePeriod: time.Duration(c.AdvancePeriod),
        Logger:        zap.NewNop(),


// services/precreator/service.go 71行
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
    defer s.wg.Done()

    for {
        select {
        case <-time.After(s.checkInterval):
            if err := s.precreate(time.Now().UTC()); err != nil {
                s.Logger.Info("Failed to precreate shards", zap.Error(err))
        case <-s.done:
            s.Logger.Info("Terminating precreation service")

// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {
    cutoff := now.Add(s.advancePeriod).UTC()
    return s.MetaClient.PrecreateShardGroups(now, cutoff)

代码中有一个很有趣的用法:`cutoff := now.Add(s.advancePeriod).UTC()`。它的作用是什么呢?查阅官方文档可知,advancePeriod 参数决定了提前多长时间创建 ShardGroup。默认情况下 checkInterval 的取值为 10m,advancePeriod 的取值为 30m,也就是说 InfluxDB 每 10 分钟检查一次:如果某个 retention policy 中最新的 ShardGroup 将在未来 30 分钟内结束(且尚未过期),就提前为它创建下一个 ShardGroup。接下来深入到创建 ShardGroup 的源码中。

// services/meta/client.go 777行
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
    defer c.mu.Unlock()
    data := c.cacheData.Clone()
    var changed bool

    for _, di := range data.Databases {
        for _, rp := range di.RetentionPolicies {
            if len(rp.ShardGroups) == 0 {
                // No data was ever written to this group, or all groups have been deleted.
            g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
            if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
                // Group is not deleted, will end before the future time, but is still yet to expire.
                // This last check is important, so the system doesn't create shards groups wholly
                // in the past.

                // Create successive shard group.
                nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
                // if it already exists, continue
                if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
                    c.logger.Info("Shard group already exists",
                newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
                if err != nil {
                    c.logger.Info("Failed to precreate successive shard group",
                        zap.Uint64("group_id", g.ID), zap.Error(err))
                changed = true
                c.logger.Info("New shard group successfully precreated",

    if changed {
        if err := c.commit(data); err != nil {
            return err

    return nil

在前几篇文章中我们简单介绍了 InfluxDB 的存储结构:一个 InfluxDB 实例可以包含多个数据库,每个数据库按照 retention policy 划分目录,retention policy 之下是 ShardGroup,ShardGroup 中才是真正的 Shard。因此在预创建时,需要遍历每一个数据库以及其下的每一个 retention policy,并为其中符合条件的 retention policy 创建新的 ShardGroup。

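如果某个 retention policy 下没有任何 ShardGroup,则认为无需预创建。按照代码中的注释,这意味着该 retention policy 从未写入过数据,或者所有 ShardGroup 都已被删除(例如数据被清空后的中间状态)。为了保证 ShardGroup 在时间上的连续性,预创建时会取出该 retention policy 下的最后一个 ShardGroup。只有当它同时满足以下条件时,才会创建下一个 ShardGroup:未被删除;结束时间早于 to(即 now + advancePeriod);结束时间晚于 from(即 now,说明尚未过期)。新 ShardGroup 以该结束时间加 1 纳秒作为时间戳,如果这个时间点已经存在 ShardGroup,则直接跳过。这样既不会为已经过去的时间段创建 ShardGroup,也不会与已有的 ShardGroup 重复。继续跟踪代码,查看创建 ShardGroup 的流程。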

// services/meta/data.go 350行
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
    // Find retention policy.
    rpi, err := data.RetentionPolicy(database, policy)
    if err != nil {
        return err
    } else if rpi == nil {
        return influxdb.ErrRetentionPolicyNotFound(policy)

    // Verify that shard group doesn't already exist for this timestamp.
    if rpi.ShardGroupByTimestamp(timestamp) != nil {
        return nil

    // Create the shard group.
    sgi := ShardGroupInfo{}
    sgi.ID = data.MaxShardGroupID
    sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
    sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
    if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
        // Shard group range is [start, end) so add one to the max time.
        sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)

    sgi.Shards = []ShardInfo{
        {ID: data.MaxShardID},

    // Retention policy has a new shard group, so update the policy. Shard
    // Groups must be stored in sorted order, as other parts of the system
    // assume this to be the case.
    rpi.ShardGroups = append(rpi.ShardGroups, sgi)

    return nil


