Reference links:
https://www.cnblogs.com/codetouse/p/12746612.html
https://www.cnblogs.com/kancy/p/13912443.html
https://www.e-learn.cn/content/wangluowenzhang/227836
The hadoop-common dependency version was wrong; change it from 2.6.5 to 3.1.1:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<!-- <version>2.6.5</version>-->
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
The complete pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.leadeon</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.12</scala.version>
<flink.version>1.13.3</flink.version>
<mockito.version>2.8.9</mockito.version>
<powermock.version>1.7.4</powermock.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- needed to package the Scala classes -->
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<classifier>dist</classifier>
<appendAssemblyId>true</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>false</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>${maven.compiler.source}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${maven.compiler.target}</javacArg>
<javacArg>-Xlint:all,-serial,-path</javacArg>
</javacArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>4.7</version>
<executions>
<execution>
<id>antlr</id>
<goals>
<goal>antlr4</goal>
</goals>
<phase>none</phase>
</execution>
</executions>
<configuration>
<outputDirectory>src/test/java</outputDirectory>
<listener>true</listener>
<treatWarningsAsErrors>true</treatWarningsAsErrors>
</configuration>
</plugin>
</plugins>
</build>
</project>
Note: the next day's partition must be created a day in advance; otherwise the partition is never registered in the Hive metastore and the data under it cannot be queried.
alter table bus_login_dtl_bak2 add if not exists partition(dt='20220208')
location '/apps/hive/datahouse/cmbh_real_log/bus_login_dtl/dt=20220208';
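One way to automate this is a small daily cron job that adds tomorrow's partition ahead of time. The following is only a sketch under assumptions not stated in the original post: it presumes the hive CLI is available on the gateway host and simply reuses the table name and location pattern from the statement above.

#!/bin/bash
# Hypothetical daily script (e.g. run from cron before midnight): pre-create tomorrow's
# partition so the Flink job's output for the next day is immediately queryable in Hive.
dt=$(date -d '+1 day' +%Y%m%d)
hive -e "alter table bus_login_dtl_bak2 add if not exists partition(dt='${dt}')
location '/apps/hive/datahouse/cmbh_real_log/bus_login_dtl/dt=${dt}';"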
package com.leadeon.utils;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
public class HdfsBasePathBucketer extends BasePathBucketer<String> {
    private static final long serialVersionUID = 1L;
    // SimpleDateFormat is not thread-safe, but each sink subtask uses its own bucketer instance.
    private transient SimpleDateFormat formatter;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        if (formatter == null) {
            formatter = new SimpleDateFormat("yyyyMMdd");
        }
        // Format the date on every call so the bucket rolls over to a new dt= partition at midnight;
        // a field initialized once at construction would pin all output to the job-start date.
        String dateString = formatter.format(new Date());
        return new Path(basePath + File.separator + "dt=" + dateString);
    }
}
package com.leadeon.service
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import com.leadeon.utils.HdfsBasePathBucketer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
object bos_login_dtl {
def main(args: Array[String]): Unit = {
if (args.length != 1) {
println("usage: set application.conf file path!")
return
}
val Array(brokers, kafkaTopic, group, times,parallelism, outputHDFS) = getConf(args.head)
val date = new SimpleDateFormat("yyyyMMdd").format(new Date)
// 1. Get the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)
env.setParallelism(parallelism.toInt)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 2. Kafka configuration
val props = new Properties()
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,brokers)
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// 3. Set up the Kafka source
val kafkaDataStream= env.addSource(new FlinkKafkaConsumer[String](kafkaTopic,
new SimpleStringSchema(), props))
// 4. Write to HDFS with a bucketing sink
val sink = new BucketingSink[String](outputHDFS)
// sink.setBucketer(new DateTimeBucketer("yyyyMMdd"))
sink.setBucketer(new HdfsBasePathBucketer)
// Roll the part file once it reaches 256 MB (close it and open a new one)
sink.setBatchSize(1024 * 1024 * 256L)
// Time-based rollover interval (e.g. 30 minutes); whichever of this and the batch size above is reached first closes the file and starts a new one
sink.setBatchRolloverInterval(times.toLong * 1000L)
// Close a bucket's in-progress file once it has been inactive longer than this threshold
sink.setInactiveBucketThreshold(3 * 60 * 1000L)
// How often to check for inactive buckets
sink.setInactiveBucketCheckInterval(30 * 1000L)
// Add the sink so the stream is written to HDFS
// kafkaDataStream.map(_.replaceAll("\\|#\\$", "\t")).addSink(sink)
kafkaDataStream.map(new MapOperator).addSink(sink)
// Print the records (for debugging)
kafkaDataStream.print()
env.execute("login_streaming")
}
/**
* Read the base configuration from the given file.
* @param path path to the application.conf file
* @return brokers, topic, group, rollover time, parallelism and HDFS output path
*/
def getConf(path: String): Array[String] = {
val conf = ConfigFactory.parseFile(new File(path))
Array(conf.getString("conf.brokers"),
conf.getString("conf.topics"),
conf.getString("conf.group"),
conf.getString("conf.times"),
conf.getString("conf.parallelism"),
conf.getString("conf.outputHDFS")
)
}
}
import org.apache.flink.api.common.functions.MapFunction
class MapOperator extends MapFunction[String, String] {
@throws[Exception]
override def map(line: String): String = { // replace the |#$ field separator with a tab
val str = line.replaceAll("\\|#\\$", "\t")
println("processed record ************ " + str)
str
}
}
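getConf parses the file passed as the first program argument with Typesafe Config, so application.conf needs a conf block containing the six keys read above. A minimal sketch is shown below; every value is a placeholder, not the original cluster configuration.

conf {
  # placeholder Kafka broker list
  brokers = "kafka01:9092,kafka02:9092,kafka03:9092"
  # topic to consume and consumer group id
  topics = "bus_login_dtl"
  group = "flink_bus_login_dtl"
  # time-based rollover interval in seconds (the job multiplies it by 1000)
  times = "1800"
  parallelism = "4"
  # base HDFS path; the bucketer appends dt=yyyyMMdd underneath it
  outputHDFS = "/apps/hive/datahouse/cmbh_real_log/bus_login_dtl"
}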
#!/bin/bash
export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop
export HBASE_CONF_DIR=/usr/hdp/3.1.4.0-315/hbase/conf
export YARN_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop-yarn/conf
/opt/flink/bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m -ys 5 -c com.leadeon.service.bos_login_dtl /data/work/cmbh/project/flinkJob/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar /data/work/cmbh/project/flinkJob/application.conf