专栏首页腾讯云流计算 OceanusFlink 实践教程:入门9-Jar 作业开发
原创

Flink 实践教程:入门9-Jar 作业开发

(福利推荐:你还在原价购买腾讯云服务器?现在腾讯云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/qcloud

腾讯云最新优惠活动来了:云产品限时1折,云服务器低至88元/年 ,点击这里立即抢购:9i0i.cn/qcloud,更有2860元代金券免费领取,付款直接抵现金用,点击这里立即领取:9i0i.cn/qcloudquan

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink Jar 作业既支持使用 DataStream API 编程也支持使用 Table API/SQL 编程, Table API 和 SQL 也可以很容易地集成并嵌入到 DataStream 程序中,请参见 与 DataStream API 集成 ****1章节了解如何将 DataStream 与 Table 之间的相互转化。

流计算 Oceanus 支持 Flink Jar 作业和 Flink SQL 作业,本文将向您详细介绍如何使用 Flink DataStream API 进行 Jar 作业开发,并在流计算 Oceanus 平台运行。

Flink 实践教程:入门9-Jar作业开发

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群

进入 Oceanus 控制台 2,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 3。

创建消息队列 CKafka

进入 CKafka 控制台 4,点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 5。

创建 Topic:

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic 6。

开发 DataStream 作业

1. 新建 Maven 工程。

在本地 IDEA 中新建Maven Project,并配置 pom.xml 文件。

配置 pom.xml 文件时需要设置主类,否则在打包完上传至 Oceanus/Flink 集群时无法找到主类。

pom.xml 文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanus</groupId>
    <artifactId>jar_demos</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Oceanus 平台自带了 flink-java、flink-streaming 等依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- 使用 Oceanus 内置 Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>

        <!-- test -->
        <!-- flink-clients 用于本地调试 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <!-- 设置主类 -->
                    <archive>
                        <manifestEntries>
                            <Main-Class>com.demos.HelloWorld</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. 代码编写

Flink DataStream 作业代码如下:

package com.demos;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 1. 设置运行环境
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        // 2. 配置数据源读取数据
        // 预定义数据源支持从文件、套接字、集合读入数据;自定义数据源支持 Kafka、MySQL 等使用 addSource() 函数读入数据
        DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data);

        // 3. 数据加工
        DataStream ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() {
            @Override
            public void flatMap(List<Integer> value, Collector<String> out) throws Exception {
                value.forEach(v -> out.collect(v.toString()));
            }
        });

        // 4. 数据输出
        // 预定义目的端支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket;自定义目的端支持 Kafka、MySQL 等使用 addSink() 函数写出数据
        Properties sinkProps = new Properties();
        String hosts = "10.0.0.29:9092";
        sinkProps.setProperty("bootstrap.servers", hosts);
        String outTopic = "flink-demo9";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps);
        ds.addSink(producer);
        // ds.print();

        // 5. 执行程序
        sEnv.execute("helloworld");
    }
}

打包 Jar 包

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。 命令行打包命令:

mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。

流计算 Oceanus 作业

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。

【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.HelloWorld。

3. 运行作业

点击【发布草稿】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

总结

  1. DataStream 作业支持各类异构数据源与数据目的端。自定义数据源支持 Kafka、MySQL 等,使用 addSource() 函数读入数据;自定义目的端支持 Kafka、MySQL 等,使用 addSink() 函数写出数据。
  2. 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。

阅读参考

1 与 DataStream API 集成:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/

2 Oceanus 控制台:https://console.cloud.tencent.com/oceanus

3 创建独享集群:/document/product/849/48298

4 CKafka 控制台:https://console.cloud.tencent.com/ckafka

5 CKafka 创建实例:/document/product/597/54839

6 Ckafka 创建 Topic:/document/product/597/54854

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 [email protected] 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 实践教程-入门(9):Jar 作业开发

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apach...

    腾讯QQ大数据
  • Flink 实践教程:入门8-简单 ETL 作业

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • Flink 实践教程-入门(8): 简单 ETL 作业

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache ...

    腾讯QQ大数据
  • Dlink On Yarn 三种 Flink 执行方式的实践

    Dlink 为 Apache Flink 而生,让 Flink SQL 更加丝滑。它是一个 交互式的 FlinkSQL Studio,可以在线开发、预览...

    文末丶
  • Flink 实践教程:入门(2):写入 Elasticsearch

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache F...

    腾讯QQ大数据
  • Flink 实践教程-入门(5):写入 ClickHouse

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache...

    腾讯QQ大数据
  • Flink 实践教程:入门5-写入 ClickHouse

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • Flink 实践教程:入门2-写入 Elasticsearch

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • 大数据云原生系列| 微信 Flink on Kubernetes 实战总结

    涂小刚,微信高级开发工程师,负责微信大数据平台开发及建设。 王玉君,腾讯云后台高级开发工程师,负责腾讯云原生系统开发及建设。 前言 架构转型,拥抱云原生服务...

    腾讯云原生
  • 基于Flink打造实时计算平台为企业赋能

    随着互联网技术的广泛使用,信息的实时性对业务的开展越来越重要,特别是业务的异常信息,没滞后一点带来的就是直接的经济损失。所以实时信息处理能力,越来越成为企业的重...

    王知无-import_bigdata
  • Dlink ?一款FlinkSQL交互式开发平台

    目前 Flink 社区正如火如荼的发展,但苦于没有一款适合 Flink SQL 界面化开发的工具,于是增加了 Flink 的门槛与成本。虽然官方提供了 SQL ...

    文末丶
  • flink学习笔记

    <!--> ![Flink类型分类](./img/introduction-to-type-and-serialization-mechainisms-1.pn...

    皮皮熊
  • 相信我,你也能成为大数据开发工程师(一)

    大家好啊,老李最近高产如母猪,我也来凑个热闹。说起来挺魔幻的,去年这时候,我还是一个连java curd都不会的菜鸡,今天却在这里大谈大数据开发- -。我也没想...

    老李秀
  • Flink or Spark?实时计算框架在K12场景的应用实践

    如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如Flink等来保障。例如,在 TB 级别数据量的数据库...

    芋道源码
  • Flink入门宝典(详细截图版)

    本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。

    实时计算
  • Flink 01 | 十分钟搭建第一个Flink应用和本地集群

    上一篇文章中我对新一代大数据处理引擎Flink做了简单的介绍,包括:批量计算与流式计算的区别、流式计算引擎的重要性,以及Flink相比其他流式计算引擎的优势。因...

    PP鲁
  • Flink1.9.2源码编译和使用

    用于学习,在IDEA上开发的flink job,能直接在IDEA运行,如果运行时依赖的flink框架是我们自己编译构建的,就做到了从业务到框架都可以修改源码并验...

    程序员欣宸
  • 推荐10本大数据领域必读的经典好书(火速收藏)

    写博客也已经快一年了,从去年的1024到现在金秋10月已纷至沓来。回顾这一年所发布的原创文章,基本都是与大数据主流或者周边的技术为主。本篇博客,...

    大数据梦想家
  • 有赞 Flink 实时任务资源优化探索与实践

    随着 Flink k8s 化以及实时集群迁移完成,有赞越来越多的 Flink 实时任务运行在 K8s 集群上,Flink k8s 化提升了实时集群在大促时弹性扩...

    有赞coder

扫码关注云+社区

领取腾讯云代金券


http://www.vxiaotou.com