flink实战--开发中常见的错误与问题_checkpoint expired before completing.-程序员宅基地

技术标签: Flink学习必读系列  flink常见错误  

常见错误集

1.Checkpoint失败:Checkpoint expired before completing

        env.enableCheckpointing(1000L)
        val checkpointConf = env.getCheckpointConfig
        checkpointConf.setMinPauseBetweenCheckpoints(30000L)
        checkpointConf.setCheckpointTimeout(8000L)

原因是因为checkpointConf.setCheckpointTimeout(8000L)设置的太小了,默认是10min,这里只设置了8sec。当一个Flink App背压的时候(例如由外部组件异常引起),Barrier会流动的非常缓慢,导致Checkpoint时长飙升。

2.在Flink中,资源的隔离是通过Slot进行的,也就是说多个Slot会运行在同一个JVM中,这种隔离很弱,尤其对于生产环境。Flink App上线之前要在一个单独的Flink集群上进行测试,否则一个不稳定、存在问题的Flink App上线,很可能影响整个Flink集群上的App。

3 .Flink App抛出The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.此类异常,通过查看日志,一般就是某一个Flink App内存占用大,导致TaskManager(在Yarn上就是Container)被Kill掉。但是并不是所有的情况都是这个原因,还需要进一步看yarn的日志( 查看yarn任务日志:yarn logs -applicationId   -appOwner),如果代码写的没问题,就确实是资源不够了,其实1G Slot跑多个Task(Slot Group Share)其实挺容易出现的。因此有两种选择。可以根据具体情况,权衡选择一个。

  • 将该Flink App调度在Per Slot内存更大的集群上。
  • 通过slotSharingGroup("xxx"),减少Slot中共享Task的个数
org.apache.flink.util.FlinkException: The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
    at 。。。。。。。。。。。。
此处省略n行
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

4.抛出Caused by: java.io.IOException: Too many open files异常,一般是因为存在Flink App在结束的时候没有释放资源,这里指的是例如忘记关闭连接池,线程池等资源。如果一个Flink App结束的时候没有释放资源,又因为异常被重启多次后,很容易出现Too many open files异常,从而拖垮整个TaskManager上的Flink App。

  • 重写RichFunction的Close()方法,加上例如:suishenRedisTemplate.quit()hbaseClient.shutdown().join(TimeUnit.SECONDS.toMillis(30))等。由于现在Scala Api不支持RichAsyncFunction,没有Close方法,无法释放资源,这是一件很蛋疼的事情。。。
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:156)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:147)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:126)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    ... 21 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.IOUtil.makePipe(Native Method)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:154)
    ... 25 more

5.启动报错,提示找不到 jersey 的类java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties解决办法进入yarn中 把lib目中的一下两个问价拷贝到flink的lib中

hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar
/hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar

6.Scala版本冲突--java.lang.NoSuchMethodError:scala.collection.immutable.HashSet$.empty()Lscala/collection/

解决办法:添加import org.apache.flink.api.scala._

7.SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]Could not build the program from JAR file.
或者Could not identify hostname and port in 'yarn-cluster' 

解决:flink1.8需要在lib中添加一下jar包

8.Creating the input splits caused an error: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapreduce.lib.input.FileInputFormat或者 tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapreduce.lib.input.FileInputFormat

解决方式:导入以下依赖即可   

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>

9.Table is not an append一only table. Use the toRetractStream() in order to handle add and retract messages.

这个是因为动态表不是append一only模式的,需要用toRetractStream(回撤流)处理就好了.
tableEnv. toRet ractSt ream [Person](result) .print()

10.启动Flink任务的时候报错Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster,然后仔细看发现里面有这么一句system times on machines may be out of sync,意思说是机器上的系统时间可能不同步.
已置
(1)安装ntpdate工具

yum 一y install ntp ntpdate

(2)设置系统时间与网络时间同步

ntpdate cn.pool.ntp.org

在三台机器上分别执行完这个,在启动任务,发现可以了. 

9.oom问题

java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177)
        。。。。。。。。
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

解决方案如下:

1.检查slot槽位够不够或者slot分配的数量有没有生效
2.程序起的并行是否都正常分配了(会有这样的情况出现,假如5个并行,但是只有2个在几点上生效了,另外3个没有数据流动)
3.检查flink程序有没有数据倾斜,可以通过flink的ui界面查看每个分区子节点处理的数据量

10.The proctime attribute can only be appended to the table schema and not replace anexisting field. Please move 'proctime' to the end of the schema


这个报错的翻译为, proctime属性只能附加到表模式,而不能替换现有字段。请将proctime'移到架构的末尾,我们在用proctime的时候要把他放到字段的最后一个位置.而不能放到其他的位置.

11.flink. table. TableJob$person$3(name: String, id: Integer, timestamp: Long)' must bestatic and globally accessible


这个报错是因为我们定义的case class类型必须是静态的,全局可访问的,就是说要把它放到main方法的外面就可以了

12.Could not retrieve the redirect address of the current leader. Please try to refresh,Flink任务在运行了一段时间后,进程还在但是刷新UI界面提示报错你把这个job杀掉重新启动,还是会报这个错.


解决方法是把这个目录下的文件删除,重启就可以了
high一availability.zookeeper.path.root: /flink
ZooKeeper节点根目录,其下放置所有集群节点的namespace.

13.hadoop的jar包冲突,导致写入hdfs报错

Caused by: java.io.IOException: The given file system URI (hdfs:///data/checkpoint-data/abtest) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:135)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)

原因:pom文件中添加和hadoop先关的依赖 去掉就好了

14.解析返回值类型失败报错

 The return type of function 'main(RemoteEnvironmentTest.java:27)' could not be determined automatically

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(RemoteEnvironmentTest.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)

解决方案:产生这种现象的原因一般是使用lambda表达式没有明确返回值类型,或者使用特使的数据结构flink无法解析其类型,这时候我们需要在方法的后面添加返回值类型,比如字符串

input.flatMap((Integer number, Collector<String> out) -> {
   。。。。。。
})
// 提供返回值类型
.returns(Types.STRING)

15.Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:140)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110)
    at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112)
    at online_training.program.jarvis_flink.DDLtest.main(DDLtest.java:22)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' inthe classpath.

缺少blink或者flink的Planner 的相关依赖,添加如下依赖即可

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
Internally, parts of the table ecosystem are i

16.Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris

解决方案:flink 集成 hive 时 不支持embedded metastore的,配置hive时 需要起一个hive metastore 并在conf文件配置 hive.metastore.uris

17.Caused by: java.lang.NoSuchMethodError: scala.Some.value()Ljava/long/Object

原因:编译的scala版本和集群的scala版本不一样导致的

18..Caused by: org.apache.flink.util.SerializedThrowable: Ask timed out on [Actor[akka.tcp://flink@lxxxxx-091:36495/user/rpc/jobmanager_2#651891652]] after [120000 ms]. Message of type

akka通信超时

默认值如下 > > akka.ask.timeout: 10 s > web.timeout: 10000

分区多的建议调到60s  akka.ask.timeout和web.timeout两个参数

19.org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

一般是flink程序使用的资源超高yarn节点的限制,比如内存,导致yarn节点kill掉容器

20. Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 10977 of 32768 bytes each.

如果以很高的并行度运行Flink,则可能需要增加网络缓冲区的数量。
  默认情况下,Flink占用JVM堆大小的10%作为网络缓冲区,最小为64MB,最大为1GB。您可以通过taskmanager.network.memory.fraction,taskmanager.network.memory.min和taskmanager.network.memory.max调整所有这些值。

其中taskmanager.network.memory.fraction作用是:

总Flink内存的一部分用作网络内存。网络内存是为ShuffleEnvironment保留的非堆内存(例如,网络缓冲区)。网络内存大小的推导,以构成总Flink内存的配置部分。如果派生的大小小于/大于配置的min/max大小,则将使用min/max大小。通过将最小/最大大小设置为相同的值,可以显式指定网络内存的确切大小。

可以通过设置./conf/flink-conf.yaml参数来配置taskmanager.network.numberOfBuffers文件中的网络缓冲区数。

该参数应设置为slots-per-TM^2 * TMs * 4,其中slots-per-TM是每个TaskManager的插槽数,而#TMs是任务taskManger的总数,可以看到TM个数固定的情况一下,slot个数越多,需要的网络缓存的内存就越大。

例如,要支持一个由20个8插槽计算机组成的集群,则应使用大约5000个网络缓冲区以实现最佳吞吐量。默认情况下,每个网络缓冲区的大小为32 KiBytes。在上面的示例中,系统将因此为网络缓冲区分配大约300 MiBytes。

21.cannot be cast to com.google.protobuf.Message

Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto cannot be cast to com.google.protobuf.Message
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
 at com.sun.proxy.$Proxy14.registerApplicationMaster(Unknown Source)
 at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:106)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
 at com.sun.proxy.$Proxy15.registerApplicationMaster(Unknown Source)
 at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:222)
 at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:214)
 at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138)
 at org.apache.flink.yarn.YarnResourceManager.createAndStartResourceManagerClient(YarnResourceManager.java:205)
 at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:234)
 ... 11 common frames omitted

由于flink启动的时候会加载hadoop下面的jar包,加到自己的classpath下面,如果hadoop的jar包和自己flink工程的jar包版本不一致就会导致这个问题,解决办法:排除自己工程中的hadoop相关的jar,打包的时候不要打进来.

 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

提示,有可能你没有显式的引入hadoop-common、hadoop-hdfs这样的jar包,但是你引入了hbase的jar包,因为hbase会引入hdfs相关的jar包,也会导致类似的错误。

22.Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.

 RPC 的消息大小超过了默认的 akka framesize 的最大值了,了解一下这个值的默认值,从 官网 我们可以看的到该值的默认大小为 "10485760b"

表现: 一般都是任务失败重试时, task 出现异常之后,它需要调用 updateTaskExecutionState(TaskExecutionState,taskExecutionState) 这个 RPC 接口去通知 Flink Jobmanager 去改变对应 task 的状态并且重启 task。但是呢,taskExecutionState 这个参数里面有个 error 属性,当我的 task 打出来的错误栈太多的时候,在序列化的之后超过了 rpc 接口要求的最大数据大小(也就是 maximum akka framesize),导致调用 updateTaskExecutionState 这个 rpc 接口失败,Jobmanager 无法获知这个 task 已经处于 fail 的状态,也无法重启,然后就导致了一系列连锁反应

23.Container id: container_e09_1659093339125_60632_01_000006

Exit code: 239

Stack trace: ExitCodeException exitCode=239:

at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)

现象: 任务正在运行,某个container退出了,并报了上面的错误如何排查问题?

第一步:去jobManager日志查看退出的container是在哪台机器启动的

第二步: 去所在机器查看container的log

可以看到此容器是内存溢出导致的,当时需要视情况而定,也有可能是别的原因导致退出 

解决办法

任务停止,在 flink-conf.yaml 中加入 akka.framesize 参数,调大该值。

akka.framesize: "62914560b"

或者application模型下提交命令使用-D指定参数设置覆盖flink-conf.yaml中的默认配置

23.Flink读取Kafka报错:KafkaException ByteArrayDeserializer is not an instance Deserializer

依赖包重复
在使用Flink连接Kafka的时候,需要两个依赖,一个是Flink端使用的flink-connector-kafka依赖包,另一个是kafka-clients包,两个包一起使用才可以连接到Kafka中。
如果在使用的过程中少了kafka-clients会导致找不到对应依赖。
但在我们这个报错过程中,大概率是打包时候将依赖包打入,并且在提交指定Flink依赖jar的时候也有对应的jar包。

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink-cersion}</version>
                <scope>provide</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
                <scope>provide</scope>
            </dependency>

24.Caused by: java.lang.LinkageError: loader constraint violation错误,这是由于Flink的包加载机制引起的。

原因: 类加载顺序问题,可以在flink-conf.yaml中加入

classloader.resolve-order: parent-first
Flink的默认加载是child-first。

但是用了parent-first配置有可能出现类冲突问题。解决办法只针对个别包出来,不要一律用parent-first, 配置如下:

classloader.parent-first-patterns.additional: javax.script; jdk;

25. Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things:
either job(s) require(s) a larger size of JVM direct memory or there is a
direct memory leak.
The direct memory can be allocated by user code or some of its
dependencies. In this case ‘taskmanager.memory.task.off-heap.size’
configuration option should be increased.
Flink framework and its dependencies also consume the direct memory, mostly
for network communication.
The most of network memory is managed by Flink and should not result in
out-of-memory error. In certain special cases,
in particular for jobs with high parallelism, the framework may require
more direct memory which is not managed by Flink.
In this case ‘taskmanager.memory.framework.off-heap.size’ configuration
option should be increased.
If the error persists then there is probably a direct memory leak in user
code or some of its dependencies which has to be investigated and fixed.
The task executor has to be shutdown…
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
-----------------------------------

问题分析
taskmanager.memory.framework.off-heap.size 默认是一个固定值(256MB 以下),不是按百分比算的
OOM 应该是下游反压导致
建议是直接增加 taskmanager.memory.framework.off-heap.size

26.org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)

at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at

一般情况下是内存太小了

27.org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition:Prd_IN_GeneralEvents-39

使用flink sql 创建kafka表时,发现当scan.startup.mode=group-offsets时,使用未初始化的group.id的情况,flink程序直接抛出此异常,而不是使用properties.auto.offset.reset设置的属性。通过查看源码,发现OffsetsInitializer::committedOffsets时设置了策略为None,且覆盖了properties参数,有没有其他方法实现当groupId不存在时,使用auto.offset.reset的属性

28 org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 

PartitionNotFoundException异常原因通常是下游task向上游task发送partition
request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功

同时我们可以通过参数:`taskmanager.network.request-backoff.max`,默认是10秒 设置请求下游task的超时时间,但是不要设置太大,太大会影响任务重启的时间

29 org.apache.flink.types.NullFieldException: Field 3 is null, but expected to hold a value.
  

 at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeKey(RocksDBKeySerializationUtils.java:108)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:217)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:192)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:179)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.getKeyBytes(AbstractRocksDBState.java:161)
    at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:96)
    at org.apache.flink.runtime.state.ttl.TtlReducingState.add(TtlReducingState.java:52)
    at com.yjp.stream.stat.business.crm.function.ReturnOrderFlatMapFunction.flatMap(ReturnOrderFlatMapFunction.java:101)
    at com.yjp.stream.stat.business.crm.function.ReturnOrderFlatMapFunction.flatMap(ReturnOrderFlatMapFunction.java:24)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException


解决办法:

此异常是在keyby时以多个key分组某个key字段为null时抛出。在处理数据时将需要分组的数据都进行非null判断默认赋值。Field 3 is null 说明问题是分组的第四个key为null值(下标从0开始)

常见问题集

1.如何处理checkpoint设置RocksDBStateBackend方式,且当数据量大时,执行checkpoint会很慢的问题

原因分析

由于窗口使用自定义窗口,这时窗口的状态使用ListState,且同一个key值下,value的值非常多,每次新的value值到来都要使用RocksDB的merge()操作;触发计算时需要将该key值下所有的value值读出。RocksDB的方式为merge()->merge()....->merge()->read(),该方式读取数据时非常耗时

  source算子在瞬间发送了大量数据,所有数据的key值均相等,导致window算子处理速度过慢,使barrier在缓存中积压,快照的制作时间过长,导致window算子在规定时间内没有向CheckpointCoordinator报告快照制作完成,CheckpointCoordinator认为快照制作失败         

回答

Flink引入了第三方软件包RocksDB的缺陷问题导致该现象的发生。建议用户将checkpoint设置为FsStateBackend方式。

用户需要在应用代码中将checkpoint设置为FsStateBackend。例如:

 env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"));

2.如何处理blob.storage.directory配置/home目录时,启动yarn-session失败的问题

当用户设置“blob.storage.directory”为“/home”时,用户没有权限在“/home”下创建“blobStore-UUID”的文件,导致yarn-session启动失败。

1.  建议将"blob.storage.directory"配置选项设置成“/tmp”或者“/opt/huawei/Bigdata/tmp”。

2.  当用户将"blob.storage.directory"配置选项设置成自定义目录时,需要手动赋予用户该目录的owner权限。以下以FusionInsight的admin用户为例。

               A. 修改Flink客户端配置文件conf/flink-conf.yaml,配置blob.storage.directory:  /home/testdir/testdirdir/xxx。

               B. 创建目录/home/testdir(创建一层目录即可),设置该目录为admin用户所属。

说明:

                /home/testdir/下的testdirdir/xxx目录在启动Flink集群时会在每个节点下自动创建。

               C. 进入客户端路径,执行命令

               ./bin/yarn-session.sh -n 3 -jm 2048 -tm 3072,可以看到yarn-session正常启动并且成功创建目录。            

3.为什么Flink Web页面无法直接连接

问题:无法通过“http://JobManager IP:JobManager的端口”访问Web页面。

回答

由于浏览器所在的计算机IP地址未加到Web访问白名单导致。用户可以通过以下步骤来解决问题。

1.          查看客户端的配置文件“conf/flink-conf.yaml”。

2.          确认配置项“jobmanager.web.ssl.enabled”的值是“false”。如果不是,请修改配置项的值为“false”。如果是,请执行3

3.          确认配置项“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”

               中是否已经添加浏览器所在的计算机IP地址。如果没有添加,可以通过这两项配置项进行添加。例如:

       jobmanager.web.access-control-allow-origin: 192.168.252.35,192.168.24.216 
        jobmanager.web.allow-access-address: 192.168.252.35,192.168.24.216

持续更新中。。。。。。。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/aA518189/article/details/103622261

智能推荐

settext 下划线_Android TextView 添加下划线的几种方式-程序员宅基地

文章浏览阅读748次。总结起来大概有5种做法:将要处理的文字写到一个资源文件,如string.xml(使用html用法格式化)当文字中出现URL、E-mail、电话号码等的时候,可以将TextView的android:autoLink属性设置为相应的的值,如果是所有的类型都出来就是**android:autoLink="all",当然也可以在java代码里 做,textView01.setAutoLinkMask(Li..._qaction::settext 无法添加下划线

TableStore时序数据存储 - 架构篇_tablestore 时间类型处理-程序员宅基地

文章浏览阅读6.3k次,点赞2次,收藏10次。摘要: 背景 随着近几年物联网的发展,时序数据迎来了一个不小的爆发。从DB-Engines上近两年的数据库类型增长趋势来看,时序数据库的增长是非常迅猛的。在去年我花了比较长的时间去了解了一些开源时序数据库,写了一个系列的文章(综述、HBase系、Cassandra系、InfluxDB、Prometheus),感兴趣的可以浏览。背景随着近几年物联网的发展,时序数据迎来了一个不小的爆发。从DB..._tablestore 时间类型处理

Ubuntu20.04下成功运行VINS-mono_uabntu20.04安装vins-mono-程序员宅基地

文章浏览阅读5.7k次,点赞8次,收藏49次。可以编译成功但是运行时段错误查找原因应该是ROS noetic版本中自带的OpenCV4和VINS-mono中需要使用的OpenCV3冲突的问题。为了便于查找问题,我只先编译feature_tracker包。解决思路历程:o想着把OpenCV4相关的库移除掉,但是发现编译feature_tracker的时候仍然会关联到Opencv4的库,查找原因是因为cv_bridge是依赖opencv4的,这样导致同时使用了opencv3和opencv4,因此运行出现段错误。oo进一步想着(1)把vins-mon_uabntu20.04安装vins-mono

TMS320C6748_EMIF时钟配置_tms 6748-程序员宅基地

文章浏览阅读3.6k次,点赞3次,收藏12次。创龙TL6748开发板中,EMIFA模块使用默认的PLL0_SYSCLK3时钟,使用AISgen for D800K008工具加载C6748配置文件C6748AISgen_456M_config(Configuration files,在TL_TMS6748/images文件夹下),由图可以看到DIV3等于4,注意这里的DIV3就是实际的分频值(x),而不是写入相应PLL寄存器的值(x-1)。_tms 6748

eigen稀疏矩阵拼接(基于块操作的二维拼接)的思考-程序员宅基地

文章浏览阅读5.9k次,点赞4次,收藏13次。转载请说明出处:eigen稀疏矩阵拼接(块操作)eigen稀疏矩阵拼接(块操作)关于稀疏矩阵的块操作:参考官方链接 However, for performance reasons, writing to a sub-sparse-matrix is much more limited, and currently only contiguous sets of columns..._稀疏矩阵拼接

基于Capon和信号子空间的变形算法实现波束形成附matlab代码-程序员宅基地

文章浏览阅读946次,点赞19次,收藏19次。波束形成是天线阵列信号处理中的一项关键技术,它通过对来自不同方向的信号进行加权求和,来增强特定方向的信号并抑制其他方向的干扰。本文介绍了两种基于 Capon 和信号子空间的变形算法,即最小方差无失真响应 (MVDR) 算法和最小范数算法,用于实现波束形成。这些算法通过优化波束形成权重向量,来最小化波束形成输出的方差或范数,从而提高波束形成性能。引言波束形成在雷达、声纳、通信和医学成像等众多应用中至关重要。它可以增强目标信号,抑制干扰和噪声,提高系统性能。

随便推点

Ubuntu好用的软件推荐_ubuntu开发推荐软件-程序员宅基地

文章浏览阅读3.4w次。转自:http://www.linuxidc.com/Linux/2017-07/145335.htm使用Ubuntu开发已经有些时间了。写下这篇文章,希望记录下这一年的小小总结。使用Linux开发有很多坑,同时也有很多有趣的东西,可以编写一些自动化脚本,添加定时器,例如下班定时关机等自动化脚本,同时对于服务器不太了解的朋友,建议也可以拿台Linux来实践下,同时Ubuntu在Androi_ubuntu开发推荐软件

Nginx反向代理获取客户端真实IP_nginx获取到的是交换机的ip-程序员宅基地

文章浏览阅读2.2k次。一,问题 nginx反向代理后,在应用中取得的ip都是反向代理服务器的ip,取得的域名也是反向代理配置的url的域名,解决该问题,需要在nginx反向代理配置中添加一些配置信息,目的将客户端的真实ip和域名传递到应用程序中。二,解决 Nginx服务器增加转发配置 proxy_set_header Host $host;_nginx获取到的是交换机的ip

Wireshark TCP数据包跟踪 还原图片 WinHex应用_wireshark抓包还原图片-程序员宅基地

文章浏览阅读1.4k次。Wireshark TCP数据包跟踪 还原图片 WinHex简单应用 _wireshark抓包还原图片

Win8蓝屏(WHEA_UNCORRECTABLE_ERROR)-程序员宅基地

文章浏览阅读1.5k次。Win8下安装VS2012时,蓝屏,报错WHEA_UNCORRECTABLE_ERROR(P.S.新的BSOD挺有创意":("),Google之,发现[via]需要BIOS中禁用Intel C-State,有严重Bug的嫌疑哦原因有空再看看..._win8.1 whea_uncorrectable_error蓝屏代码

案例课1——科大讯飞_科大讯飞培训案例-程序员宅基地

文章浏览阅读919次,点赞21次,收藏22次。科大讯飞是一家专业从事智能语音及语音技术研究、软件及芯片产品开发、语音信息服务的软件企业,语音技术实现了人机语音交互,使人与机器之间沟通变得像人与人沟通一样简单。语音技术主要包括语音合成和语音识别两项关键技术。此外,语音技术还包括语音编码、音色转换、口语评测、语音消噪和增强等技术,有着广阔的应用。_科大讯飞培训案例

perl下载与安装教程【工具使用】-程序员宅基地

文章浏览阅读4.7k次。Perl是一个高阶程式语言,由 Larry Wall和其他许多人所写,融合了许多语言的特性。它主要是由无所不在的 C语言,其次由 sed、awk,UNIX shell 和至少十数种其他的工具和语言所演化而来。Perl对 process、档案,和文字有很强的处理、变换能力,ActivePerl是一个perl脚本解释器。其包含了包括有 Perl for Win32、Perl for ISAPI、PerlScript、Perl。_perl下载

推荐文章

热门文章

相关标签