1.flink集群安装部署及WEB 图形界面提交作业任务。1.flink角色简介 有界流数据结束程序会退出。 无界流来一条处理一条程序会一直在线等待数据到来。除非异常退出。 Flink提交作业和执行任务需要几个关键组件。 Flink Client | JobManager | TaskManager flink run 之后; 客户端:client:代码由客户端获取并作转换之后提交给JobManager JobManager:就是Flink集群里的管事人对作业进行中央调度管理而它获取到要执行的作业后 会进一步处理转换然后分发任务给众多的TaskManager ; TaskManager:就是真正干活的人数据的处理操作都是他们来做的。 注意: Flink是一个非常灵活的处理框架它支持多种不同的部署场景还可以和不同的资源管理平台方便地集成。 2.flink规划 hadoop001:JobManagerTaskManager hadoop002:TaskManager hadoop003:TaskManager vim /etc/hosts 192.168.3.11 hadoop001 192.168.3.12 hadoop002 192.168.3.13 hadoop003 useradd flink mkdir -p /opt/module chown -R flink:flink /opt/module tar zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module chown -R flink:flink /opt/module su - flink cd /opt/module cd flink-1.17.0 最重要的配置文件是: conf/flink-conf.yaml #hadoop001节点 #JobManager: vim flink-conf.yaml #JobManager节点地址 jobmanager.rpc.address: hadoop001 jobmanager.rpc.port: 6123 jobmanager.bind-host: 0.0.0.0 #所有节点均可访问 rest.address: hadoop001 rest.bind-address: 0.0.0.0 #TaskManager 节点地址需要配置为当前机器名 taskManager.bind-host: 0.0.0.0 taskManager.host: hadoop002 #带bind标识的都设置:0.0.0.0 vim workers #工作节点 hadoop001 hadoop002 hadoop003 vim masters hadoop001 配置分发到:hadoop002,hadoop003; xsync flink-1.17.0 #用封装的命令把这些拷贝过去。 #hadoop002 vim flink-conf.yaml taskmanager.host: hadoop002 #hadoop003 vim flink-conf.yaml taskmanager.host: hadoop003 flink-conf.yaml 文件中还可以对集群中的JobManager和TaskManager组件进行优化配置注意配置项如下: jobmanager.memory.process.size: 对JobManager进程可使用到的全部内存进行配置包括JVM元空间和其他开销 默认 1600M,可以根据集群规模进行适当调整。 taskmanager.memory.process.size :对 TaskManager 进程可使用到的全部内存进行配置包括JVM元空间和其他开销 默认 1728M,可以根据集群规模进行适当调整。 taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的Slot数量进行配置默认为1可根据 TaskManager 所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是 TaskManager 中具体运行一个任务所分配的计算资源。 parallelism.default: Flink 任务执行的并行度默认为1。优先级低于代码中进行的并行度配置和任务提交时 使用参数指定的并行度数量。 #启动flink集群 cd flink-1.17.0 [flinkhadoop001 flink-1.17.0]$ bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop001. Starting taskexecutor daemon on host hadoop001. Starting taskexecutor daemon on host hadoop002. Starting taskexecutor daemon on host hadoop003. 启动之后通过: hadoop001:8081 访问flink集群 flink的作业可以通过后台启动也可以通过flink的页面启动。 flink官方推荐的打包插件: org.apache.maven.plugins maven-shade-plugin-3.2.4 常见的还有:assembly ,有些情况有问题不推荐。 project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd modelVersion4.0.0/modelVersion groupIdorg.example/groupId artifactIdFlinkTuTorial-1.17/artifactId version1.0-SNAPSHOT/version nameArchetype - FlinkTuTorial-1.17/name urlhttp://maven.apache.org/url properties flink.version1.17.0/flink.version /properties dependencies dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java/artifactId version${flink.version}/version scopeprovided/scope /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients/artifactId version${flink.version}/version scopeprovided/scope /dependency /dependencies build plugins plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-shade-plugin/artifactId version3.2.4/version executions execution phasepackage/phase goals goalshade/goal /goals configuration artifactSet excludes excludecom.google.com.findbugs:jsr305/exclude excludeorg.slf4j:*/exclude excludelog4j:*/exclude /excludes /artifactSet filters filter artifact*:*/artifact excludes excludeMETA-INF/*.SF/exclude excludeMETA-INF/*.DSA/exclude excludeMETA-INF/*.RSA/exclude /excludes /filter /filters transformers combine.childrenappend transformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer /transformers /configuration /execution /executions /plugin /plugins /build /project */ #使用shade插件的配置: build plugins plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-shade-plugin/artifactId version3.2.4/version executions execution phasepackage/phase goals goalshade/goal /goals configuration artifactSet excludes excludecom.google.com.findbugs:jsr305/exclude excludeorg.slf4j:*/exclude excludelog4j:*/exclude /excludes /artifactSet filters filter artifact*:*/artifact excludes excludeMETA-INF/*.SF/exclude excludeMETA-INF/*.DSA/exclude excludeMETA-INF/*.RSA/exclude /excludes /filter /filters transformers combine.childrenappend transformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer /transformers /configuration /execution /executions /plugin /plugins /build #如果是flink安装包里已有的就不要打包了写:scopeprovidedscope */ 如果在pom.xml 里面给flink的依赖写上scopeprovided/scope 那么在打包的时候不会打包它但是此时运行会报错找不到类。 需要在 --run--编辑配置文件--勾选:Include dependencies with Provided scope; 所有的类都要勾选。 #使用shade插件的配置 都次打包都要先 clean ,再packege ; #打包好之后的工程路径。 C:\Users\admin\IdeaProjects\FlinkTuTorial-1.17\target vim xsync #!/bin/bash # 获取输入参数个数如果没有参数直接退出 pcount$# if [ $pcount -lt 1 ]; then echo No Enough Arguement! exit; fi # 2. 遍历集群所有机器, 这里要替换成你实际的主机名或IP for host in hadoop001 hadoop002 hadoop003 do echo $host # 3. 遍历所有目录挨个发送 for file in $ do # 4. 判断文件是否存在 if [ -e $file ]; then # 5. 获取父目录 pdir$(cd -P $(dirname $file); pwd) # 6. 获取当前文件的名称 fname$(basename $file) ssh $host mkdir -p $pdir rsync -av $pdir/$fname $USER$host:$pdir else echo $file does not exists! fi done done ssh-keygen # 将公钥复制到所有目标节点 ssh-copy-id hadoop001 ssh-copy-id hadoop002 ssh-copy-id hadoop003#启动flink集群cd flink-1.17.0[flinkhadoop001 flink-1.17.0]$ bin/start-cluster.shStarting cluster.Starting standalonesession daemon on host hadoop001.Starting taskexecutor daemon on host hadoop001.Starting taskexecutor daemon on host hadoop002.Starting taskexecutor daemon on host hadoop003.添加编译好的项目左下角框传入main方法的参数不填。并行度:保持默认1保存点:暂时不处理。启动: 7777的监听提交flink作业。socket端口中写入数据。查看输出信息。此时可以看到flink 已将消息处理后输出。