天天看點

UORB講解

Pixhawk 飛控系統是基于ARM的四軸以上飛行器的飛行控制器, 它的前身是PX4-IMU,Pixhawk 把之前的IMU進行了完整的重構,最新版本是2.4.3。而對應的Pixhawk 1.x版本與2.x版本的差別在于,I/O闆與FMU是否整合在一起。

uORB是Pixhawk系統中非常重要且關鍵的一個子產品,它肩負了整個系統的資料傳輸任務,所有的傳感器資料、GPS、PPM信号等都要從晶片擷取後通過uORB進行傳輸到各個子產品進行計算處理。

  • uORB 的架構簡述

uORB全稱為micro object request broker (uORB),即 微對象請求代理器,**實際上uORB是一套跨程序的IPC通訊子產品。**在Pixhawk中, 所有的功能被獨立以程序子產品為機關進行實作并工作。而程序間的資料互動就由為重要,必須要能夠符合實時、有序的特點。

Pixhawk 使用NuttX實時ARM系統, 而uORB對于NuttX而言,它僅僅是一個普通的檔案裝置對象,這個裝置支援Open、Close、Read、Write、Ioctl以及Poll機制。 通過這些接口的實作,uORB提供了一套“點對多”的跨程序廣播通訊機制, “點”指的是通訊消息的“源”,“多”指的是一個源可以有多個使用者來接收、處理。而“源”與“使用者”的關系在于,源不需要去考慮使用者是否可以收到某條被廣播的消息或什麼時候收到這條消息。它隻需要單純的把要廣播的資料推送到uORB的消息“總線”上。對于使用者而言,源推送了多少次的消息也不重要,重要的是取回最新的這條消息。

uORB實際上是多個程序打開同一個裝置檔案,程序間通過此檔案節點進行資料互動和共享。

  • uORB 的系統實作

uORB的實作位于固件源碼的src/modules/uORB/uORB.cpp檔案,它通過重載CDev基類來組織一個uORB的裝置執行個體。并且完成Read/Write等功能的重載。uORB 的入口點是uorb_main函數,在這裡它檢查uORB的啟動參數來完成對應的功能,uORB支援start/test/status這3條啟動參數,在Pixhawk的rcS啟動腳本中,使用start參數來進行初始化,其他2個參數分别用來進行uORB功能的自檢和列出uORB的目前狀态。

在rcS中使用start參數啟動uORB後,uORB會建立并初始化它的裝置執行個體, 其中的實作大部分都在CDev基類完成。這個過程類似于Linux裝置驅動中的Probe函數,或者Windows 核心的DriverEntry,通過init調用完成裝置的建立,節點注冊以及派遣例程的設定等。

  • uORB 源碼分析之Open

    uORB 的Open接口實作了“源”或“使用者” 打開uORB句柄的功能,打開uORB的句柄就意味着一個源的建立或把一個使用者關聯到某個源。我在這以源的建立為開端,逐漸講解Open的過程:

orb_advert_t orb_advertise(const struct orb_metadata *meta, const void data) { int result, fd; orb_advert_t advertiser; / open the node as an advertiser / fd = node_open(PUBSUB, meta, data, true); if (fd == ERROR) return ERROR; / get the advertiser handle and close the node / result = ioctl(fd, ORBIOCGADVERTISER, (unsigned long)&advertiser); close(fd); if (result == ERROR) return ERROR; / the advertiser must perform an initial publish to initialise the object */ result= orb_publish(meta, advertiser, data); if (result == ERROR) return ERROR; return advertiser; }

orb_advertise 其實就是一個int, meta是一個已定義好的源描述資訊,裡面就2個成員,分别為name以及size。儲存了通訊的名稱以及每次發送資料的長度。 建立源的過程為3個步驟, 打開uORB的裝置節點, 擷取裝置執行個體, 推送第一條消息。

/*
 * Generate the path to the node and try to open it.
 */
ret = node_mkpath(path, f, meta);

if (ret != OK) {
	errno = -ret;
	return ERROR;
}

/* open the path as either the advertiser or the subscriber */
fd = open(path, (advertiser) ? O_WRONLY : O_RDONLY);
           

從代碼中可以看出, 每個源都在/PUBSUB/目錄下有一個裝置節點。首先通過node_mkpath來拼接好裝置節點路徑,然後根據要打開的是源節點還是使用者節點來選擇辨別

int

ORBDevNode::open(struct file *filp)

{

int ret;

/* is this a publisher? */
if (filp->f_oflags == O_WRONLY) {

	/* become the publisher if we can */
	lock();

	if (_publisher == 0) {
		_publisher = getpid();
		ret = OK;

	} else {
		ret = -EBUSY;
	}

	unlock();

	/* now complete the open */
	if (ret == OK) {
		ret = CDev::open(filp);

		/* open failed - not the publisher anymore */
		if (ret != OK)
			_publisher = 0;
	}

	return ret;
}

/* is this a new subscriber? */
if (filp->f_oflags == O_RDONLY) {

	/* allocate subscriber data */
	SubscriberData *sd = new SubscriberData;

	if (nullptr == sd)
		return -ENOMEM;

	memset(sd, 0, sizeof(*sd));

	/* default to no pending update */
	sd->generation = _generation;

	filp->f_priv = (void *)sd;

	ret = CDev::open(filp);

	if (ret != OK)
		free(sd);

	return ret;
}

/* can only be pub or sub, not both */
return -EINVAL;
           

}

uORB中規定了源節點隻允許寫打開,使用者節點隻允許隻讀打開。 我認為上面的Open代碼裡lock到unlock那段根本就不需要~ 那裡僅僅是判斷不允許重複建立同一個話題而已。而去重完全可以依賴其他的一些機制來解決, CDev::Open就不在繼續往裡說了。~如果oflags是RDONLY,那就表示要打開的是一個使用者裝置節點,sd是為這個使用者準備的一個上下文結構。裡面包含了一些同步計數器等資訊,比如sd->generation,這裡儲存了目前使用者讀取到的消息的索引号,而_generation來源于源裝置的每次寫操作,每次源寫入資料時,_generation會累加。每次使用者讀取資料時會把_generation同步到自己的sd->generation,通過這種處理,如果目前使用者的sd->generation不等于全局的_generation就意味着源剛剛寫入過資料,有最新的通訊消息可以供讀取。

uORB裝置的讀和寫

ssize_t

ORBDevNode::read(struct file *filp, char *buffer, size_t buflen)

{

SubscriberData *sd = (SubscriberData *)filp_to_sd(filp);

/* if the object has not been written yet, return zero */
if (_data == nullptr)
	return 0;

/* if the caller's buffer is the wrong size, that's an error */
if (buflen != _meta->o_size)
	return -EIO;

/*
 * Perform an atomic copy & state update
 */
irqstate_t flags = irqsave();

/* if the caller doesn't want the data, don't give it to them */
if (nullptr != buffer)
	memcpy(buffer, _data, _meta->o_size);

/* track the last generation that the file has seen */
sd->generation = _generation;

/*
 * Clear the flag that indicates that an update has been reported, as
 * we have just collected it.
 */
sd->update_reported = false;

irqrestore(flags);

return _meta->o_size;
           

}

讀分為3步, 首先判斷參數是否合理,然後屏蔽中斷拷貝資料,最後更新同步資訊。值得注意的是,如果沒有源寫資料,那麼read會在第一個判斷就退出,原因是_data緩沖區在首次write時才會成功申請。generation的同步這裡也不在繼續說了

ssize_t

ORBDevNode::write(struct file *filp, const char buffer, size_t buflen)

{

/

* Writes are legal from interrupt context as long as the

* object has already been initialised from thread context.

*

* Writes outside interrupt context will allocate the object

* if it has not yet been allocated.

*

* Note that filp will usually be NULL.

*/

if (nullptr == _data) {

if (!up_interrupt_context()) {

lock();

		/* re-check size */
		if (nullptr == _data)
			_data = new uint8_t[_meta->o_size];

		unlock();
	}

	/* failed or could not allocate */
	if (nullptr == _data)
		return -ENOMEM;
}

/* If write size does not match, that is an error */
if (_meta->o_size != buflen)
	return -EIO;

/* Perform an atomic copy. */
irqstate_t flags = irqsave();
memcpy(_data, buffer, _meta->o_size);
irqrestore(flags);

/* update the timestamp and generation count */
_last_update = hrt_absolute_time();
_generation++;

/* notify any poll waiters */
poll_notify(POLLIN);

return _meta->o_size;
           

}

上面就是write的實作了,那個lock/unlock真心很雞肋,我是感覺多餘了,首次write會申請記憶體用于資料通訊, 然後關閉中斷拷貝資料防止在複制的過程用有使用者來read,最後是更新最後的更新時間以及同步計數器并且發送一個POLLIN的消息通知來喚醒那些還在等待uORB資料可讀的使用者

uORB裝置的POLL狀态

當使用者沒有指定資料讀取的頻率時,每次源的write都會觸發一個POLLIN來喚醒使用者去讀取剛更新的資料。是否喚醒除了檢查generation的值以外,另外一個要求就是讀取頻率的限制,每個使用者可以單獨指定自己打算讀更新的頻率。

bool

ORBDevNode::appears_updated(SubscriberData sd)

{

/ assume it doesn’t look updated */

bool ret = false;

/* avoid racing between interrupt and non-interrupt context calls */
irqstate_t state = irqsave();

/* check if this topic has been published yet, if not bail out */
if (_data == nullptr) {
	ret = false;
	goto out;
}

/*
 * If the subscriber's generation count matches the update generation
 * count, there has been no update from their perspective; if they
 * don't match then we might have a visible update.
 */
while (sd->generation != _generation) {

	/*
	 * Handle non-rate-limited subscribers.
	 */
	if (sd->update_interval == 0) {
		ret = true;
		break;
	}

	/*
	 * If we have previously told the subscriber that there is data,
	 * and they have not yet collected it, continue to tell them
	 * that there has been an update.  This mimics the non-rate-limited
	 * behaviour where checking / polling continues to report an update
	 * until the topic is read.
	 */
	if (sd->update_reported) {
		ret = true;
		break;
	}

	/*
	 * If the interval timer is still running, the topic should not
	 * appear updated, even though at this point we know that it has.
	 * We have previously been through here, so the subscriber
	 * must have collected the update we reported, otherwise
	 * update_reported would still be true.
	 */
	if (!hrt_called(&sd->update_call))
		break;

	/*
	 * Make sure that we don't consider the topic to be updated again
	 * until the interval has passed once more by restarting the interval
	 * timer and thereby re-scheduling a poll notification at that time.
	 */
	hrt_call_after(&sd->update_call,
		       sd->update_interval,
		       &ORBDevNode::update_deferred_trampoline,
		       (void *)this);

	/*
	 * Remember that we have told the subscriber that there is data.
	 */
	sd->update_reported = true;
	ret = true;

	break;
}
           

out:

irqrestore(state);

/* consider it updated */
return ret;
           

}

uORB 根據使用者指定的周期來設定hrt(實時定時器),每過一個時間間隔,hrt會被發生調用,通過hrt_called來檢查這個調用。如果未發生,即便此時源的資料已經更新那麼也不會傳回POLLIN來喚醒使用者去讀。 簡單來說,它通過控制POLLIN的周期來單方面控制使用者的讀取間隔。 如果是linux平台,當使用者指定了時間間隔後, 我會為它單獨初始化一個核心定時器,每次poll調用時檢查完可用更新後,再次檢查定時器即可。

繼續閱讀