Flink的sink实战之三:cassandra3_flink cassandra-程序员宅基地

技术标签: flink  java  

本文是《Flink的sink实战》系列的第三篇,主要内容是体验Flink官方的cassandra connector,整个实战如下图所示,我们先从kafka获取字符串,再执行wordcount操作,然后将结果同时打印和写入cassandra:
在这里插入图片描述

全系列链接

  1. 《Flink的sink实战之一:初探》
  2. 《Flink的sink实战之二:kafka》
  3. 《Flink的sink实战之三:cassandra3》
  4. 《Flink的sink实战之四:自定义》

软件版本

本次实战的软件版本信息如下:

  1. cassandra:3.11.6
  2. kafka:2.4.0(scala:2.12)
  3. jdk:1.8.0_191
  4. flink:1.9.2
  5. maven:3.6.0
  6. flink所在操作系统:CentOS Linux release 7.7.1908
  7. cassandra所在操作系统:CentOS Linux release 7.7.1908
  8. IDEA:2018.3.5 (Ultimate Edition)

关于cassandra

本次用到的cassandra是三台集群部署的集群,搭建方式请参考《ansible快速部署cassandra3集群》

准备cassandra的keyspace和表

先创建keyspace和table:

  1. cqlsh登录cassandra:
cqlsh 192.168.133.168
  1. 创建keyspace(3副本):
CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {
    'class': 'SimpleStrategy', 'replication_factor': '3'};
  1. 建表:
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
    );

准备kafka的topic

  1. 启动kafka服务;
  2. 创建名为test001的topic,参考命令如下:
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test001
  1. 进入发送消息的会话模式,参考命令如下:
./kafka-console-producer.sh \
--broker-list kafka:9092 \
--topic test001
  1. 在会话模式下,输入任意字符串然后回车,都会将字符串消息发送到broker;

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinksinkdemo文件夹下,如下图红框所示:
在这里插入图片描述

两种写入cassandra的方式

flink官方的connector支持两种方式写入cassandra:

  1. Tuple类型写入:将Tuple对象的字段对齐到指定的SQL的参数中;
  2. POJO类型写入:通过DataStax,将POJO对象对应到注解配置的表和字段中;

接下来分别使用这两种方式;

开发(Tuple写入)

  1. 《Flink的sink实战之二:kafka》中创建了flinksinkdemo工程,在此继续使用;
  2. 在pom.xml中增加casandra的connector依赖:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
  1. 另外还要添加flink-streaming-scala依赖,否则编译CassandraSink.addSink这段代码会失败:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
  1. 新增CassandraTuple2Sink.java,这就是Job类,里面从kafka获取字符串消息,然后转成Tuple2类型的数据集写入cassandra,写入的关键点是Tuple内容和指定SQL中的参数的匹配:
package com.bolingcavalry.addsink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;


public class CassandraTuple2Sink {
    
    public static void main(String[] args) throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度
        env.setParallelism(1);

        //连接kafka用到的属性对象
        Properties properties = new Properties();
        //broker地址
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        //zookeeper地址
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        //消费者的groupId
        properties.setProperty("group.id", "flink-connector");
        //实例化Consumer类
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        //指定从最新位置开始消费,相当于放弃历史消息
        flinkKafkaConsumer.setStartFromLatest();

        //通过addSource方法得到DataSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<Tuple2<String, Long>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    
                             @Override
                             public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    
                                 String[] words = value.toLowerCase().split("\\s");

                                 for (String word : words) {
    
                                     //cassandra的表中,每个word都是主键,因此不能为空
                                     if (!word.isEmpty()) {
    
                                         out.collect(new Tuple2<String, Long>(word, 1L));
                                     }
                                 }
                             }
                         }
                )
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
                .setHost("192.168.133.168")
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("kafka-2.4 source, cassandra-3.11.6 sink, tuple2");
    }
}
  1. 上述代码中,从kafka取得数据,做了word count处理后写入到cassandra,注意addSink方法后的一连串API(包含了数据库连接的参数),这是flink官方推荐的操作,另外为了在Flink web UI看清楚DAG情况,这里调用disableChaining方法取消了operator chain,生产环境中这一行可以去掉;
  2. 编码完成后,执行mvn clean package -U -DskipTests构建,在target目录得到文件flinksinkdemo-1.0-SNAPSHOT.jar
  3. 在Flink的web UI上传flinksinkdemo-1.0-SNAPSHOT.jar,并指定执行类,如下图红框所示:
    在这里插入图片描述
  4. 启动任务后DAG如下:
    在这里插入图片描述
  5. 去前面创建的发送kafka消息的会话模式窗口,发送一个字符串"aaa bbb ccc aaa aaa aaa";
  6. 查看cassandra数据,发现已经新增了三条记录,内容符合预期:
    在这里插入图片描述
  7. 查看TaskManager控制台输出,里面有Tuple2数据集的打印结果,和cassandra的一致:
    在这里插入图片描述
  8. DAG上所有SubTask的记录数也符合预期:
    在这里插入图片描述

开发(POJO写入)

接下来尝试POJO写入,即业务逻辑中的数据结构实例被写入cassandra,无需指定SQL:

  1. 实现POJO写入数据库,需要datastax库的支持,在pom.xml中增加以下依赖:
<dependency>
  <groupId>com.datastax.cassandra</groupId>
  <artifactId>cassandra-driver-core</artifactId>
  <version>3.1.4</version>
  <classifier>shaded</classifier>
  <!-- Because the shaded JAR uses the original POM, you still need
                 to exclude this dependency explicitly: -->
  <exclusions>
    <exclusion>
	<groupId>io.netty</groupId>
	<artifactId>*</artifactId>
	</exclusion>
  </exclusions>
</dependency>
  1. 请注意上面配置的exclusions节点,依赖datastax的时候,按照官方指导对netty相关的间接依赖做排除,官方地址:https://docs.datastax.com/en/developer/java-driver/3.1/manual/shaded_jar/
  2. 创建带有数据库相关注解的实体类WordCount:
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "wordcount")
public class WordCount {
    

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {
    
    }

    public WordCount(String word, long count) {
    
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
    
        return word;
    }

    public void setWord(String word) {
    
        this.word = word;
    }

    public long getCount() {
    
        return count;
    }

    public void setCount(long count) {
    
        this.count = count;
    }

    @Override
    public String toString() {
    
        return getWord() + " : " + getCount();
    }
}
  1. 然后创建任务类CassandraPojoSink:
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.Mapper;
import com.datastax.shaded.netty.util.Recycler;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class CassandraPojoSink {
    
    public static void main(String[] args) throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度
        env.setParallelism(1);

        //连接kafka用到的属性对象
        Properties properties = new Properties();
        //broker地址
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        //zookeeper地址
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        //消费者的groupId
        properties.setProperty("group.id", "flink-connector");
        //实例化Consumer类
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        //指定从最新位置开始消费,相当于放弃历史消息
        flinkKafkaConsumer.setStartFromLatest();

        //通过addSource方法得到DataSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<WordCount> result = dataStream
                .flatMap(new FlatMapFunction<String, WordCount>() {
    
                    @Override
                    public void flatMap(String s, Collector<WordCount> collector) throws Exception {
    
                        String[] words = s.toLowerCase().split("\\s");

                        for (String word : words) {
    
                            if (!word.isEmpty()) {
    
                                //cassandra的表中,每个word都是主键,因此不能为空
                                collector.collect(new WordCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordCount>() {
    
                    @Override
                    public WordCount reduce(WordCount wordCount, WordCount t1) throws Exception {
    
                        return new WordCount(wordCount.getWord(), wordCount.getCount() + t1.getCount());
                    }
                });

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setHost("192.168.133.168")
                .setMapperOptions(() -> new Mapper.Option[] {
     Mapper.Option.saveNullFields(true) })
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("kafka-2.4 source, cassandra-3.11.6 sink, pojo");
    }

}
  1. 从上述代码可见,和前面的Tuple写入类型有很大差别,为了准备好POJO类型的数据集,除了flatMap的匿名类入参要改写,还要写好reduce方法的匿名类入参,并且还要调用setMapperOptions设置映射规则;
  2. 编译构建后,上传jar到flink,并且指定任务类为CassandraPojoSink:
    在这里插入图片描述
  3. 清理之前的数据,在cassandra的cqlsh上执行TRUNCATE example.wordcount;
  4. 像之前那样发送字符串消息到kafka:
    在这里插入图片描述
  5. 查看数据库,发现结果符合预期:
    在这里插入图片描述
  6. DAG和SubTask情况如下:
    在这里插入图片描述

至此,flink的结果数据写入cassandra的实战就完成了,希望能给您一些参考;

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/boling_cavalry/article/details/105598968

智能推荐

从零开始搭建Hadoop_创建一个hadoop项目-程序员宅基地

文章浏览阅读331次。第一部分:准备工作1 安装虚拟机2 安装centos73 安装JDK以上三步是准备工作,至此已经完成一台已安装JDK的主机第二部分:准备3台虚拟机以下所有工作最好都在root权限下操作1 克隆上面已经有一台虚拟机了,现在对master进行克隆,克隆出另外2台子机;1.1 进行克隆21.2 下一步1.3 下一步1.4 下一步1.5 根据子机需要,命名和安装路径1.6 ..._创建一个hadoop项目

心脏滴血漏洞HeartBleed CVE-2014-0160深入代码层面的分析_heartbleed代码分析-程序员宅基地

文章浏览阅读1.7k次。心脏滴血漏洞HeartBleed CVE-2014-0160 是由heartbeat功能引入的,本文从深入码层面的分析该漏洞产生的原因_heartbleed代码分析

java读取ofd文档内容_ofd电子文档内容分析工具(分析文档、签章和证书)-程序员宅基地

文章浏览阅读1.4k次。前言ofd是国家文档标准,其对标的文档格式是pdf。ofd文档是容器格式文件,ofd其实就是压缩包。将ofd文件后缀改为.zip,解压后可看到文件包含的内容。ofd文件分析工具下载:点我下载。ofd文件解压后,可以看到如下内容: 对于xml文件,可以用文本工具查看。但是对于印章文件(Seal.esl)、签名文件(SignedValue.dat)就无法查看其内容了。本人开发一款ofd内容查看器,..._signedvalue.dat

基于FPGA的数据采集系统(一)_基于fpga的信息采集-程序员宅基地

文章浏览阅读1.8w次,点赞29次,收藏313次。整体系统设计本设计主要是对ADC和DAC的使用,主要实现功能流程为:首先通过串口向FPGA发送控制信号,控制DAC芯片tlv5618进行DA装换,转换的数据存在ROM中,转换开始时读取ROM中数据进行读取转换。其次用按键控制adc128s052进行模数转换100次,模数转换数据存储到FIFO中,再从FIFO中读取数据通过串口输出显示在pc上。其整体系统框图如下:图1:FPGA数据采集系统框图从图中可以看出,该系统主要包括9个模块:串口接收模块、按键消抖模块、按键控制模块、ROM模块、D.._基于fpga的信息采集

微服务 spring cloud zuul com.netflix.zuul.exception.ZuulException GENERAL-程序员宅基地

文章浏览阅读2.5w次。1.背景错误信息:-- [http-nio-9904-exec-5] o.s.c.n.z.filters.post.SendErrorFilter : Error during filteringcom.netflix.zuul.exception.ZuulException: Forwarding error at org.springframework.cloud..._com.netflix.zuul.exception.zuulexception

邻接矩阵-建立图-程序员宅基地

文章浏览阅读358次。1.介绍图的相关概念  图是由顶点的有穷非空集和一个描述顶点之间关系-边(或者弧)的集合组成。通常,图中的数据元素被称为顶点,顶点间的关系用边表示,图通常用字母G表示,图的顶点通常用字母V表示,所以图可以定义为:  G=(V,E)其中,V(G)是图中顶点的有穷非空集合,E(G)是V(G)中顶点的边的有穷集合1.1 无向图:图中任意两个顶点构成的边是没有方向的1.2 有向图:图中..._给定一个邻接矩阵未必能够造出一个图

随便推点

MDT2012部署系列之11 WDS安装与配置-程序员宅基地

文章浏览阅读321次。(十二)、WDS服务器安装通过前面的测试我们会发现,每次安装的时候需要加域光盘映像,这是一个比较麻烦的事情,试想一个上万个的公司,你天天带着一个光盘与光驱去给别人装系统,这将是一个多么痛苦的事情啊,有什么方法可以解决这个问题了?答案是肯定的,下面我们就来简单说一下。WDS服务器,它是Windows自带的一个免费的基于系统本身角色的一个功能,它主要提供一种简单、安全的通过网络快速、远程将Window..._doc server2012上通过wds+mdt无人值守部署win11系统.doc

python--xlrd/xlwt/xlutils_xlutils模块可以读xlsx吗-程序员宅基地

文章浏览阅读219次。python–xlrd/xlwt/xlutilsxlrd只能读取,不能改,支持 xlsx和xls 格式xlwt只能改,不能读xlwt只能保存为.xls格式xlutils能将xlrd.Book转为xlwt.Workbook,从而得以在现有xls的基础上修改数据,并创建一个新的xls,实现修改xlrd打开文件import xlrdexcel=xlrd.open_workbook('E:/test.xlsx') 返回值为xlrd.book.Book对象,不能修改获取sheett_xlutils模块可以读xlsx吗

关于新版本selenium定位元素报错:‘WebDriver‘ object has no attribute ‘find_element_by_id‘等问题_unresolved attribute reference 'find_element_by_id-程序员宅基地

文章浏览阅读8.2w次,点赞267次,收藏656次。运行Selenium出现'WebDriver' object has no attribute 'find_element_by_id'或AttributeError: 'WebDriver' object has no attribute 'find_element_by_xpath'等定位元素代码错误,是因为selenium更新到了新的版本,以前的一些语法经过改动。..............._unresolved attribute reference 'find_element_by_id' for class 'webdriver

DOM对象转换成jQuery对象转换与子页面获取父页面DOM对象-程序员宅基地

文章浏览阅读198次。一:模态窗口//父页面JSwindow.showModalDialog(ifrmehref, window, 'dialogWidth:550px;dialogHeight:150px;help:no;resizable:no;status:no');//子页面获取父页面DOM对象//window.showModalDialog的DOM对象var v=parentWin..._jquery获取父window下的dom对象

什么是算法?-程序员宅基地

文章浏览阅读1.7w次,点赞15次,收藏129次。算法(algorithm)是解决一系列问题的清晰指令,也就是,能对一定规范的输入,在有限的时间内获得所要求的输出。 简单来说,算法就是解决一个问题的具体方法和步骤。算法是程序的灵 魂。二、算法的特征1.可行性 算法中执行的任何计算步骤都可以分解为基本可执行的操作步,即每个计算步都可以在有限时间里完成(也称之为有效性) 算法的每一步都要有确切的意义,不能有二义性。例如“增加x的值”,并没有说增加多少,计算机就无法执行明确的运算。 _算法

【网络安全】网络安全的标准和规范_网络安全标准规范-程序员宅基地

文章浏览阅读1.5k次,点赞18次,收藏26次。网络安全的标准和规范是网络安全领域的重要组成部分。它们为网络安全提供了技术依据,规定了网络安全的技术要求和操作方式,帮助我们构建安全的网络环境。下面,我们将详细介绍一些主要的网络安全标准和规范,以及它们在实际操作中的应用。_网络安全标准规范