I. Introduction to Spark Structured Streaming
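We all know that Spark Streaming entered a maintenance phase after v2.4.5 and will see no new major versions. Spark Streaming has also always processed streaming data as micro-batches, so it only achieves near-real-time processing and cannot process data in true real time the way Flink does. With Spark Streaming no longer being developed, Spark's answer to Flink is Structured Streaming (first introduced in Spark 2.0). Like Flink, it supports window computations, watermarks, state and similar features.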
Author: 張永清. If you repost this article, please credit the source as cnblogs: https://www.cnblogs.com/laoqing/p/15517078.html
II. Connecting Spark Structured Streaming to a Kafka data source
Jar dependency:
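The dependency snippet itself is missing from this copy. Here is a hedged sketch. It assumes Scala 2.12 and Spark 3.1.2, which is the version of the programming guide linked at the end. Under those assumptions, the Kafka connector is the `spark-sql-kafka-0-10` artifact. The hypothetical helper below only assembles its Maven coordinate, which can be passed to `spark-submit --packages` or to the `spark.jars.packages` setting. Match both versions to your cluster.

```python
def kafka_connector_coordinate(spark_version: str = "3.1.2",
                               scala_binary: str = "2.12") -> str:
    """Maven coordinate of Spark's Kafka source/sink connector.

    The default versions are assumptions; they must match the Spark and
    Scala versions the cluster actually runs.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}"
```

For example, `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 app.py`, where `app.py` stands for your own job.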
1. Reading data from a Kafka source
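The code for this step did not survive either. Below is a minimal sketch for a PySpark job. It assumes a broker at `localhost:9092` and a topic named `events`, both hypothetical. The helper `kafka_stream_options` is also hypothetical: it builds the option map for a streaming Kafka source from option names listed in the tables of this section.

```python
from typing import Dict, Optional


def kafka_stream_options(bootstrap_servers: str, topics: str,
                         starting_offsets: str = "latest",
                         max_offsets_per_trigger: Optional[int] = None) -> Dict[str, str]:
    """Option map for a streaming Kafka source (all values as strings)."""
    opts = {
        # Kafka's own settings carry the "kafka." prefix.
        "kafka.bootstrap.servers": bootstrap_servers,
        # Exactly one of assign / subscribe / subscribePattern may be set.
        "subscribe": topics,
        # Only honoured when a brand-new query starts; a restarted query
        # resumes from its checkpoint instead.
        "startingOffsets": starting_offsets,
    }
    if max_offsets_per_trigger is not None:
        opts["maxOffsetsPerTrigger"] = str(max_offsets_per_trigger)
    return opts
```

Given a live `SparkSession` named `spark`, the map would be used as `spark.readStream.format("kafka").options(**kafka_stream_options("localhost:9092", "events")).load()`. This is typically followed by `.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")` to decode the binary columns. A batch query uses `spark.read` with the same options.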
2. Creating a Kafka Source for Batch Queries
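Schema (each row read from a Kafka source has these columns):

| Column | Type |
|---|---|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |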
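Because `key` and `value` arrive as binary, they normally have to be decoded, for example with `CAST(value AS STRING)` in `selectExpr`. The plain-Python sketch below mimics that step on a hypothetical row dictionary. It assumes UTF-8 payloads and only illustrates the schema; it is not Spark code.

```python
from typing import Any, Dict, Optional


def decode_kafka_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a Kafka source row with key/value decoded as UTF-8.

    A null key (Kafka allows records without a key) stays None, just as
    CAST(key AS STRING) yields null for a null binary.
    """
    def as_text(raw: Optional[bytes]) -> Optional[str]:
        return None if raw is None else raw.decode("utf-8")

    decoded = dict(row)
    decoded["key"] = as_text(row.get("key"))
    decoded["value"] = as_text(row.get("value"))
    return decoded


# Hypothetical sample row using the column names from the schema table.
sample = {"key": None, "value": b'{"id": 1}', "topic": "events",
          "partition": 0, "offset": 42}
```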
Required parameters (for both batch and streaming queries):

| Option | Value | Meaning |
|---|---|---|
| assign | json string {"topicA":[0,1],"topicB":[2,4]} | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| subscribe | A comma-separated list of topics | The topic list to subscribe to. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. |
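Spark performs these checks itself when the query starts. The hypothetical helper below restates the two rules from the table: bootstrap servers must be present, and exactly one subscription option must be set. This lets a job's configuration be checked early, for example in a unit test.

```python
from typing import Mapping

SUBSCRIPTION_OPTIONS = ("assign", "subscribe", "subscribePattern")


def check_kafka_source_options(options: Mapping[str, str]) -> str:
    """Validate the required-option rules and return the subscription mode."""
    if "kafka.bootstrap.servers" not in options:
        raise ValueError("kafka.bootstrap.servers must be set")
    chosen = [name for name in SUBSCRIPTION_OPTIONS if name in options]
    if len(chosen) != 1:
        raise ValueError("exactly one of assign, subscribe, subscribePattern "
                         f"is required, got {chosen or 'none'}")
    return chosen[0]
```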
Optional parameters:

| Option | Value | Default | Query type | Meaning |
|---|---|---|---|---|
| startingTimestamp | timestamp string e.g. "1000" | none (next preference is `startingOffsetsByTimestamp`) | streaming and batch | The start point of timestamp when a query is started: a string specifying a starting timestamp for all partitions in topics being subscribed (the Spark Kafka integration guide has details on timestamp offset options). If Kafka doesn't return the matched offset, the behavior follows the value of the option `startingOffsetsByTimestampStrategy`. Note1: `startingTimestamp` takes precedence over `startingOffsetsByTimestamp` and `startingOffsets`. Note2: For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. |
| startingOffsetsByTimestamp | json string {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} | none (next preference is `startingOffsets`) | streaming and batch | The start point of timestamp when a query is started: a json string specifying a starting timestamp for each TopicPartition. If Kafka doesn't return the matched offset, the behavior follows the value of the option `startingOffsetsByTimestampStrategy`. Note1: `startingOffsetsByTimestamp` takes precedence over `startingOffsets`. |
| startingOffsets | "earliest", "latest" (streaming only), or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} | "latest" for streaming, "earliest" for batch | streaming and batch | The start point when a query is started: either "earliest", which is from the earliest offsets, "latest", which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. |
| endingTimestamp | timestamp string e.g. "1000" | none (next preference is `endingOffsetsByTimestamp`) | batch query | The end point when a batch query is ended: a string specifying an ending timestamp for all partitions in topics being subscribed. If Kafka doesn't return the matched offset, the offset will be set to latest. Note: `endingTimestamp` takes precedence over `endingOffsetsByTimestamp` and `endingOffsets`. |
| endingOffsetsByTimestamp | json string {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} | none (next preference is `endingOffsets`) | batch query | The end point when a batch query is ended: a json string specifying an ending timestamp for each TopicPartition. If Kafka doesn't return the matched offset, the offset will be set to latest. Note: `endingOffsetsByTimestamp` takes precedence over `endingOffsets`. |
| endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch query | The end point when a batch query is ended: either "latest", which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest; -2 (earliest) as an offset is not allowed. |
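The per-partition JSON forms above are easy to get wrong by hand. The sketch below uses a hypothetical helper, `offsets_json`, that serializes them and enforces the two documented restrictions:

- A batch query's starting offsets may not use -1 (latest).
- Ending offsets may not use -2 (earliest).

It only produces the string that would be passed to `.option("startingOffsets", ...)` or `.option("endingOffsets", ...)`.

```python
import json
from typing import Dict

EARLIEST, LATEST = -2, -1  # sentinel offsets understood in the JSON form


def offsets_json(offsets: Dict[str, Dict[int, int]], *, batch: bool,
                 ending: bool) -> str:
    """Serialize {topic: {partition: offset}} for startingOffsets/endingOffsets.

    ending=True means endingOffsets, which only exists for batch queries.
    """
    for topic, parts in offsets.items():
        for partition, offset in parts.items():
            if offset < EARLIEST:
                raise ValueError(f"{topic}-{partition}: invalid offset {offset}")
            if ending and offset == EARLIEST:
                raise ValueError("-2 (earliest) is not allowed in endingOffsets")
            if batch and not ending and offset == LATEST:
                raise ValueError("-1 (latest) is not allowed as a batch starting offset")
    # JSON object keys must be strings, so partitions become "0", "1", ...
    return json.dumps({t: {str(p): o for p, o in parts.items()}
                       for t, parts in offsets.items()}, separators=(",", ":"))
```

For example, `offsets_json({"topicA": {0: 23, 1: -1}, "topicB": {0: -2}}, batch=False, ending=False)` yields the streaming `startingOffsets` example from the table.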
| Option | Value | Default | Query type | Meaning |
|---|---|---|---|---|
| failOnDataLoss | true or false | true | streaming and batch | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it doesn't work as you expected. |
| kafkaConsumer.pollTimeoutMs | long | 120000 | streaming and batch | The timeout in milliseconds to poll data from Kafka in executors. When not defined it falls back to `spark.network.timeout`. |
| fetchOffset.numRetries | int | 3 | streaming and batch | Number of times to retry before giving up fetching Kafka offsets. |
| fetchOffset.retryIntervalMs | long | 10 | streaming and batch | Milliseconds to wait before retrying to fetch Kafka offsets. |
| maxOffsetsPerTrigger | long | none | streaming query | Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. |
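The docs only say that the `maxOffsetsPerTrigger` budget is split proportionally across partitions of different volume. Below is a rough model of that behaviour; the rounding rule is an assumption, not a copy of Spark's code.

- Each partition gets a share of the budget proportional to its backlog.
- Shares below one offset are rounded up so small partitions are not starved.
- Larger shares are rounded down.

As a result, the total can slightly exceed the budget.

```python
import math
from typing import Dict


def split_offset_budget(backlog: Dict[str, int], max_offsets: int) -> Dict[str, int]:
    """Share max_offsets across partitions in proportion to their backlog.

    backlog maps a "topic-partition" label to its number of unread offsets.
    """
    total = sum(backlog.values())
    if total <= max_offsets:
        return dict(backlog)  # everything fits into this trigger
    shares = {}
    for tp, n in backlog.items():
        share = n * max_offsets / total
        # Round tiny non-zero shares up so small partitions still progress.
        shares[tp] = math.ceil(share) if share < 1 else math.floor(share)
    return shares
```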
| Option | Value | Default | Query type | Meaning |
|---|---|---|---|---|
| minOffsetsPerTrigger | long | none | streaming query | Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. Note: if `maxTriggerDelay` is exceeded, a trigger will be fired even if the number of available offsets doesn't reach `minOffsetsPerTrigger`. |
| maxTriggerDelay | time with units | 15m | streaming query | Maximum amount of time for which a trigger can be delayed between two triggers, provided some data is available from the source. This option only applies if `minOffsetsPerTrigger` is set. |
| minPartitions | int | none | streaming and batch | Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. If you set this option to a value greater than your topicPartitions, Spark will divvy up large Kafka partitions into smaller pieces. Please note that this configuration is like a hint: the number of Spark tasks will be approximately `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data. |
| groupIdPrefix | string | spark-kafka-source | streaming and batch | Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries. If "kafka.group.id" is set, this option will be ignored. |
| kafka.group.id | string | none | streaming and batch | The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to be very small. When this is set, option "groupIdPrefix" will be ignored. |
| includeHeaders | boolean | false | streaming and batch | Whether to include the Kafka headers in the row. |
| startingOffsetsByTimestampStrategy | "error" or "latest" | "error" | streaming and batch | The strategy used when the specified starting offset by timestamp (either global or per partition) doesn't match the offset Kafka returned. "error": fail the query; end users then have to deal with workarounds requiring manual steps. "latest": assign the latest offset for these partitions, so that Spark can read newer records from them in later micro-batches. |
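The timestamp-based options rely on Kafka's timestamp lookup. The consumer's `offsetsForTimes` returns the earliest offset whose timestamp is greater than or equal to the requested one, and returns nothing if no such record exists. A toy model of that lookup for one partition, with `startingOffsetsByTimestampStrategy` applied on a miss, could look like this. Records are hypothetical `(offset, timestamp_ms)` pairs, and `end_offset` stands for the partition's latest offset.

```python
from typing import List, Tuple


def offset_for_timestamp(records: List[Tuple[int, int]], end_offset: int,
                         target_ts: int, strategy: str = "error") -> int:
    """Resolve a starting offset from a timestamp for one partition."""
    if strategy not in ("error", "latest"):
        raise ValueError(f"unknown strategy {strategy!r}")
    # Earliest offset whose timestamp is >= target_ts, scanning in offset order.
    match = next((off for off, ts in records if ts >= target_ts), None)
    if match is not None:
        return match
    if strategy == "latest":
        return end_offset  # read only newer records from here on
    raise LookupError(f"no offset with timestamp >= {target_ts}")
```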
It’s time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor. Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.
The caching key is built up from the following information:
Topic name
Topic partition
Group ID
The following properties are available to configure the consumer pool:

| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
| spark.kafka.consumer.cache.capacity | 64 | The maximum number of consumers cached. Please note that it's a soft limit. | 3.0.0 |
| spark.kafka.consumer.cache.timeout | 5m (5 minutes) | The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor. | 3.0.0 |
| spark.kafka.consumer.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for the consumer pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |
| spark.kafka.consumer.cache.jmx.enable | false | Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via the JMX instance. The prefix of the JMX name is set to "kafka010-cached-simple-kafka-consumer-pool". | 3.0.0 |
The size of the pool is limited by `spark.kafka.consumer.cache.capacity`, but it works as a "soft limit" so that Spark tasks are never blocked.
The idle eviction thread periodically removes consumers that have not been used for longer than the given timeout. If the capacity threshold is reached when borrowing, the pool tries to remove the least-used entry that is currently not in use.
If no entry can be removed, the pool keeps growing. In the worst case, it grows to the maximum number of concurrent tasks that can run in the executor (that is, the number of task slots).
If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons. At the same time, all consumers in the pool that have the same caching key are invalidated, to remove the consumer used in the failed execution. Consumers that other tasks are currently using are not closed, but they are invalidated as well when they are returned to the pool.
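The pooling rules above can be condensed into a toy model. In this sketch, consumers are plain integer ids and only the bookkeeping is modelled.

- The caching key is (topic, partition, group id).
- Capacity is a soft limit.
- Failures trigger invalidation by key.

"Least-used" is approximated here as "longest idle", which is an assumption. The class is hypothetical and does not mirror Spark's internal implementation.

```python
from collections import OrderedDict
from itertools import count
from typing import Dict, Set, Tuple

CacheKey = Tuple[str, int, str]  # (topic, partition, group id)


class ConsumerPoolModel:
    """Toy model of the executor-side Kafka consumer pool."""

    def __init__(self, capacity: int = 64) -> None:
        self.capacity = capacity  # soft limit: borrowing never blocks
        self.idle: "OrderedDict[int, CacheKey]" = OrderedDict()  # longest idle first
        self.in_use: Dict[int, CacheKey] = {}
        self.invalid: Set[int] = set()
        self._ids = count()

    def total(self) -> int:
        return len(self.idle) + len(self.in_use)

    def borrow(self, key: CacheKey) -> int:
        # Reuse an idle consumer with the same caching key if one exists.
        match = next((cid for cid, k in self.idle.items() if k == key), None)
        if match is not None:
            del self.idle[match]
            self.in_use[match] = key
            return match
        # At capacity: drop the longest-idle consumer; if none is idle, grow anyway.
        if self.total() >= self.capacity and self.idle:
            self.idle.popitem(last=False)
        cid = next(self._ids)
        self.in_use[cid] = key
        return cid

    def give_back(self, cid: int) -> None:
        key = self.in_use.pop(cid)
        if cid in self.invalid:  # invalidated while borrowed: discard it
            self.invalid.discard(cid)
        else:
            self.idle[cid] = key

    def invalidate(self, key: CacheKey) -> None:
        """After a task failure: drop idle consumers, flag borrowed ones."""
        for cid in [c for c, k in self.idle.items() if k == key]:
            del self.idle[cid]
        self.invalid.update(c for c, k in self.in_use.items() if k == key)
```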
Along with consumers, Spark pools the records fetched from Kafka separately. This keeps Kafka consumers stateless from Spark's point of view and maximizes the efficiency of pooling. The fetched-data pool uses the same cache key as the Kafka consumer pool. Note that it doesn't use Apache Commons Pool because its characteristics differ.
The following properties are available to configure the fetched data pool:

| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
| spark.kafka.consumer.fetchedData.cache.timeout | 5m (5 minutes) | The minimum amount of time fetched data may sit idle in the pool before it is eligible for eviction by the evictor. | 3.0.0 |
| spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for the fetched data pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |
3. Writing streaming data to a Kafka sink
The DataFrame being written to Kafka should have the following columns in its schema:

| Column | Type |
|---|---|
| key (optional) | string or binary |
| value (required) | string or binary |
| headers (optional) | array |
| topic (*optional) | string |
| partition (optional) | int |

* The topic column is required if the "topic" configuration option is not specified.
The value column is the only required column. If a key column is not specified, a `null`-valued key column will be added automatically (see Kafka semantics on how `null`-valued keys are handled). If a topic column exists, its value is used as the topic when writing the given row to Kafka, unless the "topic" configuration option is set; that is, the "topic" option overrides the topic column. If a "partition" column is not specified (or its value is `null`), the partition is calculated by the Kafka producer. A Kafka partitioner can be specified in Spark by setting the `kafka.partitioner.class` option. If it is not present, the Kafka default partitioner will be used.
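The column rules in the paragraph above can be restated as a small pure-Python function. It resolves the topic, key and partition that one output row would be written with. The function is hypothetical and only mirrors the documented rules.

```python
from typing import Any, Dict, Optional


def resolve_sink_record(row: Dict[str, Any],
                        topic_option: Optional[str] = None) -> Dict[str, Any]:
    """Apply the Kafka sink's column rules to one output row."""
    if "value" not in row:
        raise ValueError("the value column is required")
    # The "topic" option, when set, overrides any topic column.
    topic = topic_option if topic_option is not None else row.get("topic")
    if topic is None:
        raise ValueError("no topic column and no 'topic' option")
    return {
        "topic": topic,
        "key": row.get("key"),              # missing key -> null key
        "value": row["value"],
        "partition": row.get("partition"),  # None -> producer's partitioner decides
    }
```

In PySpark, the equivalent stream would be shaped with something like `df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")`. It would then be written with `.writeStream.format("kafka").option("kafka.bootstrap.servers", ...).option("topic", ...).option("checkpointLocation", ...).start()`, where `id` and the option values are placeholders.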
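The following options must be set for the Kafka sink for both batch and streaming queries:

| Option | Value | Meaning |
|---|---|---|
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. |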
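The following configurations are optional:

| Option | Value | Default | Query type | Meaning |
|---|---|---|---|---|
| topic | string | none | streaming and batch | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. |
| includeHeaders | boolean | false | streaming and batch | Whether to include the Kafka headers in the row. |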
Because the Kafka producer instance is designed to be thread-safe, Spark initializes one Kafka producer instance and shares it across tasks with the same caching key.
The caching key is built from the following information:
Kafka producer configuration
This includes the configuration for authorization, which Spark includes automatically when a delegation token is being used. Even with authorization taken into account, you can expect the same Kafka producer instance to be used for the same Kafka producer configuration. A different Kafka producer is used when the delegation token is renewed; the producer instance for the old delegation token is evicted according to the cache policy.
The following properties are available to configure the producer pool:

| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
| spark.kafka.producer.cache.timeout | 10m (10 minutes) | The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor. | 2.2.1 |
| spark.kafka.producer.cache.evictorThreadRunInterval | 1m (1 minute) | The interval of time between runs of the idle evictor thread for the producer pool. When non-positive, no idle evictor thread will be run. | 3.0.0 |
The idle eviction thread periodically removes producers that have not been used for longer than the given timeout. Note that the producer is shared and used concurrently, so its last-used timestamp is the moment the producer instance is returned and its reference count drops to 0.
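The reference-counting rule is what distinguishes the producer pool from the consumer pool. It can be modelled with a toy class: a producer's idle clock only starts when its reference count drops to 0, and the evictor closes it after the timeout. Producers are keyed by their configuration, so a changed configuration (such as a renewed delegation token) gets a separate entry. The class and the injectable `clock` are hypothetical.

```python
import time
from typing import Callable, Dict, FrozenSet, Tuple

ProducerKey = FrozenSet[Tuple[str, str]]  # the producer configuration


class ProducerPoolModel:
    """Toy model of the shared, reference-counted Kafka producer cache."""

    def __init__(self, timeout_s: float = 600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_s = timeout_s  # cf. spark.kafka.producer.cache.timeout (10m)
        self.clock = clock
        self.refs: Dict[ProducerKey, int] = {}
        self.last_used: Dict[ProducerKey, float] = {}

    def acquire(self, config: Dict[str, str]) -> ProducerKey:
        key = frozenset(config.items())  # same configuration -> same producer
        self.refs[key] = self.refs.get(key, 0) + 1
        return key

    def release(self, key: ProducerKey) -> None:
        self.refs[key] -= 1
        if self.refs[key] == 0:  # idle clock starts only at reference count 0
            self.last_used[key] = self.clock()

    def evict_idle(self) -> int:
        """One run of the evictor thread; returns how many producers were closed."""
        now = self.clock()
        expired = [k for k, n in self.refs.items()
                   if n == 0 and now - self.last_used[k] >= self.timeout_s]
        for k in expired:
            del self.refs[k], self.last_used[k]
        return len(expired)
```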
Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
group.id: The Kafka source creates a unique group id for each query automatically. The user can set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`; the default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special group id; however, please read the warnings for this option and use it with caution.
auto.offset.reset: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it. This ensures that no data is missed when new topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new streaming query is started, and that resuming will always pick up from where the query left off. Note also that when the offsets consumed by a streaming application no longer exist in Kafka (e.g., topics are deleted, offsets are out of range, or offsets are removed after the retention period), the offsets will not be reset and the streaming application will see data loss. In extreme cases, for example when the throughput of the streaming application cannot keep up with Kafka's retention, the input rows of a batch might gradually shrink to zero once the batch's offset ranges are no longer in Kafka at all. Enabling the `failOnDataLoss` option makes Structured Streaming fail the query in such cases.
key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the values into either strings or byte arrays.
enable.auto.commit: Kafka source doesn’t commit any offset.
interceptor.classes: The Kafka source always reads keys and values as byte arrays. It's not safe to use ConsumerInterceptor as it may break the query.
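A pre-flight check against the list above can save a failed job submission. The hypothetical helper below reports which `kafka.`-prefixed options fall into that list. It matches option names case-insensitively, which is an assumption of this sketch. `group.id` is left out because, as described above, `kafka.group.id` is accepted with caution.

```python
from typing import List, Mapping

# Kafka settings listed above as not settable through the Kafka source/sink.
UNSUPPORTED_KAFKA_PARAMS = {
    "auto.offset.reset", "key.deserializer", "value.deserializer",
    "key.serializer", "value.serializer", "enable.auto.commit",
    "interceptor.classes",
}


def rejected_kafka_params(options: Mapping[str, str]) -> List[str]:
    """Return the "kafka."-prefixed options from the unsupported list, sorted."""
    prefix = "kafka."
    return sorted(name for name in options
                  if name.lower().startswith(prefix)
                  and name.lower()[len(prefix):] in UNSUPPORTED_KAFKA_PARAMS)
```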
III. Connecting Spark Structured Streaming to an Event Hubs data source
1. Reading data from an Event Hubs source
2. Writing streaming data to Event Hubs
For more details, see: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#eventhubsconf
https://docs.microsoft.com/zh-cn/azure/databricks/_static/notebooks/structured-streaming-event-hubs-integration.html
Note: you can limit the number of events read from Event Hubs per trigger with .setMaxEventsPerTrigger(3000) on the EventHubsConf.
IV. Output modes for Spark Structured Streaming writes
There are a few types of output modes.
Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger will be output to the sink. This is supported only for those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode.
Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.
Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be output to the sink.
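To make the modes concrete, here is a plain-Python simulation with no Spark involved. It shows what a running count, such as `groupBy("word").count()`, would hand to the sink after each trigger in Complete mode and in Update mode. Append is left out on purpose. Without a watermark, a count can still change later, which is why Append is not supported for such aggregations.

```python
from collections import Counter
from typing import Dict, List, Tuple


def run_output_modes(batches: List[List[str]]) -> Dict[str, List[List[Tuple[str, int]]]]:
    """Rows a running word count would emit per trigger, by output mode."""
    state: Counter = Counter()
    emitted: Dict[str, List[List[Tuple[str, int]]]] = {"complete": [], "update": []}
    for batch in batches:
        delta = Counter(batch)
        state.update(delta)
        # Complete: the whole Result Table after every trigger.
        emitted["complete"].append(sorted(state.items()))
        # Update: only the rows whose aggregate changed in this trigger.
        emitted["update"].append(sorted((w, state[w]) for w in delta))
    return emitted
```

With triggers `["a", "b", "a"]` and then `["b", "c"]`, the second trigger's Complete output contains all three words. Its Update output contains only `b` and `c`.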
Different types of streaming queries support different output modes. Here is the compatibility matrix:

| Query type | Subtype | Supported output modes | Notes |
|---|---|---|---|
| Queries with aggregation | Aggregation on event-time with watermark | Append, Update, Complete | Append mode uses the watermark to drop old aggregation state. But the output of a windowed aggregation is delayed by the late threshold specified in `withWatermark()`, because by the mode's semantics rows can be added to the Result Table only once, after they are finalized (i.e. after the watermark is crossed). See the Late Data section of the Spark programming guide for more details. Update mode uses the watermark to drop old aggregation state. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. |
| Queries with aggregation | Other aggregations | Complete, Update | Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped. Append mode is not supported, as aggregates can update, which violates the semantics of this mode. |
| Queries with `mapGroupsWithState` | | Update | Aggregations are not allowed in a query with `mapGroupsWithState`. |
| Queries with `flatMapGroupsWithState` | Append operation mode | Append | Aggregations are allowed after `flatMapGroupsWithState`. |
| Queries with `flatMapGroupsWithState` | Update operation mode | Update | Aggregations are not allowed in a query with `flatMapGroupsWithState`. |
| Queries with `joins` | | Append | Update and Complete modes are not supported yet. See the support matrix in the Join Operations section of the Spark programming guide for details on which types of joins are supported. |
| Other queries | | Append, Update | Complete mode is not supported, as it is infeasible to keep all unaggregated data in the Result Table. |
V. Other sinks in Spark Structured Streaming
| Sink | Supported output modes | Options | Fault-tolerant | Notes |
|---|---|---|---|---|
| File Sink | Append | `path`: path to the output directory, must be specified. `retention`: time to live (TTL) for output files. Output files whose batches were committed longer ago than the TTL will eventually be excluded from the metadata log, which means reader queries that read the sink's output directory may not process them. The value is given as a time string (like "12h", "7d", etc.); it is disabled by default. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. `DataFrameWriter.parquet()` for "parquet" format options. | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful. |
| Kafka Sink | Append, Update, Complete | See the Kafka Integration Guide | Yes (at-least-once) | More details in the Kafka Integration Guide |
| Foreach Sink | Append, Update, Complete | None | Yes (at-least-once) | More details in the next section |
| ForeachBatch Sink | Append, Update, Complete | None | Depends on the implementation | More details in the next section |
| Console Sink | Append, Update, Complete | `numRows`: number of rows to print every trigger (default: 20). `truncate`: whether to truncate the output if too long (default: true) | No | |
| Memory Sink | Append, Complete | None | No. But in Complete Mode, a restarted query will recreate the full table. | Table name is the query name. |
The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases: while `foreach` allows custom write logic on every row, `foreachBatch` allows arbitrary operations and custom logic on the output of each micro-batch. Let's look at their usage in more detail.
ForeachBatch
`foreachBatch(...)` allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
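The batch ID is what makes idempotent writes possible. After a failure, the same micro-batch can be delivered again with the same ID. The sketch below builds a function with the `(batch, batch_id)` shape that PySpark's `foreachBatch` expects. A plain list of rows stands in for the DataFrame, and the set of committed IDs lives in memory. A real sink would have to persist those IDs together with the data, for example in one transaction. The helper names are hypothetical.

```python
from typing import Callable, Iterable, List, Set


def make_idempotent_batch_writer(sink: List[str]) -> Callable[[Iterable[str], int], None]:
    """Return a foreachBatch-style function that skips replayed batch ids."""
    committed: Set[int] = set()

    def write_batch(batch_rows: Iterable[str], batch_id: int) -> None:
        if batch_id in committed:  # replay of an already-written micro-batch
            return
        sink.extend(batch_rows)
        committed.add(batch_id)

    return write_batch
```

With Spark, the function would be registered as `df.writeStream.foreachBatch(write_batch).start()`. Inside it, `batch_df.write...` would do the actual output.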
Foreach
If `foreachBatch` is not an option (for example, no corresponding batch data writer exists, or you are in continuous processing mode), you can express your custom writer logic using `foreach`. Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. Since Spark 2.4, `foreach` is available in Scala, Java and Python.
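A writer for `foreach` in PySpark can be any object with `open(partition_id, epoch_id)`, `process(row)` and `close(error)` methods. Below is a minimal sketch of such an object. The class name and its buffering behaviour are hypothetical; a real writer would open a connection in `open` and write to it. Here it is driven by hand. With Spark, `df.writeStream.foreach(CollectingRowWriter()).start()` would ship a copy to each executor, so `flushed` would not be visible on the driver.

```python
from typing import Any, List, Optional


class CollectingRowWriter:
    """Writer object with the open/process/close methods foreach calls."""

    def __init__(self) -> None:
        self.buffer: List[Any] = []
        self.flushed: List[Any] = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        # Returning False would tell Spark to skip this partition/epoch.
        self.buffer = []
        return True

    def process(self, row: Any) -> None:
        self.buffer.append(row)

    def close(self, error: Optional[BaseException]) -> None:
        # Only keep the rows if processing finished without an error.
        if error is None:
            self.flushed.extend(self.buffer)
        self.buffer = []
```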
VI. The State Store in Spark Structured Streaming
State store is a versioned key-value store which provides both read and write operations. In Structured Streaming, we use the state store provider to handle the stateful operations across batches. There are two built-in state store provider implementations. End users can also implement their own state store provider by extending StateStoreProvider interface.
The HDFS-backed state store provider is the default implementation of StateStoreProvider and StateStore. In the first stage, all data is stored in an in-memory map; it is then backed by files in an HDFS-compatible file system. All updates to the store have to be done transactionally in sets, and each set of updates increments the store's version. These versions can be used to re-execute the updates (through retries in RDD operations) on the correct version of the store, and to regenerate the store version.
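The versioning idea can be shown with a tiny in-memory model. Each commit turns a set of updates into a new version, and any committed version can be loaded again. That is what lets a retried task re-run its updates on the right version. This is only an illustration with hypothetical class names. The real HDFS-backed provider also writes delta and snapshot files to the checkpoint location, which is not modelled here.

```python
from typing import Dict, Optional


class VersionedStateStore:
    """In-memory model of a versioned key-value state store."""

    def __init__(self) -> None:
        self.versions: Dict[int, Dict[str, int]] = {0: {}}

    def load(self, version: int) -> "StoreTxn":
        return StoreTxn(self, version)


class StoreTxn:
    """A set of updates applied on top of one committed version."""

    def __init__(self, store: VersionedStateStore, version: int) -> None:
        self.store = store
        self.version = version
        self.data = dict(store.versions[version])  # private copy of that version

    def get(self, key: str) -> Optional[int]:
        return self.data.get(key)

    def put(self, key: str, value: int) -> None:
        self.data[key] = value

    def commit(self) -> int:
        # The whole set of updates becomes visible as version + 1 at once.
        new_version = self.version + 1
        self.store.versions[new_version] = dict(self.data)
        return new_version
```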
As of Spark 3.2, there is a new built-in state store implementation: the RocksDB state store provider.
If you have stateful operations in your streaming query (for example, streaming aggregation, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState) and you want to maintain millions of keys in the state, then you may face issues related to large JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times. This occurs because, by the implementation of HDFSBackedStateStore, the state data is maintained in the JVM memory of the executors and large number of state objects puts memory pressure on the JVM causing high GC pauses.
In such cases, you can choose to use a more optimized state management solution based on RocksDB. Rather than keeping the state in the JVM memory, this solution uses RocksDB to efficiently manage the state in the native memory and the local disk. Furthermore, any changes to this state are automatically saved by Structured Streaming to the checkpoint location you have provided, thus providing full fault-tolerance guarantees (the same as default state management).
To enable the new built-in state store implementation, set `spark.sql.streaming.stateStore.providerClass` to `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider`.
In Databricks' commercial Spark, you can use spark.conf.set("spark.sql.streaming.stateStore.providerClass", "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"). For more, see: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/production#--configure-rocksdb-state-store
Here are the configs for the RocksDB instance of the state store provider:

| Config Name | Description | Default Value |
|---|---|---|
| spark.sql.streaming.stateStore.rocksdb.compactOnCommit | Whether to perform a range compaction of the RocksDB instance for the commit operation | False |
| spark.sql.streaming.stateStore.rocksdb.blockSizeKB | Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is RocksDB's default SST file format. | 4 |
| spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | The size capacity in MB for a cache of blocks. | 8 |
| spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | The waiting time in milliseconds for acquiring the lock in the load operation for the RocksDB instance. | 60000 |
| spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | Whether to reset all ticker and histogram stats for RocksDB on load. | True |
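The provider class and the RocksDB settings above are ordinary Spark SQL configurations. The sketch below collects a few of them in one dictionary. The helper name and the choice of which settings to expose are assumptions; the keys and the default values in the signature come from the table.

```python
from typing import Dict

ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")


def rocksdb_state_store_conf(block_cache_mb: int = 8,
                             compact_on_commit: bool = False) -> Dict[str, str]:
    """Spark SQL settings that switch the state store to RocksDB (Spark 3.2+)."""
    prefix = "spark.sql.streaming.stateStore"
    return {
        f"{prefix}.providerClass": ROCKSDB_PROVIDER,
        f"{prefix}.rocksdb.blockCacheSizeMB": str(block_cache_mb),
        f"{prefix}.rocksdb.compactOnCommit": str(compact_on_commit).lower(),
    }
```

Each pair would be applied with `SparkSession.builder.config(key, value)` or `spark.conf.set(key, value)` before the streaming query starts.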
The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states. So it is more efficient to keep a state store provider running in the same executor across different streaming batches. Changing the location of a state store provider requires the extra overhead of loading checkpointed states. The overhead of loading state from checkpoint depends on the external storage and the size of the state, which tends to hurt the latency of micro-batch run. For some use cases such as processing very large state data, loading new state store providers from checkpointed states can be very time-consuming and inefficient.
The stateful operations in Structured Streaming queries rely on the preferred location feature of Spark’s RDD to run the state store provider on the same executor. If in the next batch the corresponding state store provider is scheduled on this executor again, it could reuse the previous states and save the time of loading checkpointed states.
However, the preferred location is generally not a hard requirement, and Spark may still schedule tasks to executors other than the preferred ones. In this case, Spark loads state store providers from checkpointed states on the new executors. The state store providers that ran in the previous batch are not unloaded immediately. Spark runs a maintenance task that checks for and unloads state store providers that are inactive on the executors.
By changing the Spark configurations related to task scheduling, for example `spark.locality.wait`, users can configure how long Spark waits to launch a data-local task. For stateful operations in Structured Streaming, this can be used to keep state store providers running on the same executors across batches.
Specifically for the built-in HDFS state store provider, users can check state store metrics such as `loadedMapCacheHitCount` and `loadedMapCacheMissCount`. Ideally, the cache miss count should be as low as possible, which means Spark does not waste much time loading checkpointed state. Users can increase the Spark locality wait configurations to avoid loading state store providers on different executors across batches.
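One way to watch these two metrics is to turn them into a hit ratio per progress report. The hypothetical helper below takes a dictionary of metric values. In PySpark, these should be available under `query.lastProgress["stateOperators"][i]["customMetrics"]`, but that location is an assumption to verify against your Spark version. If the ratio stays low, raising `spark.locality.wait` (default 3s) when the session is created is the knob the paragraph above refers to.

```python
from typing import Dict


def state_cache_hit_ratio(custom_metrics: Dict[str, int]) -> float:
    """Hit ratio of the HDFS provider's loaded-map cache for one progress report.

    Returns 1.0 when no state map had to be looked up at all.
    """
    hits = custom_metrics.get("loadedMapCacheHitCount", 0)
    misses = custom_metrics.get("loadedMapCacheMissCount", 0)
    lookups = hits + misses
    return 1.0 if lookups == 0 else hits / lookups
```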
VII. Joins in Spark Structured Streaming
To be continued.
For more details, see: https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html
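This is the author's original article, and reposts must credit the source. Copyright of original articles belongs to the author. Reposting is welcome, but copyright is reserved. The author will pursue legal action against reposts of the blogger's original articles that do not credit the source. Please respect the author's work.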