
Spark Streaming 08: Integrating Spark Streaming with Flume (Part 1), the Push Approach


1) Add the flume-push-streaming.conf configuration file

# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
# Port that telnet connects to; 44444 is an assumed example value, any free port works
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = localhost
# Must match the port passed to the Spark Streaming job (41414 in step 5)
simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
           

2) Add the dependency to pom.xml

<!-- sparkstreaming & flume-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
           

3) FlumePushWC.scala

package com.lihaogn.sparkFlume

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark streaming & flume => push
  */
object FlumePushWC {

  def main(args: Array[String]): Unit = {

    // Expect exactly two arguments: the receiver hostname and port
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWC <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf()

    // Batch interval: 5 seconds is an assumed value; tune it for your workload
    val ssc = new StreamingContext(sparkConf, Seconds(5))

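    // Integrate Spark Streaming with Flume: start an Avro receiver on hostname:port
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // Decode each event body, split it into words, and count the words per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()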

    ssc.start()
    ssc.awaitTermination()
  }

}
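
To make the per-batch logic concrete, here is a minimal sketch of the same trim / split / pair / sum steps applied to an ordinary Scala collection, with no Spark or Flume involved. The object name `WordCountSketch`, the helper `countWords`, and the sample lines are hypothetical and exist only for illustration. `groupBy` followed by `sum` is a local stand-in for `reduceByKey`, and the empty-token filter is an addition that the streaming job above does not have.

```scala
// Illustrative only: mirrors the DStream pipeline on a local Seq.
object WordCountSketch {

  // Hypothetical helper: counts words across the lines of one batch.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .map(_.trim)               // same trim as applied to each Flume event body
      .flatMap(_.split(" "))     // split each line into words
      .filter(_.nonEmpty)        // drop empty tokens from blank lines (not in the job above)
      .map((_, 1))               // pair each word with a count of 1
      .groupBy(_._1)             // local stand-in for reduceByKey
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // The first line carries a trailing \r, as a telnet client may send
    val batch = Seq("a b a\r", "b c")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running `main` on the sample batch prints `(a,2)`, `(b,2)`, and `(c,1)`. Without the `trim`, the trailing `\r` would make `"a\r"` a separate word from `"a"`, which is why the streaming job trims each event body before splitting.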
           

4) Build the project

mvn clean package -DskipTests
           

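5) Deploy the Spark Streaming job

Start this job before Flume: in push mode the Avro sink sends events to the receiver that this job opens on localhost:41414. The receiver occupies one thread, so local mode needs at least two (local[2] here).

spark-submit \
--class com.lihaogn.sparkFlume.FlumePushWC \
--master local[2] \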
--jars /Users/Mac/software/spark-streaming-flume-assembly_2.-..jar \
/Users/Mac/my-lib/Kafka-train-1.0.jar \
localhost 41414
           

6) Start Flume

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-push-streaming.conf \
--name simple-agent \
-Dflume.root.logger=INFO,console
           

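7) Connect to the netcat source with telnet, enter some lines, and watch the word counts in the Spark Streaming console output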
