Spark Streaming - Kafka - java.nio.BufferUnderflowException -
i'm running below error while trying consume message kafka through spark streaming (kafka direct api). used work ok when using spark standalone cluster manager. switched using cloudera 5.7 using yarn manage spark cluster , started see below error.
few details: - spark 1.6.0 - using kafka direct stream api - kafka broker version (0.8.2.1) - kafka version in classpath of yarn executors (0.9) - kafka brokers not managed cloudera
the difference see between using standalone cluster manager , yarn kafka version being used on consumer end. (0.8.2.1 vs 0.9)
trying figure if version mismatch issue ? if indeed case, fix other upgrading kafka brokers 0.9 well. (eventually yes not now)
org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 200.0 failed 4 times, recent failure: lost task 0.3 in stage 200.0 (tid 203,..): java.nio.bufferunderflowexception @ java.nio.heapbytebuffer.get(heapbytebuffer.java:151) @ java.nio.bytebuffer.get(bytebuffer.java:715) @ kafka.api.apiutils$.readshortstring(apiutils.scala:40) @ kafka.api.topicdata$.readfrom(fetchresponse.scala:96) @ kafka.api.fetchresponse$$anonfun$4.apply(fetchresponse.scala:170) @ kafka.api.fetchresponse$$anonfun$4.apply(fetchresponse.scala:169) @ scala.collection.traversablelike$$anonfun$flatmap$1.apply(traversablelike.scala:251) @ scala.collection.traversablelike$$anonfun$flatmap$1.apply(traversablelike.scala:251) @ scala.collection.immutable.range.foreach(range.scala:141) @ scala.collection.traversablelike$class.flatmap(traversablelike.scala:251) @ scala.collection.abstracttraversable.flatmap(traversable.scala:105) @ kafka.api.fetchresponse$.readfrom(fetchresponse.scala:169) @ kafka.consumer.simpleconsumer.fetch(simpleconsumer.scala:135) @ org.apache.spark.streaming.kafka.kafkardd$kafkardditerator.fetchbatch(kafkardd.scala:192) @ org.apache.spark.streaming.kafka.kafkardd$kafkardditerator.getnext(kafkardd.scala:208) @ org.apache.spark.util.nextiterator.hasnext(nextiterator.scala:73) @ scala.collection.iterator$$anon$11.hasnext(iterator.scala:327) @ scala.collection.iterator$class.foreach(iterator.scala:727) @ scala.collection.abstractiterator.foreach(iterator.scala:1157) @ scala.collection.generic.growable$class.$plus$plus$eq(growable.scala:48) @ scala.collection.mutable.arraybuffer.$plus$plus$eq(arraybuffer.scala:103) @ scala.collection.mutable.arraybuffer.$plus$plus$eq(arraybuffer.scala:47) @ scala.collection.traversableonce$class.to(traversableonce.scala:273) @ scala.collection.abstractiterator.to(iterator.scala:1157) @ scala.collection.traversableonce$class.tobuffer(traversableonce.scala:265) @ scala.collection.abstractiterator.tobuffer(iterator.scala:1157) @ scala.collection.traversableonce$class.toarray(traversableonce.scala:252) @ scala.collection.abstractiterator.toarray(iterator.scala:1157) @ org.apache.spark.rdd.rdd$$anonfun$tolocaliterator$1$$anonfun$org$apache$spark$rdd$rdd$$anonfun$$collectpartition$1$1.apply(rdd.scala:942) @ org.apache.spark.rdd.rdd$$anonfun$tolocaliterator$1$$anonfun$org$apache$spark$rdd$rdd$$anonfun$$collectpartition$1$1.apply(rdd.scala:942) @ org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext.scala:1869) @ org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext.scala:1869) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:66) @ org.apache.spark.scheduler.task.run(task.scala:89) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:214) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745)
driver stacktrace: @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1431) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1419) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1418) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)
Comments
Post a Comment