缓冲
缓冲的目的是在pipeline中积累足够的数据,以便播放可以顺利进行而不会中断。它通常在从(慢速)并且非实时网络源读取时这样做,但也可用于实时源。
GStreamer 为以下用例提供支持:
-
- 在开始播放之前在内存中缓冲特定数量的数据,以便将网络波动降至最低。请参阅流缓冲。
- 将网络文件下载到本地磁盘,并在下载的数据中进行快速定位(seeking)。这类似于 quicktime/youtube 播放器。请参阅下载缓冲。
- 将(半)实时流缓存到本地磁盘环形缓冲区,并在缓存区域中定位(seeking)。这类似于tivo-like 的时移播放。请参阅时移缓冲。
GStreamer 可以向应用程序提供有关当前缓冲状态的进度报告,并让应用程序决定如何缓冲以及何时停止缓冲。
在最简单的情况下,应用程序必须侦听消息总线上的 BUFFERING 消息。如果 BUFFERING 消息中的用来表示百分比的值小于 100,则pipeline正在缓冲。当接收到 100% 的消息时,缓冲就完成了。在缓冲状态下,应用程序应将pipeline保持在 PAUSED 状态。当缓冲完成时,它可以将pipeline(back)置于 PLAYING 状态。
下面是消息处理程序如何处理 BUFFERING 消息的示例。我们将在缓冲策略中看到更高级的方法。
[...]
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_BUFFERING:{
gint percent;
/* no state management needed for live pipelines */
if (is_live)
break;
gst_message_parse_buffering (message, &percent);
if (percent == 100) {
/* a 100% message means buffering is done */
buffering = FALSE;
/* if the desired state is playing, go back */
if (target_state == GST_STATE_PLAYING) {
gst_element_set_state (pipeline, GST_STATE_PLAYING);
}
} else {
/* buffering busy */
if (!buffering && target_state == GST_STATE_PLAYING) {
/* we were not buffering but PLAYING, PAUSE the pipeline. */
gst_element_set_state (pipeline, GST_STATE_PAUSED);
}
buffering = TRUE;
}
break;
case ...
[...]
Stream buffering
+---------+ +---------+ +-------+
| httpsrc | | buffer | | demux |
| src - sink src - sink ....
+---------+ +---------+ +-------+
在这种情况下,我们正在从慢速网络源将数据读取到 buffer element(例如 queue2)。
buffer element具有以字节表示的低水位线和高水位线。缓冲区使用水位线的方式如下:
- buffer element 将发布 BUFFERING 消息,直到达到高水位线。这指示应用程序保持pipeline PAUSED,这最终将阻止 srcpad 在数据在sinks中预卷时推送。
- 当达到高水位时,将发布一条缓冲 100% 的 BUFFERING 消息,指示应用程序继续播放。
- 当播放期间,低水位线被命中时,queue 将再次开始发布 BUFFERING 消息,使应用程序再次 PAUSE pipeline,直到再次命中高水位线。这称为重新缓冲阶段(rebuffering stage)。
- 在播放过程中,queue的 水位线会在高水位和低水位之间波动,以补偿网络的不规则性。
当demuxer 在推模式下运行时,可以使用这种缓冲方法。在流中Seeking要求seek发生在网络源中。当文件的总持续时间未知时,例如在实时流中或 当不可能/不需要高效seeking 时,这是最可取的。
问题是如何配置一个好的低水位线和高水位线。这里有一些想法:
-
可以通过缓冲占用固定时间的方式来测量网络带宽并配置低/高水位线。
GStreamer 核心中的 queue2 element具有 max-size-time 属性,与 use-rate-estimate 属性一起,完全可以做到这一点。此外, playbin buffer-duration 属性使用速率估计来衡量缓冲的数据量。
- 根据编码的比特率,同样可以设置水位线,以便在播放开始之前缓冲固定数量的数据。通常,buffering element不知道流的比特率,但它可以通过查询获得它。
- 从固定数量的字节开始,测量重新缓冲之间的时间,增加队列大小,直到重新缓冲之间的时间在应用程序选择的限制内。
Buffering element可以插入pipeline中的任何位置。例如,您可以在解码器之前插入buffering element。这将使基于时间设置低/高水位线成为可能。
playbin 上的缓冲标志,对解析的数据执行缓冲。在稍后阶段进行缓冲的另一个优点是您可以让demuxer 在拉模式下运行。从慢速网络驱动器(使用 filesrc)读取数据时,这可能是一种感兴趣的缓冲方式。
Download buffering
+---------+ +---------+ +-------+
| httpsrc | | buffer | | demux |
| src - sink src - sink ....
+---------+ +----|----+ +-------+
V
file
如果我们知道服务器正在向客户端流式传输固定长度的文件,则应用程序可以选择将整个文件下载到磁盘上。buffer element将提供一个基于推或拉的srcpad给demuxer,以在下载的文件中导航。
这种模式只适用于客户端可以确定服务器上文件长度的情况。
在这种情况下,当请求的范围不在下载区域 + 缓冲区大小(downloaded area + buffersize)内时,将照常发出缓冲消息。 缓冲消息还将包含正在执行增量下载的指示。 此标志可用于让应用程序以更智能的方式控制缓冲,例如使用 BUFFERING 查询。 请参阅缓冲策略。
Timeshift buffering(时移缓冲)
+---------+ +---------+ +-------+
| httpsrc | | buffer | | demux |
| src - sink src - sink ....
+---------+ +----|----+ +-------+
V
file-ringbuffer
在这种模式下,会保留一个固定大小的环形缓冲区来下载服务器内容。 这允许在缓冲的数据中seeking 。 可以向前seeking的时间取决于环形缓冲区的大小,。
此模式适用于所有直播流。 与增量下载模式一样,缓冲消息将随着timeshift download 正在进行的指示一起发出。
实时缓冲
在实时pipelines中,我们通常会在拍摄(capture )和播放(playback)element之间引入一些固定延迟。这种延迟可以由队列(例如抖动缓冲区(jitterbuffer))或其他方式(在音频接收器中(audiosink))引入。
缓冲消息也可以在这些实时pipeline中发出,并作为对延迟缓冲用户的指示。应用程序通常不会通过状态更改来响应这些缓冲消息。
缓冲策略
下面是一些基于缓冲消息和缓冲查询实现不同缓冲策略的想法。
无缓冲策略
我们希望在pipeline中缓冲足够的数据,以便继续播放而不会中断。要实现这一点,我们需要知道的是文件中的总剩余播放时间和总剩余下载时间。如果需要的缓冲时间小于播放时间,我们可以不间断地开始播放。
我们可以通过 DURATION、POSITION 和 BUFFERING 查询获得所有这些信息。我们需要定期执行缓冲查询来获取当前的缓冲状态。我们还需要有一个足够大的缓冲区来保存完整的文件,最坏的情况。最好将此缓冲策略与下载缓冲结合使用(请参阅下载缓冲)。
This is what the code would look like:
#include <gst/gst.h>
GstState target_state;
static gboolean is_live;
static gboolean is_buffering;
static gboolean
buffer_timeout (gpointer data)
{
GstElement *pipeline = data;
GstQuery *query;
gboolean busy;
gint percent;
gint64 estimated_total;
gint64 position, duration;
guint64 play_left;
query = gst_query_new_buffering (GST_FORMAT_TIME);
if (!gst_element_query (pipeline, query))
return TRUE;
gst_query_parse_buffering_percent (query, &busy, &percent);
gst_query_parse_buffering_range (query, NULL, NULL, NULL, &estimated_total);
if (estimated_total == -1)
estimated_total = 0;
/* calculate the remaining playback time */
if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &position))
position = -1;
if (!gst_element_query_duration (pipeline, GST_FORMAT_TIME, &duration))
duration = -1;
if (duration != -1 && position != -1)
play_left = GST_TIME_AS_MSECONDS (duration - position);
else
play_left = 0;
g_message ("play_left %" G_GUINT64_FORMAT", estimated_total %" G_GUINT64_FORMAT
", percent %d", play_left, estimated_total, percent);
/* we are buffering or the estimated download time is bigger than the
* remaining playback time. We keep buffering. */
is_buffering = (busy || estimated_total * 1.1 > play_left);
if (!is_buffering)
gst_element_set_state (pipeline, target_state);
return is_buffering;
}
static void
on_message_buffering (GstBus *bus, GstMessage *message, gpointer user_data)
{
GstElement *pipeline = user_data;
gint percent;
/* no state management needed for live pipelines */
if (is_live)
return;
gst_message_parse_buffering (message, &percent);
if (percent < 100) {
/* buffering busy */
if (!is_buffering) {
is_buffering = TRUE;
if (target_state == GST_STATE_PLAYING) {
/* we were not buffering but PLAYING, PAUSE the pipeline. */
gst_element_set_state (pipeline, GST_STATE_PAUSED);
}
}
}
}
static void
on_message_async_done (GstBus *bus, GstMessage *message, gpointer user_data)
{
GstElement *pipeline = user_data;
if (!is_buffering)
gst_element_set_state (pipeline, target_state);
else
g_timeout_add (500, buffer_timeout, pipeline);
}
gint
main (gint argc,
gchar *argv[])
{
GstElement *pipeline;
GMainLoop *loop;
GstBus *bus;
GstStateChangeReturn ret;
/* init GStreamer */
gst_init (&argc, &argv);
loop = g_main_loop_new (NULL, FALSE);
/* make sure we have a URI */
if (argc != 2) {
g_print ("Usage: %s <URI>\n", argv[0]);
return -1;
}
/* set up */
pipeline = gst_element_factory_make ("playbin", "pipeline");
g_object_set (G_OBJECT (pipeline), "uri", argv[1], NULL);
g_object_set (G_OBJECT (pipeline), "flags", 0x697 , NULL);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "message::buffering",
(GCallback) on_message_buffering, pipeline);
g_signal_connect (bus, "message::async-done",
(GCallback) on_message_async_done, pipeline);
gst_object_unref (bus);
is_buffering = FALSE;
target_state = GST_STATE_PLAYING;
ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
switch (ret) {
case GST_STATE_CHANGE_SUCCESS:
is_live = FALSE;
break;
case GST_STATE_CHANGE_FAILURE:
g_warning ("failed to PAUSE");
return -1;
case GST_STATE_CHANGE_NO_PREROLL:
is_live = TRUE;
break;
default:
break;
}
/* now run */
g_main_loop_run (loop);
/* also clean up */
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
g_main_loop_unref (loop);
return 0;
}
先看看我们如何将pipeline设置为 PAUSED 状态。 当需要缓冲时,我们将在预卷状态期间接收缓冲消息。 当我们已经预卷 (on_message_async_done) 时,我们会查看缓冲是否正在进行,如果没有,我们开始播放。 如果缓冲正在进行,我们开始通过定时器轮询缓冲状态。 如果估计下载时间小于剩余播放时间,我们开始播放。