天天看點

JPS 跳點算法源碼分析

 網上關于JPS算法原理介紹的文章有很多,本文就不過多的介紹了,直接從JPS算法源碼開始分析

github上面有一個c++寫的JPS算法源碼倉庫:https://github.com/KumarRobotics/jps3d

還有一個ros的JPS算法插件源碼倉庫,它依賴于上面的jps3d庫:https://github.com/eborghi10/jps_global_planner

jps3d庫中jps算法的主要執行部分在src目錄下的:jps_planner.cpp和graph_search.cpp文中

首先分析jps_planner.cpp和jps_planner.h

在JPSPlanner類裡面的構造函數會對地圖進行初始化

JPSPlanner<Dim>::JPSPlanner(bool verbose): planner_verbose_(verbose) {
  planner_verbose_ = verbose;
  if(planner_verbose_)
    printf(ANSI_COLOR_CYAN "JPS PLANNER VERBOSE ON\n" ANSI_COLOR_RESET);
}

template <int Dim>
void JPSPlanner<Dim>::setMapUtil(const std::shared_ptr<JPS::MapUtil<Dim>> &map_util) {
  map_util_ = map_util;
}

//栅格地圖更新,将栅格地圖存儲在cmap_中,1表示陽,0表示非占據
template <int Dim>
void JPSPlanner<Dim>::updateMap() {
  Veci<Dim> dim = map_util_->getDim();

  if(Dim == 3) {
    cmap_.resize(dim(0)*dim(1)*dim(2));
    for( int z = 0; z < dim(2); ++z) {
      for( int y = 0; y < dim(1); ++y) {
        for( int x = 0; x < dim(0); ++x) {
          Veci<Dim> pn;
          pn << x, y, z;
          cmap_[x+y*dim(0)+z*dim(0)*dim(1)] = map_util_->isOccupied(pn) ? 1:0;
        }
      }
    }
  }
  else {
    cmap_.resize(dim(0)*dim(1));
      for( int y = 0; y < dim(1); ++y)
        for( int x = 0; x < dim(0); ++x)
          cmap_[x+y*dim(0)] = map_util_->isOccupied(Veci<Dim>(x,y)) ? 1:0;
  }
}
           

jps_planner.cpp中最關鍵的函數是JPSPlanner::plan,主要步驟為:

1、首先對将起點和終點轉換為栅格地圖的坐标點,并進行合法性檢查

2、然後根據地圖次元執行個體化一個2D或在3D的JPS::GraphSearch對象,并執行graph_search_->plan();函數開始進行搜尋

if(Dim == 3) {
    graph_search_ = std::make_shared<JPS::GraphSearch>(cmap_.data(), dim(0), dim(1), dim(2), eps, planner_verbose_);
    graph_search_->plan(start_int(0), start_int(1), start_int(2), goal_int(0), goal_int(1), goal_int(2), use_jps);
  }
  else {
    graph_search_ = std::make_shared<JPS::GraphSearch>(cmap_.data(), dim(0), dim(1), eps, planner_verbose_);
    graph_search_->plan(start_int(0), start_int(1), goal_int(0),  goal_int(1), use_jps);
  }
           

3、得到初始路徑,最終存在raw_path_

//**** raw path, s --> g
  vec_Vecf<Dim> ps;
  for (const auto &it : path) {
    if(Dim == 3) {
      Veci<Dim> pn;
      pn << it->x, it->y, it->z;
      ps.push_back(map_util_->intToFloat(pn));
    }
    else
      ps.push_back(map_util_->intToFloat(Veci<Dim>(it->x, it->y)));
  }

  raw_path_ = ps;
  std::reverse(std::begin(raw_path_), std::end(raw_path_));
           

4、最後對得到的初始路徑進行一些優化,這些其實跟JPS算法已經關系不大了

path_ = removeCornerPts(raw_path_);
  std::reverse(std::begin(path_), std::end(path_));
  path_ = removeCornerPts(path_);
  std::reverse(std::begin(path_), std::end(path_));
  path_ = removeLinePts(path_);
           

 然後進入到graph_search.cpp檔案裡面,分析下GraphSearch::plan函數,GraphSearch::plan函數有兩個重載,一個用于2D地圖,一個用于3D地圖,下面分析2D地圖的代碼。GraphSearch::plan函數設定起點、終點,并計算法起點狀态。然後調用plan函數

bool GraphSearch::plan(int xStart, int yStart, int xGoal, int yGoal, bool useJps, int maxExpand)
{
  use_2d_ = true;
  pq_.clear();
  path_.clear();
  hm_.resize(xDim_ * yDim_);
  seen_.resize(xDim_ * yDim_, false);
  // Set jps
  use_jps_ = useJps;

  // Set goal
  int goal_id = coordToId(xGoal, yGoal);
  xGoal_ = xGoal; yGoal_ = yGoal;

  // Set start node
  int start_id = coordToId(xStart, yStart);
  StatePtr currNode_ptr = std::make_shared<State>(State(start_id, xStart, yStart, 0, 0));
  currNode_ptr->g = 0;
  currNode_ptr->h = getHeur(xStart, yStart);

  return plan(currNode_ptr, maxExpand, start_id, goal_id);
}
           

plan的函數執行的流程就是我們所熟悉的A*算法流程:

1、維護一個優先隊列openlist,每次從openlist裡面取出優先級最高的節點

2、對該節點的鄰居進行檢查,檢視是否能加入openlist

3、若可以加入openlist則計算節點的g值和h值,相加作為該節點的優先級,并設定該節點的夫節點為目前節點

4、将目前節點加入closelist。循環執行1-4直到找到目标節點

bool GraphSearch::plan(StatePtr& currNode_ptr, int maxExpand, int start_id, int goal_id) {
  // Insert start node
  currNode_ptr->heapkey = pq_.push(currNode_ptr);
  currNode_ptr->opened = true;
  hm_[currNode_ptr->id] = currNode_ptr;
  seen_[currNode_ptr->id] = true;

  int expand_iteration = 0;
  while(true)
  {
    expand_iteration++;
    // get element with smallest cost  //從優先隊列裡面拿出代價最小的節點并彈出這個節點
    currNode_ptr = pq_.top(); pq_.pop();
    currNode_ptr->closed = true; // Add to closed list //将彈出的節點加入clodelist

    if(currNode_ptr->id == goal_id) {
      if(verbose_)
        printf("Goal Reached!!!!!!\n\n");
      break;
    }

    //printf("expand: %d, %d\n", currNode_ptr->x, currNode_ptr->y);
    std::vector<int> succ_ids;
    std::vector<double> succ_costs;
    // Get successors //successors:繼任者,這裡意思是子節點   
    //得到能夠順利到達的子節點ID,并且計算從目前節點到達子節點的cost
    //如果是A*則将子節點是周圍節點,如果是JPS則子節點是跳點
    if(!use_jps_)
      getSucc(currNode_ptr, succ_ids, succ_costs);
    else
      getJpsSucc(currNode_ptr, succ_ids, succ_costs);

    // if(verbose_)
   // printf("size of succs: %zu\n", succ_ids.size());
    // Process successors 
    //判斷所有找出的子節點能否被加入openlist 和 優先隊列pq_
    for( int s = 0; s < (int) succ_ids.size(); s++ )
    {
      //see if we can improve the value of succstate
      StatePtr& child_ptr = hm_[succ_ids[s]];
      double tentative_gval = currNode_ptr->g + succ_costs[s];

      if( tentative_gval < child_ptr->g )
      {
        child_ptr->parentId = currNode_ptr->id;  // Assign new parent
        child_ptr->g = tentative_gval;    // Update gval

        //double fval = child_ptr->g + child_ptr->h;

        // if currently in OPEN, update
        if( child_ptr->opened && !child_ptr->closed) {
          pq_.increase( child_ptr->heapkey );       // update heap
          child_ptr->dx = (child_ptr->x - currNode_ptr->x);
          child_ptr->dy = (child_ptr->y - currNode_ptr->y);
          child_ptr->dz = (child_ptr->z - currNode_ptr->z);
          if(child_ptr->dx != 0)
            child_ptr->dx /= std::abs(child_ptr->dx);
          if(child_ptr->dy != 0)
            child_ptr->dy /= std::abs(child_ptr->dy);
           if(child_ptr->dz != 0)
            child_ptr->dz /= std::abs(child_ptr->dz);
        }
        // if currently in CLOSED
        else if( child_ptr->opened && child_ptr->closed)
        {
          printf("ASTAR ERROR!\n");
        }
        else // new node, add to heap
        {
          //printf("add to open set: %d, %d\n", child_ptr->x, child_ptr->y);
          child_ptr->heapkey = pq_.push(child_ptr);
          child_ptr->opened = true;
        }
      } //
    } // Process successors

    //如果擴充節點次數大于最大門檻值,則結束搜尋,沒有找到終點
    if(maxExpand > 0 && expand_iteration >= maxExpand) {
      if(verbose_)
        printf("MaxExpandStep [%d] Reached!!!!!!\n\n", maxExpand);
      return false;
    }
    //如果優先隊列裡面沒有節點,整個地圖已經被搜尋完畢,沒有找到終點
    if( pq_.empty()) {
      if(verbose_)
        printf("Priority queue is empty!!!!!!\n\n");
      return false;
    }
  }

  if(verbose_) {
    printf("goal g: %f, h: %f!\n", currNode_ptr->g, currNode_ptr->h);
    printf("Expand [%d] nodes!\n", expand_iteration);
  }

  path_ = recoverPath(currNode_ptr, start_id);

  return true;
}
           

jps和A*算法的差別是,jps會向一個方向一直搜尋,直到找到一個跳點。jps會将這個跳點加入到openlist。

跳點定義:目前點 x 滿足以下三個條件之一:

  • 節點 x 是起點/終點。
  • 節點 x 至少有一個強迫鄰居。
  • 如果父節點在斜方向(意味着這是斜向搜尋),節點x的水準或垂直方向上有滿足條件a,b的點。

 強迫鄰居定義:節點 x 的8個鄰居中有障礙,且 x 的父節點 p 經過x 到達 n 的距離代價比不經過 x 到達的 n 的任意路徑的距離代價小,則稱 n 是 x 的強迫鄰居。

跳點想了解得更詳細可看這篇部落格:JPS/JPS+ 尋路算法_GJQI12的部落格-CSDN部落格

 plan函數中GraphSearch::getJpsSucc函數用來尋找跳點,GraphSearch::getJpsSucc函數中調用jump函數尋找跳點。我對2D地圖搜尋部分進行了注釋,3D地圖其實也是差不多的過程。

//尋找目前節點的子節點,即尋找跳點
void GraphSearch::getJpsSucc(const StatePtr& curr, std::vector<int>& succ_ids, std::vector<double>& succ_costs) {
  if(use_2d_) {
    const int norm1 = std::abs(curr->dx)+std::abs(curr->dy);
    int num_neib = jn2d_->nsz[norm1][0];
    int num_fneib = jn2d_->nsz[norm1][1];
    int id = (curr->dx+1)+3*(curr->dy+1);
//對目前所有的鄰居方向和強迫節點方向進行搜尋
// no move (norm 0):        8 neighbors always added
//                          0 forced neighbors to check (never happens)
//                          0 neighbors to add if forced (never happens)
// straight (norm 1):       1 neighbor always added
//                          2 forced neighbors to check
//                          2 neighbors to add if forced
// diagonal (norm sqrt(2)): 3 neighbors always added
//                          2 forced neighbors to check
//                          2 neighbors to add if forced
    for( int dev = 0; dev < num_neib+num_fneib; ++dev) {
      int new_x, new_y;
      int dx, dy;
      //對必定存在的鄰居進行搜尋
      if(dev < num_neib) {
        dx = jn2d_->ns[id][0][dev];
        dy = jn2d_->ns[id][1][dev];
        if(!jump(curr->x, curr->y, dx, dy, new_x, new_y)) continue;
      }
      //對可能存在的強迫鄰居進行搜尋
      else {
        int nx = curr->x + jn2d_->f1[id][0][dev-num_neib];
        int ny = curr->y + jn2d_->f1[id][1][dev-num_neib];
        //如果目前節點旁邊有障礙物則可能存在強迫節點,需要搜尋
        if(isOccupied(nx,ny)) {
          dx = jn2d_->f2[id][0][dev-num_neib];
          dy = jn2d_->f2[id][1][dev-num_neib];
          if(!jump(curr->x, curr->y, dx, dy, new_x, new_y)) continue;
        }
        else
          continue;
      }
      //jump傳回false則該搜尋方向碰到了障礙物、邊界,continue結束本次循環
      //jump傳回true則找到了跳點,進行跳點的存儲
      int new_id = coordToId(new_x, new_y);
      if(!seen_[new_id]) {
        seen_[new_id] = true;
        hm_[new_id] = std::make_shared<State>(new_id, new_x, new_y, dx, dy);
        hm_[new_id]->h = getHeur(new_x, new_y);
      }
      //将跳點ID以及目前點到跳點的cost存儲起來
      succ_ids.push_back(new_id);
      succ_costs.push_back(std::sqrt((new_x - curr->x) * (new_x - curr->x) +
            (new_y - curr->y) * (new_y - curr->y)));
    }
  }
  else {
    const int norm1 = std::abs(curr->dx)+std::abs(curr->dy)+std::abs(curr->dz);
    int num_neib = jn3d_->nsz[norm1][0];
    int num_fneib = jn3d_->nsz[norm1][1];
    int id = (curr->dx+1)+3*(curr->dy+1)+9*(curr->dz+1);

    for( int dev = 0; dev < num_neib+num_fneib; ++dev) {
      int new_x, new_y, new_z;
      int dx, dy, dz;
      if(dev < num_neib) {
        dx = jn3d_->ns[id][0][dev];
        dy = jn3d_->ns[id][1][dev];
        dz = jn3d_->ns[id][2][dev];
        if(!jump(curr->x, curr->y, curr->z,
              dx, dy, dz, new_x, new_y, new_z)) continue;
      }
      else {
        int nx = curr->x + jn3d_->f1[id][0][dev-num_neib];
        int ny = curr->y + jn3d_->f1[id][1][dev-num_neib];
        int nz = curr->z + jn3d_->f1[id][2][dev-num_neib];
        if(isOccupied(nx,ny,nz)) {
          dx = jn3d_->f2[id][0][dev-num_neib];
          dy = jn3d_->f2[id][1][dev-num_neib];
          dz = jn3d_->f2[id][2][dev-num_neib];
          if(!jump(curr->x, curr->y, curr->z,
                dx, dy, dz, new_x, new_y, new_z)) continue;
        }
        else
          continue;
      }

      int new_id = coordToId(new_x, new_y, new_z);
      if(!seen_[new_id]) {
        seen_[new_id] = true;
        hm_[new_id] = std::make_shared<State>(new_id, new_x, new_y, new_z, dx, dy, dz);
        hm_[new_id]->h = getHeur(new_x, new_y, new_z);
      }

      succ_ids.push_back(new_id);
      succ_costs.push_back(std::sqrt((new_x - curr->x) * (new_x - curr->x) +
            (new_y - curr->y) * (new_y - curr->y) +
            (new_z - curr->z) * (new_z - curr->z)));
    }

  }
}
           

 jump是一個遞歸函數,會在一個方向上一直搜尋直到碰到障礙物、邊界、跳點。

bool GraphSearch::jump(int x, int y, int dx, int dy, int& new_x, int& new_y ) {
  new_x = x + dx;
  new_y = y + dy;
  if (!isFree(new_x, new_y))
    return false;

  if (new_x ==  xGoal_ && new_y == yGoal_)
    return true;

  //判斷目前點是否有強迫節點
  if (hasForced(new_x, new_y, dx, dy))
    return true;
  //繼續在目前方向上深度搜尋
  //norm1不可能等于0,因為隻有起始點的年norm1等于0
  //norm1 == 1直接執行return jump,因為直線方向隻有一個方向要搜尋
  //norm1 == 2先執行for裡面的jump 2次再執行return jump
  //因為斜線方向需要先搜尋兩個直線方向再在斜線方向搜尋 
  const int id = (dx+1)+3*(dy+1);
  const int norm1 = std::abs(dx) + std::abs(dy);
  int num_neib = jn2d_->nsz[norm1][0];
  for( int k = 0; k < num_neib-1; ++k )
  {
    int new_new_x, new_new_y;
    if(jump(new_x, new_y, jn2d_->ns[id][0][k], jn2d_->ns[id][1][k],
        new_new_x, new_new_y)) return true;
  }

  return jump(new_x, new_y, dx, dy, new_x, new_y);
}
           

在graph_search.h檔案中有個JPS2DNeib結構體

有個常量數組static constexpr int nsz[3][2] = {{8, 0}, {1, 2}, {3, 2}};

///Search and prune neighbors for JPS 2D
  struct JPS2DNeib {
    // for each (dx,dy) these contain:
    //    ns: neighbors that are always added
    //    f1: forced neighbors to check  //需要檢查是否是障礙物的鄰居
    //    f2: neighbors to add if f1 is forced  //檢查是否是強迫鄰居
    int ns[9][2][8];
    int f1[9][2][2];
    int f2[9][2][2];
    // nsz contains the number of neighbors for the four different types of moves:
    // no move (norm 0):        8 neighbors always added
    //                          0 forced neighbors to check (never happens)
    //                          0 neighbors to add if forced (never happens)
    // straight (norm 1):       1 neighbor always added
    //                          2 forced neighbors to check
    //                          2 neighbors to add if forced
    // diagonal (norm sqrt(2)): 3 neighbors always added
    //                          2 forced neighbors to check
    //                          2 neighbors to add if forced
    static constexpr int nsz[3][2] = {{8, 0}, {1, 2}, {3, 2}};

    void print();
    JPS2DNeib();
    private:
    void Neib(int dx, int dy, int norm1, int dev, int& tx, int& ty);
    void FNeib(int dx, int dy, int norm1, int dev,
        int& fx, int& fy, int& nx, int& ny);
  };
           

nsz[0] = {8,0}表示運動方向的曼哈頓距離 abs(dx)+abs(dy) == 0時,即起點處初始沒有運動方向,他會有8個必定要被檢查鄰居,0個可能的強迫鄰居

nsz[0] = {1,2}表示運動方向的曼哈頓距離 abs(dx)+abs(dy) == 1時,即直線運動方向,他會有1個必定要被檢查鄰居,即下圖中的栅格5,有2個可能的強迫鄰居,即下圖中的栅格3和8.

JPS 跳點算法源碼分析

nsz[0] = {3,2}表示運動方向的曼哈頓距離 abs(dx)+abs(dy) == 2時,即斜線運動方向,他會有3個必定要被檢查鄰居即栅格4、7、x,2個可能的強迫鄰居即栅格1、8

JPS 跳點算法源碼分析

 JPS2DNeib結構體中

// for each (dx,dy) these contain:
    //    ns: neighbors that are always added
    //    f1: forced neighbors to check  //需要檢查是否是障礙物的鄰居
    //    f2: neighbors to add if f1 is forced  //檢查是否是強迫鄰居
    int ns[9][2][8];
    int f1[9][2][2];
    int f2[9][2][2];
           

ns表示根據目前移動方向必須要檢查的鄰居,如:dx = 0,dy = 0.表示起點,有8個必被檢查的鄰居;dx = 1,dy = 0,表示移動方向向右,目前節點右邊的節點必須被檢查;dx = 1,dy = 1,表示移動方向為右上方,目前節點上方、右方、右上方3個節點必須被檢查。

f1表示需要檢查是否有障礙物的位置,f2表示如果存在障礙物這需要檢查的強迫鄰居節點的位置。以上所說的位置都用相對位置表示。如向右上方運動,則f1裡面存儲的是栅格4和7的坐标,算法将檢查栅格4和7有沒有被障礙物占據,如果有障礙物,f2裡面存儲的是栅格1和8的坐标,算法将栅格1和8是否為x的強迫鄰居。

JPS 跳點算法源碼分析

 本文參考:

JPS尋路算法 - 簡書

JPS算法_Jason.Li_0012的部落格-CSDN部落格_jps算法

JPS/JPS+ 尋路算法 - KillerAery - 部落格園

繼續閱讀