Topics

Janusgraph spark on yarn error


j2kupper@...
 

Hi!

I have this configuration

janusgraph: 0.5.2
spark 2.4.0
hbase 2.1.5
hadoop 2.7.7

I have 3 nodes hadoop on my cluster.
I am set up janusgraph with hadoop infrastructure and run load data and read data on spark. But i have error

21/01/18 17:27:25 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/01/18 17:27:26 INFO yarn.ApplicationMaster: Preparing Local resources
Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:226)
    at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:776)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2117)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$8$$anonfun$apply$3.apply(ApplicationMaster.scala:220)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$8$$anonfun$apply$3.apply(ApplicationMaster.scala:217)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$8.apply(ApplicationMaster.scala:217)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$8.apply(ApplicationMaster.scala:182)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:773)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:772)
    at org.apache.spark.deploy.yarn.ApplicationMaster.<init>(ApplicationMaster.scala:182)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:796)
    at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:827)
    at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)

This is my hadoop-load.properties

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=./files.json
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
gremlin.spark.persistContext=false

spark.master=yarn
spark.yarn.archive=hdfs:///user/root/janusgraph_libs.zip

spark.yarn.maxAppAttempts=5
spark.executor.instances=2
spark.shuffle.service.enabled=false
spark.driver.memory=4g
spark.driver.cores=4
spark.executor.cores=5
spark.executor.memory=19g
spark.executor.extraClassPath=/usr/local/janusgraph/lib/*:/usr/local/hadoop/etc/hadoop/conf:/usr/local/spark/conf:/usr/local/hbase/conf
spark.executor.extraJavaOptions=-Djava.library.path=/usr/local/hadoop/lib/native
spark.yarn.am.extraJavaOptions=-Djava.library.path=/usr/local/hadoop/lib/native
spark.dynamicAllocation.enabled=false
spark.io.compression.codec=snappy
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
spark.hadoop.home.dir=/usr/local/hadoop
spark.hadoop.cloneConf=true


How can i fix it?
Thank you


hadoopmarc@...
 

Hi

OK, do I understand right that you want to bulk load data from hdfs into janusgraph-hbase? Nothing wrong with that requirement, I do not know how to ask this in a more friendly way!

Is your input data really in GraphSON format? (it is difficult to get this right!)

With that established, we can see further, because this is a broad subject.

Marc


hadoopmarc@...
 

#Private reply from OP:
Yes, i am running bulk load from hdfs(graphson) in janusgraph-hbase.
Yes, i have graphson part files from spark job with a structure like grateful-dead.json example.

But if application master starting on certain(third) hadoop node is working well.
All nodes have identical configuration.

#Answer HadoopMarc
You do not need to use HadoopGraph for this. Indeed, there used to be a BulkLoaderVertexProgram in Apache TinkerPop, but this could not be maintained and keep working reliably for the various versions of the various graph systems. Until now, JanusGraph does not have developed its own BulkLoaderVertexProgram. Also note that while their does exist an HBaseInputFormat for loading a janusgraph-hbase graph into a HadoopGraph, there does not exist an HBaseOutputFormat to write an HadoopGraph into janusgraph-hbase.

This being said, nothing is lost. You can simply write a spark application that has individual spark executors connect to janusgraph in the usual (OLTP) way and load data with the usual graph.traversal() API, that is using the addV(), addE() and properties() traversal steps. Of course, you could also try and copy the old code for the BulkLoaderVertexProgram into your project, but I believe the way I sketched is conceptually simpler and less error prone.

I tend to remember that their exist some blog series about using JanusGraph at scale, but I do not have then at hand and will look for them later on. If you find these blogs yourself, pleas post the links!

Best wishes,      Marc


j2kupper@...
 
Edited

Thank you for response!

I am using BulkLoaderVertexProgram from console. Sometimes it works correctly.
This error still exist when i am running read from hbase spark job.

my  read-hbase.properties

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=false
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

janusgraphmr.ioformat.conf.storage.backend=hbase
janusgraphmr.ioformat.conf.storage.hostname=192.168.1.11,192.168.1.12,192.168.1.13,192.168.1.14
janusgraphmr.ioformat.conf.storage.hbase.table=testTable


spark.master=yarn
spark.submit.deployMode=client
spark.yarn.archive=/usr/local/janusgraph/janusgraph_libs.zip
spark.executor.instances=2
spark.driver.memory=8g
spark.driver.cores=4
spark.executor.cores=5
spark.executor.memory=19g

spark.executor.extraClassPath=/usr/local/janusgraph/lib:/usr/local/hadoop/etc/hadoop/conf
spark.executor.extraJavaOptions=-Djava.library.path=/usr/local/hadoop/lib/native
spark.yarn.am.extraJavaOptions=-Djava.library.path=/usr/local/hadoop/lib/native
spark.yarn.appMasterEnv.CLASSPATH=/usr/local/janusgraph/lib:/usr/local/hadoop/etc/hadoop/conf


spark.driver.extraLibraryPath=/usr/local/hadoop/lib/native
spark.executor.extraLibraryPath=/usr/local/hadoop/lib/native

spark.dynamicAllocation.enabled=false
spark.io.compression.codec=snappy
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator

Can you provide some code example spark of application loading data OLTP way?
Which program langugage can i use? (I want python, if it`s possible)


hadoopmarc@...
 

The path of the BulkLoaderVertexProgram might be doable, but I cannot help you on that one. In the stack trace above, the yarn appmaster from spark-yarn apparently tries to communicate with HBase but finds that various libraries do not match. This failure arises because the JanusGraph distribution does not include spark-yarn and thus is not handcrafted to work with spark-yarn.

For the path without BulkLoaderVertexProgram you inevitably need a JVM language (java, scala, groovy). In this case, a spark executor is unaware of any other executors running and  is simply passed a callable (function) to execute (through RDD.mapPartitions() or through a spark-sql UDF). This callable can be part of a class that establish its own JanusGraph instances in the OLTP way. Now, you only have to deal with the executor CLASSPATH which does not need spark-yarn and the libs from the janusgraph distribution suffice.

Some example code can be found at:
https://nitinpoddar.medium.com/bulk-loading-data-into-janusgraph-part-2-ca946db26582

Best wishes,    Marc


j2kupper@...
 

Thank you!

Sorry for my long time answer.
I am do some experiments with janusgraph.