
Spark Structured Streaming: The Most Complete Usage Summary

I. Introduction to Spark Structured Streaming

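As is well known, Spark Streaming (the DStream API) entered maintenance mode after v2.4.5 and no longer receives new major versions. Spark Streaming has always processed streaming data in micro-batches, so it can only achieve near-real-time latency and cannot process records in true real time the way Flink does. With Spark Streaming no longer evolving, Structured Streaming is Spark's offering to compete with Flink. It supports window computations, watermarks, state, and other features that Flink also provides.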

Author: Zhang Yongqing (張永清). If you repost this article, please credit the source on cnblogs: https://www.cnblogs.com/laoqing/p/15517078.html

II. Connecting Spark Structured Streaming to a Kafka data source

JAR dependencies:
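The dependency listing did not survive in this copy of the article. As a hedged sketch for PySpark users, the Kafka connector can be pulled in through `spark.jars.packages`. The coordinate follows the documented `spark-sql-kafka-0-10` naming pattern; the Scala and Spark versions used as defaults below are assumptions and must match your own cluster. The helper names are hypothetical.

```python
def kafka_connector_coordinate(scala_version="2.12", spark_version="3.2.0"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector.

    The default versions are assumptions; they must match your cluster.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(app_name="kafka-demo"):
    """Create a SparkSession that resolves the connector at startup."""
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    return (
        SparkSession.builder.appName(app_name)
        .config("spark.jars.packages", kafka_connector_coordinate())
        .getOrCreate()
    )


print(kafka_connector_coordinate())
```

Passing the same coordinate to `spark-submit --packages` should have the same effect for jobs launched from the command line.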

1. Reading data from a Kafka source
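A minimal PySpark sketch of a streaming read from Kafka. The broker addresses, topic names, and helper function names are hypothetical; `spark` is assumed to be an existing SparkSession with the Kafka connector on its classpath. The option keys (`kafka.bootstrap.servers`, `subscribe`, `startingOffsets`, `maxOffsetsPerTrigger`) are the ones this section describes.

```python
def kafka_stream_options(bootstrap_servers, topics, starting_offsets="latest",
                         max_offsets_per_trigger=None):
    """Return the option map for a streaming Kafka source."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": ",".join(topics),
        "startingOffsets": starting_offsets,
    }
    if max_offsets_per_trigger is not None:
        # Optional rate limit on the offsets processed per trigger.
        options["maxOffsetsPerTrigger"] = str(max_offsets_per_trigger)
    return options


def read_kafka_stream(spark, options):
    """Open a streaming DataFrame on Kafka and decode key/value to strings."""
    df = spark.readStream.format("kafka").options(**options).load()
    # key and value arrive as binary, so cast them before further processing.
    return df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


opts = kafka_stream_options("host1:9092,host2:9092", ["topicA", "topicB"],
                            max_offsets_per_trigger=10000)
print(opts)
```

Keeping the options in a plain dictionary makes them easy to log and to unit-test without a running broker.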

2. Creating a Kafka source for batch queries
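A hedged PySpark sketch of a bounded (batch) read over an explicit offset range. The broker, topic, and helper names are hypothetical; `spark` is assumed to be an existing SparkSession with the Kafka connector available. In the offset JSON, -2 refers to earliest and -1 to latest.

```python
import json


def kafka_batch_options(bootstrap_servers, topic, start_by_partition, end_by_partition):
    """Return the option map for a batch Kafka read.

    Offsets are given per partition; -2 means earliest and -1 means latest.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": json.dumps({topic: start_by_partition}),
        "endingOffsets": json.dumps({topic: end_by_partition}),
    }


def read_kafka_batch(spark, options):
    """Load a static DataFrame covering the requested offset range."""
    return spark.read.format("kafka").options(**options).load()


opts = kafka_batch_options("host1:9092", "topicA", {"0": 23, "1": -2}, {"0": 50, "1": -1})
print(opts["startingOffsets"])
```

Note that for batch queries a starting offset of latest (-1) is not allowed, and an ending offset of earliest (-2) is not allowed either.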

Schema:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |

Required options:

| Option | Value | Meaning |
| --- | --- | --- |
| assign | json string {"topicA":[0,1],"topicB":[2,4]} | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| subscribe | A comma-separated list of topics | The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. |

Optional options:

| Option | Value | Default | Query type | Meaning |
| --- | --- | --- | --- | --- |
| startingTimestamp | timestamp string e.g. "1000" | none (next preference is <code>startingOffsetsByTimestamp</code>) | streaming and batch | The starting timestamp when a query is started: a string specifying a starting timestamp for all partitions in the subscribed topics. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset, the behavior follows the value of the option <code>startingOffsetsByTimestampStrategy</code>. Note 1: <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>. Note 2: For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. |
| startingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | none (next preference is <code>startingOffsets</code>) | streaming and batch | The starting timestamp when a query is started: a json string specifying a starting timestamp for each TopicPartition. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset, the behavior follows the value of the option <code>startingOffsetsByTimestampStrategy</code>. Note 1: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>. Note 2: For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. |
| startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | "latest" for streaming, "earliest" for batch | streaming and batch | The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. |
| endingTimestamp | timestamp string e.g. "1000" | none (next preference is <code>endingOffsetsByTimestamp</code>) | batch query | The end point when a batch query is ended: a string specifying an ending timestamp for all partitions in the subscribed topics. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset, the offset will be set to latest. Note: <code>endingTimestamp</code> takes precedence over <code>endingOffsetsByTimestamp</code> and <code>endingOffsets</code>. |
| endingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | none (next preference is <code>endingOffsets</code>) | batch query | The end point when a batch query is ended: a json string specifying an ending timestamp for each TopicPartition. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset, the offset will be set to latest. Note: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>. |
| endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch query | The end point when a batch query is ended, either "latest" which just refers to the latest, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. |
| failOnDataLoss | true or false | true | streaming and batch | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. |
| kafkaConsumer.pollTimeoutMs | long | 120000 | streaming and batch | The timeout in milliseconds to poll data from Kafka in executors. When not defined it falls back to <code>spark.network.timeout</code>. |
| fetchOffset.numRetries | int | 3 | streaming and batch | Number of times to retry before giving up fetching Kafka offsets. |
| fetchOffset.retryIntervalMs | long | 10 | streaming and batch | Milliseconds to wait before retrying to fetch Kafka offsets. |
| maxOffsetsPerTrigger | long | none | streaming query | Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. |
| minOffsetsPerTrigger | long | none | streaming query | Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. Note: if the maxTriggerDelay is exceeded, a trigger will be fired even if the number of available offsets doesn't reach minOffsetsPerTrigger. |
| maxTriggerDelay | time with units | 15m | streaming query | Maximum amount of time for which a trigger can be delayed between two triggers, provided some data is available from the source. This option is only applicable if minOffsetsPerTrigger is set. |
| minPartitions | int | none | streaming and batch | Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. If you set this option to a value greater than your topicPartitions, Spark will divvy up large Kafka partitions into smaller pieces. Please note that this configuration is like a <code>hint</code>: the number of Spark tasks will be approximately <code>minPartitions</code>. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data. |
| groupIdPrefix | string | spark-kafka-source | streaming and batch | Prefix of consumer group identifiers (<code>group.id</code>) that are generated by structured streaming queries. If "kafka.group.id" is set, this option will be ignored. |
| kafka.group.id | string | none | streaming and batch | The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to be very small. When this is set, option "groupIdPrefix" will be ignored. |
| includeHeaders | boolean | false | streaming and batch | Whether to include the Kafka headers in the row. |
| startingOffsetsByTimestampStrategy | "error" or "latest" | "error" | streaming and batch | The strategy used when the specified starting offset by timestamp (either global or per partition) doesn't match the offset Kafka returned. "error": fail the query; end users have to deal with workarounds requiring manual steps. "latest": assign the latest offset for these partitions, so that Spark can read newer records from these partitions in later micro-batches. |

It’s time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor. Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.

The caching key is built up from the following information:

Topic name

Topic partition

Group ID

The following properties are available to configure the consumer pool:

| Property Name | Default | Meaning | Since Version |
| --- | --- | --- | --- |
| spark.kafka.consumer.cache.capacity | 64 | The maximum number of consumers cached. Please note that it's a soft limit. | 3.0.0 |
| spark.kafka.consumer.cache.timeout | 5m (5 minutes) | The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor. | 3.0.0 |
| spark.kafka.consumer.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |
| spark.kafka.consumer.cache.jmx.enable | false | Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance. The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool". | 3.0.0 |

The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>, but it works as “soft-limit” to not block Spark tasks.

An idle-eviction thread periodically removes consumers that have been unused for longer than the given timeout. If the capacity threshold is reached when borrowing, it tries to remove the least-used entry that is currently not in use.

If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to the max number of concurrent tasks that can run in the executor (that is, number of task slots).

If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons. At the same time, we invalidate all consumers in pool which have same caching key, to remove consumer which was used in failed execution. Consumers which any other tasks are using will not be closed, but will be invalidated as well when they are returned into pool.

Along with consumers, Spark separately pools the records fetched from Kafka, so that Kafka consumers stay stateless from Spark's point of view and pooling is as efficient as possible. This pool uses the same cache key as the Kafka consumer pool. Note that it doesn't use Apache Commons Pool because its characteristics differ.

The following properties are available to configure the fetched data pool:

| Property Name | Default | Meaning | Since Version |
| --- | --- | --- | --- |
| spark.kafka.consumer.fetchedData.cache.timeout | 5m (5 minutes) | The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor. | 3.0.0 |
| spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |

Special note: [image in the original post; its content is not recoverable here]

3. Writing streaming data to a Kafka sink

  

The DataFrame being written to Kafka should have the following columns in its schema:

| Column | Type |
| --- | --- |
| key (optional) | string or binary |
| value (required) | string or binary |
| headers (optional) | array |
| topic (*optional) | string |
| partition (optional) | int |

* The topic column is required if the “topic” configuration option is not specified.

The value column is the only required option. If a key column is not specified then a <code>null</code> valued key column will be automatically added (see Kafka semantics on how <code>null</code> valued key values are handled). If a topic column exists then its value is used as the topic when writing the given row to Kafka, unless the “topic” configuration option is set i.e., the “topic” configuration option overrides the topic column. If a “partition” column is not specified (or its value is <code>null</code>) then the partition is calculated by the Kafka producer. A Kafka partitioner can be specified in Spark by setting the <code>kafka.partitioner.class</code> option. If not present, Kafka default partitioner will be used.
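The column rules above can be sketched in PySpark as follows. This is an illustrative sketch, not the article's original code: the helper names, broker address, topic, and checkpoint path are hypothetical, and `df` is assumed to be a streaming DataFrame. The `checkpointLocation` option is included because a streaming write to Kafka needs a checkpoint directory.

```python
def to_kafka_rows(df, key_col, value_col):
    """Project a DataFrame onto the string key/value columns the sink expects."""
    return df.selectExpr(
        f"CAST({key_col} AS STRING) AS key",
        f"CAST({value_col} AS STRING) AS value",
    )


def kafka_sink_options(bootstrap_servers, topic, checkpoint_dir):
    """Return the option map for a streaming write to a single Kafka topic."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        # The "topic" option overrides any topic column present in the data.
        "topic": topic,
        "checkpointLocation": checkpoint_dir,
    }


def write_stream_to_kafka(df, options):
    """Start a streaming query that writes `df` to Kafka."""
    return df.writeStream.format("kafka").options(**options).start()


sink_opts = kafka_sink_options("host1:9092", "events-out", "/tmp/checkpoints/events-out")
print(sink_opts)
```

A typical call chain would be `write_stream_to_kafka(to_kafka_rows(df, "id", "payload"), sink_opts)`, where `id` and `payload` stand for whatever columns your DataFrame actually has.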

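The following options must be set for the Kafka sink for both batch and streaming queries.

| Option | Value | Meaning |
| --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. |

The following configurations are optional:

| Option | Value | Default | Query type | Meaning |
| --- | --- | --- | --- | --- |
| topic | string | none | streaming and batch | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. |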

Because a Kafka producer instance is designed to be thread-safe, Spark initializes one Kafka producer instance and shares it across tasks that have the same caching key.

The caching key is built up from the following information:

Kafka producer configuration

This includes configuration for authorization, which Spark will automatically include when a delegation token is being used. Even when authorization is taken into account, you can expect the same Kafka producer instance to be used for the same Kafka producer configuration. A different Kafka producer is used when the delegation token is renewed; the Kafka producer instance for the old delegation token is evicted according to the cache policy.

The following properties are available to configure the producer pool:

| Property Name | Default | Meaning | Since Version |
| --- | --- | --- | --- |
| spark.kafka.producer.cache.timeout | 10m (10 minutes) | The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor. | 2.2.1 |
| spark.kafka.producer.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |

Idle eviction thread periodically removes producers which are not used longer than given timeout. Note that the producer is shared and used concurrently, so the last used timestamp is determined by the moment the producer instance is returned and reference count is 0.

Kafka’s own configurations can be set via <code>DataStreamReader.option</code> with the <code>kafka.</code> prefix, e.g., <code>stream.option("kafka.bootstrap.servers", "host:port")</code>. For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

group.id: Kafka source will create a unique group id for each query automatically. The user can set the prefix of the automatically generated group.id’s via the optional source option <code>groupIdPrefix</code>, default value is “spark-kafka-source”. You can also set “kafka.group.id” to force Spark to use a special group id, however, please read warnings for this option and use it with caution.

auto.offset.reset: Set the source option <code>startingOffsets</code> to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that <code>startingOffsets</code> only applies when a new streaming query is started, and that resuming will always pick up from where the query left off. Note that when the offsets consumed by a streaming application no longer exist in Kafka (e.g., topics are deleted, offsets are out of range, or offsets are removed after retention period), the offsets will not be reset and the streaming application will see data loss. In extreme cases, for example the throughput of the streaming application cannot catch up the retention speed of Kafka, the input rows of a batch might be gradually reduced until zero when the offset ranges of the batch are completely not in Kafka. Enabling <code>failOnDataLoss</code> option can ask Structured Streaming to fail the query for such cases.

key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.

value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.

value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the values into either strings or byte arrays.

enable.auto.commit: Kafka source doesn’t commit any offset.

interceptor.classes: Kafka source always reads keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.

III. Connecting Spark Structured Streaming to Azure Event Hubs

1. Reading data from an Event Hubs source

2. Writing streaming data to Event Hubs

For more details, see: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#eventhubsconf

https://docs.microsoft.com/zh-cn/azure/databricks/_static/notebooks/structured-streaming-event-hubs-integration.html

Note: you can use .setMaxEventsPerTrigger(3000) to control how many events are read from Event Hubs per trigger.
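The original read and write snippets for Event Hubs are missing from this copy. The following PySpark sketch is an assumption-heavy reconstruction based on the azure-event-hubs-spark connector: the option keys `eventhubs.connectionString` and `maxEventsPerTrigger`, the JVM helper `EventHubsUtils.encrypt`, and the `body` payload column are recalled from that connector's PySpark documentation and should be verified against the version you use. All function names are hypothetical.

```python
def encrypt_connection_string(spark, connection_string):
    """Encrypt the connection string through the connector's JVM helper.

    Assumption: the connector JAR is on the classpath and exposes
    org.apache.spark.eventhubs.EventHubsUtils.encrypt.
    """
    jvm = spark.sparkContext._jvm
    return jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)


def event_hubs_conf(encrypted_connection_string, max_events_per_trigger=3000):
    """Build the option map; maxEventsPerTrigger caps events read per trigger."""
    return {
        "eventhubs.connectionString": encrypted_connection_string,
        "maxEventsPerTrigger": str(max_events_per_trigger),
    }


def read_event_hubs_stream(spark, conf):
    """Open a streaming DataFrame on an event hub."""
    return spark.readStream.format("eventhubs").options(**conf).load()


def write_stream_to_event_hubs(df, conf, checkpoint_dir, value_col="value"):
    """Write `value_col` as the event payload (assumed column name: body)."""
    return (
        df.selectExpr(f"CAST({value_col} AS STRING) AS body")
        .writeStream.format("eventhubs")
        .options(**conf)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


conf = event_hubs_conf("<encrypted-connection-string>")
print(conf["maxEventsPerTrigger"])
```

In the Scala API the same limit is expressed with the `.setMaxEventsPerTrigger(3000)` call mentioned in the note above.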

IV. Output modes for writes in Spark Structured Streaming

There are a few types of output modes.

Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger are output to the sink. This is supported only for queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row is output only once (assuming a fault-tolerant sink). For example, queries with only <code>select</code>, <code>where</code>, <code>map</code>, <code>flatMap</code>, <code>filter</code>, <code>join</code>, etc. support Append mode.

Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.
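As a small illustration, the output mode is chosen on the stream writer before the query starts. The sketch below assumes `df` is a streaming DataFrame and uses the console sink for demonstration; the helper name and the up-front validation are additions of this sketch, not part of Spark.

```python
VALID_OUTPUT_MODES = {"append", "complete", "update"}


def start_console_query(df, output_mode="append"):
    """Start a streaming query that prints each trigger's output to the console."""
    if output_mode not in VALID_OUTPUT_MODES:
        # Fail early instead of waiting for Spark to reject the mode.
        raise ValueError(f"unknown output mode: {output_mode!r}")
    return df.writeStream.outputMode(output_mode).format("console").start()
```

Spark still checks at query start whether the chosen mode is compatible with the query type, so an aggregation without a watermark would be rejected in append mode even though the name itself is valid.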

Different types of streaming queries support different output modes. Here is the compatibility matrix.

| Query Type | | Supported Output Modes | Notes |
| --- | --- | --- | --- |
| Queries with aggregation | Aggregation on event-time with watermark | Append, Update, Complete | Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed by the late threshold specified in <code>withWatermark()</code> because, by this mode's semantics, rows can be added to the Result Table only once after they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details. Update mode uses watermark to drop old aggregation state. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. |
| Queries with aggregation | Other aggregations | Complete, Update | Since no watermark is defined (only defined in other category), old aggregation state is not dropped. Append mode is not supported as aggregates can update, thus violating the semantics of this mode. |
| Queries with <code>mapGroupsWithState</code> | | Update | Aggregations not allowed in a query with <code>mapGroupsWithState</code>. |
| Queries with <code>flatMapGroupsWithState</code> | Append operation mode | Append | Aggregations are allowed after <code>flatMapGroupsWithState</code>. |
| Queries with <code>flatMapGroupsWithState</code> | Update operation mode | Update | Aggregations not allowed in a query with <code>flatMapGroupsWithState</code>. |
| Queries with <code>joins</code> | | Append | Update and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported. |
| Other queries | | Append, Update | Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table. |

V. Other sinks in Spark Structured Streaming

| Sink | Supported Output Modes | Options | Fault-tolerant | Notes |
| --- | --- | --- | --- | --- |
| File Sink | Append | <code>path</code>: path to the output directory, must be specified. <code>retention</code>: time to live (TTL) for output files. Output files whose batches were committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries which read the sink's output directory may not process them. You can provide the value as a time string (like "12h", "7d", etc.). By default it's disabled. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code> | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful. |
| Kafka Sink | Append, Update, Complete | See the Kafka Integration Guide | Yes (at-least-once) | More details in the Kafka Integration Guide |
| Foreach Sink | Append, Update, Complete | None | Yes (at-least-once) | More details in the next section |
| ForeachBatch Sink | Append, Update, Complete | None | Depends on the implementation | More details in the next section |
| Console Sink | Append, Update, Complete | <code>numRows</code>: Number of rows to print every trigger (default: 20). <code>truncate</code>: Whether to truncate the output if too long (default: true) | No | |
| Memory Sink | Append, Complete | None | No. But in Complete Mode, restarted query will recreate the full table. | Table name is the query name. |

The <code>foreach</code> and <code>foreachBatch</code> operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while <code>foreach</code> allows custom write logic on every row, <code>foreachBatch</code> allows arbitrary operations and custom logic on the output of each micro-batch. Let’s understand their usages in more detail.

ForeachBatch

<code>foreachBatch(...)</code> allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

Foreach

If <code>foreachBatch</code> is not an option (for example, corresponding batch data writer does not exist, or continuous processing mode), then you can express your custom writer logic using <code>foreach</code>. Specifically, you can express the data writing logic by dividing it into three methods: <code>open</code>, <code>process</code>, and <code>close</code>. Since Spark 2.4, <code>foreach</code> is available in Scala, Java and Python.
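Both hooks can be sketched in PySpark as follows. This is an illustrative sketch under stated assumptions: `df` is a streaming DataFrame, the output path and all function and class names are hypothetical, and the Parquet target is just an example of a batch writer.

```python
OUTPUT_ROOT = "/tmp/stream-output"  # hypothetical output location


def write_batch(batch_df, batch_id):
    """foreachBatch callback: receives a micro-batch DataFrame and its unique ID."""
    # Overwriting a per-batch directory keeps a retried batch idempotent.
    batch_df.write.mode("overwrite").parquet(f"{OUTPUT_ROOT}/batch={batch_id}")


class CountingWriter:
    """foreach writer with the open / process / close lifecycle."""

    def open(self, partition_id, epoch_id):
        # Acquire per-partition resources here; returning True accepts the rows.
        self.rows_seen = 0
        return True

    def process(self, row):
        # Called once for every row of the partition.
        self.rows_seen += 1

    def close(self, error):
        # `error` is None when the partition was processed successfully.
        if error is not None:
            print(f"partition failed after {self.rows_seen} rows: {error}")


def start_foreach_batch_query(df, checkpoint_dir):
    """Run write_batch on the output of every micro-batch."""
    return (
        df.writeStream.foreachBatch(write_batch)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


def start_foreach_query(df):
    """Run CountingWriter on every output row."""
    return df.writeStream.foreach(CountingWriter()).start()
```

`foreachBatch` is usually the simpler choice because it lets you reuse existing batch writers; `foreach` is the fallback when no batch writer exists or when row-level control is needed.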

VI. State Store in Spark Structured Streaming

State store is a versioned key-value store which provides both read and write operations. In Structured Streaming, we use the state store provider to handle the stateful operations across batches. There are two built-in state store provider implementations. End users can also implement their own state store provider by extending StateStoreProvider interface.

The HDFS backend state store provider is the default implementation of StateStoreProvider and StateStore, in which all the data is first stored in an in-memory map and then backed by files in an HDFS-compatible file system. All updates to the store have to be done in sets transactionally, and each set of updates increments the store’s version. These versions can be used to re-execute the updates (by retries in RDD operations) on the correct version of the store, and regenerate the store version.

As of Spark 3.2, we add a new built-in state store implementation, RocksDB state store provider.

If you have stateful operations in your streaming query (for example, streaming aggregation, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState) and you want to maintain millions of keys in the state, then you may face issues related to large JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times. This occurs because, by the implementation of HDFSBackedStateStore, the state data is maintained in the JVM memory of the executors and large number of state objects puts memory pressure on the JVM causing high GC pauses.

In such cases, you can choose to use a more optimized state management solution based on RocksDB. Rather than keeping the state in the JVM memory, this solution uses RocksDB to efficiently manage the state in the native memory and the local disk. Furthermore, any changes to this state are automatically saved by Structured Streaming to the checkpoint location you have provided, thus providing full fault-tolerance guarantees (the same as default state management).

To enable the new built-in state store implementation, set <code>spark.sql.streaming.stateStore.providerClass</code> to <code>org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider</code>.

On Databricks' commercial Spark distribution, you can enable it with spark.conf.set("spark.sql.streaming.stateStore.providerClass", "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"). For more, see: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/production#--configure-rocksdb-state-store

Here are the configs regarding the RocksDB instance of the state store provider:

| Config Name | Description | Default Value |
| --- | --- | --- |
| spark.sql.streaming.stateStore.rocksdb.compactOnCommit | Whether to perform a range compaction of the RocksDB instance on commit | False |
| spark.sql.streaming.stateStore.rocksdb.blockSizeKB | Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is RocksDB's default SST file format. | 4 |
| spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | The size capacity in MB for a cache of blocks. | 8 |
| spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | The waiting time in milliseconds for acquiring the lock in the load operation for the RocksDB instance. | 60000 |
| spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | Whether to reset all ticker and histogram stats for RocksDB on load. | True |
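A hedged PySpark sketch of switching to the RocksDB provider and tuning two of its settings. The configuration keys are the ones named in this section; the helper names and the chosen values are hypothetical, and `spark` is assumed to be an existing SparkSession on Spark 3.2 or later. The settings need to be in place before the streaming query is started.

```python
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
ROCKSDB_PREFIX = "spark.sql.streaming.stateStore.rocksdb."


def rocksdb_state_store_conf(block_cache_size_mb=8, compact_on_commit=False):
    """Return the configuration entries that enable and tune RocksDB state."""
    return {
        "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
        ROCKSDB_PREFIX + "blockCacheSizeMB": str(block_cache_size_mb),
        ROCKSDB_PREFIX + "compactOnCommit": str(compact_on_commit).lower(),
    }


def apply_conf(spark, conf):
    """Apply each entry to the session before any stateful query starts."""
    for key, value in conf.items():
        spark.conf.set(key, value)


state_conf = rocksdb_state_store_conf(block_cache_size_mb=64)
print(state_conf)
```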

The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states. So it is more efficient to keep a state store provider running in the same executor across different streaming batches. Changing the location of a state store provider requires the extra overhead of loading checkpointed states. The overhead of loading state from checkpoint depends on the external storage and the size of the state, which tends to hurt the latency of micro-batch run. For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.

The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark’s RDD to run the state store provider on the same executor. If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.

However, generally the preferred location is not a hard requirement and it is still possible that Spark schedules tasks to the executors other than the preferred ones. In this case, Spark will load state store providers from checkpointed states on new executors. The state store providers run in the previous batch will not be unloaded immediately. Spark runs a maintenance task which checks and unloads the state store providers that are inactive on the executors.

By changing the Spark configurations related to task scheduling, for example <code>spark.locality.wait</code>, users can configure how long Spark waits to launch a data-local task. For stateful operations in Structured Streaming, this can be used to keep state store providers running on the same executors across batches.

Specifically for the built-in HDFS state store provider, users can check state store metrics such as <code>loadedMapCacheHitCount</code> and <code>loadedMapCacheMissCount</code>. Ideally the cache miss count is minimized, which means Spark won’t waste much time loading checkpointed state. Users can increase the Spark locality wait configurations to avoid loading state store providers on different executors across batches.
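The cache metrics can be turned into a single hit ratio with a small helper. This sketch assumes the progress report is a dictionary shaped like PySpark's `query.lastProgress`, with a `stateOperators` list whose entries carry a `customMetrics` map; that shape and the helper name are assumptions to check against your Spark version.

```python
def state_cache_hit_ratio(progress):
    """Return hits / (hits + misses) over all state operators, or None if no data."""
    hits = 0
    misses = 0
    for operator in progress.get("stateOperators", []):
        metrics = operator.get("customMetrics", {})
        hits += metrics.get("loadedMapCacheHitCount", 0)
        misses += metrics.get("loadedMapCacheMissCount", 0)
    total = hits + misses
    return hits / total if total else None


sample_progress = {
    "stateOperators": [
        {"customMetrics": {"loadedMapCacheHitCount": 9, "loadedMapCacheMissCount": 1}}
    ]
}
print(state_cache_hit_ratio(sample_progress))
```

A ratio that keeps dropping across batches suggests state store providers are being rescheduled onto different executors, which is the situation a longer `spark.locality.wait` is meant to reduce.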

VII. Joins in Spark Structured Streaming

To be continued.

For more details, see: https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html

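This is the author's original article and remains the author's property. Reposting is welcome, but the source must be credited and the copyright notice retained. The author will pursue legal remedies against reposts that do not credit the source. Please respect the author's work.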
