1. HDFS dependencies
Add the following dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkHdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>2.12.1</log4j.version>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <!-- Required to run Flink -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Required to read from HDFS -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Required for the Kafka source and the HDFS sink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
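Note that this pom only declares dependencies; to compile the Scala sources in step 3, a Scala compiler plugin is also needed in the <build> section. A minimal sketch, assuming scala-maven-plugin (the version shown is illustrative):

<build>
    <plugins>
        <!-- Compiles src/main/scala; without a Scala plugin Maven builds only Java sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>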
2. Configure HDFS
Place hdfs-site.xml and core-site.xml in the src/main/resources directory so the Hadoop client picks up the cluster configuration from the classpath.
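To verify that the two files are actually on the classpath, a minimal standalone check can be run before wiring up the job (a sketch; the URI printed should match your cluster's fs.defaultFS):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsConnectivityCheck {
  def main(args: Array[String]): Unit = {
    // core-site.xml / hdfs-site.xml on the classpath are loaded automatically
    val fs = FileSystem.get(new Configuration())
    // Prints e.g. hdfs://hadoop102:9820 when core-site.xml was found,
    // and file:/// when it was not
    println(fs.getUri)
    println(fs.exists(new Path("/")))
    fs.close()
  }
}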
3. Write to HDFS
Sample input records:
{"deviceType":"0","userNums":0,"newusers":1,"dayActivenums":1,"timeinfoString":"","timeinfo":"2018090704","userId":"1","monthActivenums":1,"weekActivenums":1,"groupByField":"1==0==2018090704","times":1,"hourActivenums":1}
{"deviceType":"1","userNums":0,"newusers":0,"dayActivenums":0,"timeinfoString":"","timeinfo":"2018090705","userId":"2","monthActivenums":0,"weekActivenums":0,"groupByField":"2==1==2018090705","times":1,"hourActivenums":0}
1. Main program
package com.atguigu

import java.util.Properties

import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object WriteToHDFS {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    bsEnv.setParallelism(1)
    // Checkpointing must be enabled: StreamingFileSink only finalizes
    // in-progress part files when a checkpoint completes
    bsEnv.enableCheckpointing(5000L)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    properties.setProperty("group.id", "caimoutest3")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream = bsEnv.addSource(new FlinkKafkaConsumer011(
      "datainfo4", new SimpleStringSchema(), properties
    ))

    val fileSink = StreamingFileSink
      .forRowFormat(new Path("hdfs://hadoop102:9820/dataanlay/liuliang/"), new SimpleStringEncoder[String]("UTF-8"))
      .withBucketAssigner(new LiuLiangUserDetailBucketAssigner()) // custom bucket (output sub-directory) assignment
      .withBucketCheckInterval(5 * 1000)
      .build()

    stream.addSink(fileSink)
    bsEnv.execute("LiuLiangHourUserDetailAnaly")
  }
}
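Because this is a row-format sink, the default rolling policy also closes part files on size and time (128 MB part size, 60 s rollover, 60 s inactivity), not only on checkpoints. If those defaults do not fit, the policy can be set explicitly on the same builder. A sketch under the Flink 1.11 API, with illustrative thresholds:

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Extends the fileSink definition above; the thresholds are examples only.
val fileSink = StreamingFileSink
  .forRowFormat(new Path("hdfs://hadoop102:9820/dataanlay/liuliang/"), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new LiuLiangUserDetailBucketAssigner())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(10 * 60 * 1000L)  // roll at the latest after 10 minutes
      .withInactivityInterval(60 * 1000L)     // roll after 60 s without new records
      .withMaxPartSize(128L * 1024 * 1024)    // roll once a part file reaches 128 MB
      .build[String, String]()
  )
  .withBucketCheckInterval(5 * 1000)
  .build()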
2. LiuLiangUserDetailBucketAssigner
package com.atguigu

import com.alibaba.fastjson.JSON
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

class LiuLiangUserDetailBucketAssigner extends BucketAssigner[String, String] {

  // Derive the bucket (output sub-directory) from the record's "timeinfo" field,
  // e.g. "2018090704" -> "20180907/04". Use "/" rather than File.separator:
  // bucket IDs are HDFS path segments, not local file paths.
  override def getBucketId(in: String, context: BucketAssigner.Context): String = {
    val dateString = JSON.parseObject(in).getString("timeinfo")
    dateString.substring(0, 8) + "/" + dateString.substring(8, 10)
  }

  override def getSerializer: SimpleVersionedSerializer[String] = new LiuLiangStringSerializer
}

// Companion object so the quick substring test is actually runnable
// (a `main` inside the class is not a JVM entry point)
object LiuLiangUserDetailBucketAssigner {
  def main(args: Array[String]): Unit = {
    val dateString = "2018090707"
    println(dateString.substring(0, 8) + "/" + dateString.substring(8, 10))
  }
}
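For the first sample record above (timeinfo = "2018090704"), getBucketId returns "20180907/04", so its line lands under a path like the one below. The part-file name shown assumes StreamingFileSink's default part-<subtask>-<counter> naming; files carry an in-progress suffix until the next checkpoint finalizes them.

hdfs://hadoop102:9820/dataanlay/liuliang/20180907/04/part-0-0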
3. LiuLiangStringSerializer
package com.atguigu

import java.io.IOException
import java.nio.charset.StandardCharsets

import org.apache.flink.core.io.SimpleVersionedSerializer

class LiuLiangStringSerializer extends SimpleVersionedSerializer[String] {

  override def getVersion: Int = 77

  override def serialize(e: String): Array[Byte] = e.getBytes(StandardCharsets.UTF_8)

  override def deserialize(version: Int, bytes: Array[Byte]): String = {
    // The accepted version must match getVersion, which is what the
    // framework records alongside the serialized state
    if (version != getVersion) {
      throw new IOException(s"version mismatch: expected $getVersion, got $version")
    }
    new String(bytes, StandardCharsets.UTF_8)
  }
}
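A quick round-trip check of the serializer (the test object below is hypothetical, not part of the job):

object LiuLiangStringSerializerTest {
  def main(args: Array[String]): Unit = {
    val ser = new LiuLiangStringSerializer
    val bytes = ser.serialize("20180907/04")
    // Passing back the serializer's own version succeeds;
    // any other version is rejected with an IOException
    assert(ser.deserialize(ser.getVersion, bytes) == "20180907/04")
  }
}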
TIP
- Disable HDFS permission checking; if you do not, the authentication files must be copied into the resources directory. In hdfs-site.xml:
<property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
</property>
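A less invasive alternative to disabling permission checking cluster-wide is to run the client as the owner of the target directory, e.g. by setting HADOOP_USER_NAME at the start of main (the user name below is an assumption; use whoever owns /dataanlay on your cluster):

// Hypothetical user name; must match the HDFS directory owner.
// With no Kerberos login, the Hadoop client resolves HADOOP_USER_NAME
// from the environment or from this JVM system property.
System.setProperty("HADOOP_USER_NAME", "atguigu")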