
Re: Not able to run queries using spark graph computer from java

hadoopmarc@...
 

Hi Sai,

The blog you mentioned is a bit outdated and targets spark-1.x. To get an idea of the changes needed to get OLAP running with spark-2.x, you can take a look at:
https://tinkerpop.apache.org/docs/current/recipes/#olap-spark-yarn

Best wishes,    Marc


Query Optimisation

Vinayak Bali
 

Hi All, 

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1'),outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2')).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2'),outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1')).as('v2')).limit(100).select('v1','e','v2').by(valueMap().by(unfold()))

This query returns 100 results of the form (v1,e,v2), and it takes only milliseconds.

Rather than 100 results of the form (v1,e,v2) overall, I need to return 100 edges of each type. The query is as follows:

//Query1

// 2mins vertex1:77, edge1: 36, edge2: 5, vertex2: 105, vertex3: 100, edge3: 100

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').limit(100).as('e').inV().has('property1', 'vertex1'),outE().has('property1', 'edge2').limit(100).as('e').inV().has('property1', 'vertex2')).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').limit(100).as('e').inV().has('property1', 'vertex2'),outE().has('property1', 'Component_Of').limit(100).as('e').inV().has('property1', 'vertex1')).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

But this takes 2 minutes to execute, which is not optimal. I tried some other approaches:

//Query2

// 2mins vertex1:77, edge1: 36, edge2: 5, vertex2: 105, vertex3: 100, edge3: 100

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1').limit(100),outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2').limit(100)).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2').limit(100),outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1').limit(100)).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query3

// 529 ms vertex1:77, edge1: 36, edge2: 5, vertex2: 105, vertex3: 100, edge3: 100

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1'),outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2')).limit(100).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2'),outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1')).limit(100).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query3 (variant without the nested union)

// 18 sec vertex1:77, edge1: 36, edge2: 5, vertex2: 105, vertex3: 100, edge3: 100

g.inject(1).union(V().has('property1', 'vertex1').as('v1').outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1').
limit(100).as('v2'),V().has('property1', 'vertex1').as('v1').outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2').
limit(100).as('v2'),V().has('property1', 'vertex3').as('v1').outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2').
limit(100).as('v2'),V().has('property1', 'vertex3').as('v1').outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1').
limit(100).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query4

// 18 sec vertex1:77, edge1: 36, edge2: 5, vertex2: 105, vertex3: 100, edge3: 100

g.inject(1).union(V().has('property1', 'vertex1').as('v1').outE().has('property1', 'edge1').limit(100).as('e').
inV().has('property1', 'vertex1').as('v2'),V().has('property1', 'vertex1').as('v1').outE().
has('property1', 'edge2').limit(100).as('e').inV().has('property1', 'vertex2').as('v2'),V().has('property1', 'vertex3').
as('v1').outE().has('property1', 'edge3').limit(100).as('e').inV().has('property1', 'vertex2').as('v2'),
V().has('property1', 'vertex3').as('v1').outE().has('property1', 'Component_Of').
limit(100).as('e').inV().has('property1', 'vertex1').as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

Query3 performs best, but when the limit changes it no longer returns the expected result, as the following queries show. Query7 is equivalent to Query3; only the limit is changed (from 100 to 10).

//Query5

// 2mins vertex1:25, edge1: 10, edge2: 5, vertex2: 15, vertex3: 10, edge3: 10

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').limit(10).as('e').inV().has('property1', 'vertex1'),outE().has('property1', 'edge2').limit(10).as('e').inV().has('property1', 'vertex2')).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').limit(10).as('e').inV().has('property1', 'vertex2'),outE().has('property1', 'Component_Of').limit(10).as('e').inV().has('property1', 'vertex1')).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query6

// 2mins vertex1:25, edge1: 10, edge2: 5, vertex2: 15, vertex3: 10, edge3: 10

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1').limit(10),outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2').limit(10)).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2').limit(10),outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1').limit(10)).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query7

// 278 ms vertex1:18, edge1: 8, edge2: 2, vertex2: 12, vertex3: 10, edge3: 10

g.inject(1).union(V().has('property1', 'vertex1').as('v1').union(outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1'),outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2')).limit(10).as('v2'),V().has('property1', 'vertex3').as('v1').union(outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2'),outE().has('property1', 'Component_Of').as('e').inV().has('property1', 'vertex1')).limit(10).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

//Query8

// 18 sec vertex1:25, edge1: 10, edge2: 5, vertex2: 15, vertex3: 10, edge3: 10

g.inject(1).union(V().has('property1', 'vertex1').as('v1').outE().has('property1', 'edge1').as('e').inV().has('property1', 'vertex1').
limit(10).as('v2'),V().has('property1', 'vertex1').as('v1').outE().has('property1', 'edge2').as('e').inV().has('property1', 'vertex2').
limit(10).as('v2'),V().has('property1', 'vertex3').as('v1').outE().has('property1', 'edge3').as('e').inV().has('property1', 'vertex2').
limit(10).as('v2'),V().has('property1', 'vertex3').as('v1').outE().has('property1', 'Component_Of').as('e').
inV().has('property1', 'vertex1').limit(10).as('v2')).select('v1','e','v2').by(valueMap().by(unfold()))

Also, the limit does not affect the time taken to execute the query; it is the same for both limits.
Please share your views and help me solve the problem in an efficient way.

Thanks & Regards,
Vinayak


Re: Not able to run queries using spark graph computer from java

Sai Supraj R
 

Hi Marc,
I got this when querying using OLTP:
gremlin> g.V(1469152598528)
==>v[1469152598528]
gremlin> g.V(1469152598528).elementMap()
==>[id:1469152598528,label:vertex]

I am also trying to run spark graph computer with yarn on emr.

Spark version = 2.4.4
Scala version = 2.12.10
java.io.FileNotFoundException: File file:/home/hadoop/.sparkStaging/application_1618505307369/__spark_libs__910446852825.zip does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:671)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:992)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:661)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:464)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:243)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:236)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:224)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


I followed this blog but ended up with the above exception:

Thanks
Sai



On Fri, May 7, 2021 at 7:33 AM <hadoopmarc@...> wrote:
Hi Sai,

What happens in createTraversal()?

What do you get with g.V(1469152598528).elementMap() if you open the graph for OLTP queries?

Best wishes,   Marc


Re: Not able to run queries using spark graph computer from java

hadoopmarc@...
 

Hi Sai,

What happens in createTraversal()?

What do you get with g.V(1469152598528).elementMap() if you open the graph for OLTP queries?

Best wishes,   Marc


Re: Support for DB cache for Multi Node Janus Server Setup

hadoopmarc@...
 

Hi Pasan,

The multiple janusgraph nodes share the same storage backend, so the common caching is done in the storage backend.

Best wishes,    Marc


Support for DB cache for Multi Node Janus Server Setup

pasansumanathilake@...
 

Hi All,
I would like to understand whether JanusGraph servers can be scaled horizontally while keeping the cache enabled. According to the JanusGraph documentation (https://docs.janusgraph.org/basics/cache/), the cache needs to be disabled in a multi-node JanusGraph server setup. Does JanusGraph support a common caching layer?

Regards,
Pasan


Re: Not able to run queries using spark graph computer from java

Sai Supraj R
 

Hi Marc,

Sorry, my bad: I posted the wrong code.

I used Graph graph = GraphFactory.open("read-cql.properties");

and I got the above error.

Thanks
Sai


On Thu, May 6, 2021 at 10:11 AM <hadoopmarc@...> wrote:
Hi Sai,

The calling code you present is not complete.

The first line should read (because HadoopGraph does not derive from JanusGraph):
Graph graph = GraphFactory.open("read-cql.properties");
Best wishes,    Marc



Re: Not able to run queries using spark graph computer from java

hadoopmarc@...
 

Hi Sai,

The calling code you present is not complete.

The first line should read (because HadoopGraph does not derive from JanusGraph):
Graph graph = GraphFactory.open("read-cql.properties");
Best wishes,    Marc



Re: olap connection with spark standalone cluster

hadoopmarc@...
 

Hi Sai,

This exception is not really related to this thread.

JanusGraph with SparkGraphComputer can only be used with the TinkerPop HadoopGraph. Therefore, the example in the JanusGraph ref docs has a properties file starting with the following lines:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
Some of the other JanusGraph storage backends have their own InputFormat.
If you encounter other problems please include the properties file and calling code.
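
As a rough illustration of the point above, a properties file can be sanity-checked before it is handed to a graph factory. The sketch below uses only java.util.Properties; the class OlapConfigCheck and its method names are hypothetical helpers invented for this example and are not part of JanusGraph or TinkerPop. It only checks that gremlin.graph names HadoopGraph and that some gremlin.hadoop.graphReader is set; passing the check does not guarantee that the OLAP job will run.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of JanusGraph or TinkerPop.
final class OlapConfigCheck {
    static final String HADOOP_GRAPH =
            "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph";

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when gremlin.graph names HadoopGraph and a graph reader is configured.
    static boolean looksLikeOlapConfig(Properties props) {
        String graphClass = props.getProperty("gremlin.graph", "").trim();
        String reader = props.getProperty("gremlin.hadoop.graphReader", "").trim();
        return HADOOP_GRAPH.equals(graphClass) && !reader.isEmpty();
    }

    public static void main(String[] args) {
        String olap = "gremlin.graph=" + HADOOP_GRAPH + "\n"
                + "gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat\n";
        String oltp = "storage.backend=cql\n";
        System.out.println("OLAP file: " + looksLikeOlapConfig(parse(olap)));
        System.out.println("OLTP file: " + looksLikeOlapConfig(parse(oltp)));
    }
}
```

A file that passes this check is meant to be opened through the TinkerPop GraphFactory rather than JanusGraphFactory, since HadoopGraph does not derive from JanusGraph.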

Best wishes,    Marc


Not able to run queries using spark graph computer from java

Sai Supraj R
 

Hi,

I am getting the following error when running queries using spark graph computer from java.
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Edge with id already exists: 1469152598528
at org.janusgraph.hadoop.formats.util.JanusGraphVertexDeserializer.readHadoopVertex(JanusGraphVertexDeserializer.java:182)
at org.janusgraph.hadoop.formats.util.HadoopRecordReader.nextKeyValue(HadoopRecordReader.java:69)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
... 3 more

code:
Graph graph = JanusGraphFactory.open("read-cql.properties");
GraphTraversalSource g = createTraversal();
x = g.V().count()

read-cql.properties:
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat


gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true

janusgraphmr.ioformat.conf.storage.backend=cql
# This specifies the hostname & port for Cassandra data store.
janusgraphmr.ioformat.conf.storage.hostname=10.88.68.52,10.88.68.11,10.88.68.47
janusgraphmr.ioformat.conf.storage.port=9042
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cql.keyspace=iqvia

cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator

Thanks
Sai



Re: olap connection with spark standalone cluster

Sai Supraj R
 

Hi, I tried the above solution but it is still throwing an error:

java.lang.Throwable: Hook creation trace
at org.janusgraph.graphdb.database.StandardJanusGraph.<init>(StandardJanusGraph.java:185) [load_test.jar:na]
at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:161) [load_test.jar:na]
at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:132) [load_test.jar:na]
at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:79) [load_test.jar:na]
at com.iqvia.janus.LoadDataTest1.main(LoadDataTest1.java:41) [load_test.jar:na]
Exception in thread "main" java.lang.IllegalArgumentException: Graph does not support the provided graph computer: SparkGraphComputer
at org.apache.tinkerpop.gremlin.structure.Graph$Exceptions.graphDoesNotSupportProvidedGraphComputer(Graph.java:1190)
at org.janusgraph.graphdb.tinkerpop.JanusGraphBlueprintsGraph.compute(JanusGraphBlueprintsGraph.java:157)
at com.iqvia.janus.LoadDataTest1.main(LoadDataTest1.java:58)

Thanks
Sai


Re: Any advice on performance concern of JanusGraph with Cassandra&Elastic Search?

hadoopmarc@...
 

Many organizations use JanusGraph on this scale. Insertion of data is slow, so you need massively parallel operations to do an entire bulk load overnight. Most people use tools like Apache Spark for this. Useful blog posts on this can be found at (you see, there is a wide community!):
https://www.experoinc.com/post/janusgraph-nuts-and-bolts-part-1-write-performance
https://www.experoinc.com/post/have-you-had-your-janusgraph-tuneup
https://www.scylladb.com/2020/05/14/zeotap-a-graph-of-twenty-billion-ids-built-on-scylla-and-janusgraph/
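
To make the idea of parallel batched insertion concrete, here is a minimal single-JVM sketch using a plain thread pool. It is not how Spark does it and it does not call JanusGraph at all: ParallelBatchLoader and the batchWriter callback are hypothetical names for this example. In a real load the callback would presumably open a transaction, add the vertices or edges of one batch, and commit; batch size and thread count are assumptions to be tuned against the backend.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper for illustration; the writer callback stands in for real graph writes.
final class ParallelBatchLoader {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    // Writes every batch on a fixed thread pool and returns the number of batches written.
    static <T> int loadInParallel(List<T> items, int batchSize, int threads,
                                  Consumer<List<T>> batchWriter) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<T> batch : partition(items, batchSize)) {
                pending.add(pool.submit(() -> batchWriter.accept(batch)));
            }
            for (Future<?> future : pending) {
                future.get(); // surfaces a failed batch instead of silently dropping it
            }
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed to load", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger written = new AtomicInteger();
        List<Integer> edges = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        int batches = loadInParallel(edges, 3, 2, batch -> written.addAndGet(batch.size()));
        System.out.println(batches + " batches, " + written.get() + " items written");
    }
}
```

Waiting on every Future is a deliberate choice here: a bulk load that loses a batch without noticing is harder to repair than one that stops on the first failure.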

Of course, you have to monitor the Cassandra and Elasticsearch clusters to check whether they are well balanced and not overloaded. Although JanusGraph can handle some overloading ("TemporaryBackendException") there are limits to this.

If you have no Cassandra legacy or experience, and given that JanusGraph fully supports ScyllaDB, you can probably believe the pitch made by Scylla itself:
https://www.scylladb.com/scylla-vs-cassandra/

Best wishes,    Marc


Re: Any advice on performance concern of JanusGraph with Cassandra&Elastic Search?

hazalkecoglu@...
 

+ I want to add one more question.
What about ScyllaDB as storage backend? Is it better to use this in terms of performance?

 


Any advice on performance concern of JanusGraph with Cassandra&Elastic Search?

hazalkecoglu@...
 

Hi everyone,
We are working on a project for which we would like to use JanusGraph. Our system will consist of 100MM nodes and 1B edges between those nodes.
We are going to work with the last 90 days of data. Every day, the system needs to upload 1B edges and delete the data that falls out of the 90-day window.
So far we have experimented with JanusGraph on Cassandra and ElasticSearch.
 
We want to learn about your experiences and also contribute with ours during the project.
Has anyone worked with data volumes this large? What should we watch out for when working with data at this scale?
 
Also, what would be the best and fastest approach to uploading 1B edges every day?
Thanks a lot
 


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

rngcntr
 

If your use case can handle the downtime, stopping writes and waiting until all changes are propagated to both the storage and the index backend sounds like a viable solution. However, I have no idea about the order of magnitude of the necessary downtime.


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

florian.caesar
 

Yeah, good point, it's a bit hairy. Having potentially inconsistent index backups makes them much less attractive. Though I guess I could run a reindex job on just the delta between the last Scylla write time and the last ES write time.
As a simpler alternative, how about pausing write transactions for say ~1s and initiating simultaneous backups of my Scylla and ES clusters during that time?
From what I can tell, both backup mechanisms guarantee snapshot isolation. A short write pause should ensure that all writes have propagated.
What caveats do you see with this approach?
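
A minimal sketch of such a write pause inside one application process, assuming all writes go through a single gate object. WritePauseGate is a hypothetical class for this example, built on java.util.concurrent only. It does not coordinate several JanusGraph instances, and it says nothing about how long the backends need to propagate writes.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical helper for illustration; it only gates writers within one JVM.
final class WritePauseGate {
    // Fair mode so a waiting pause request is not starved by a stream of writers.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    // Runs one write transaction; many writers may hold the shared lock at once.
    <T> T write(Supplier<T> transaction) {
        lock.readLock().lock();
        try {
            return transaction.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Waits for in-flight writes to finish, blocks new ones, then starts the snapshots.
    void pauseWritesAndRun(Runnable startSnapshots) {
        lock.writeLock().lock();
        try {
            startSnapshots.run();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // True while a pause is in effect.
    boolean writesPaused() {
        return lock.isWriteLocked();
    }

    public static void main(String[] args) {
        WritePauseGate gate = new WritePauseGate();
        String result = gate.write(() -> "committed");
        System.out.println("write result: " + result);
        gate.pauseWritesAndRun(() ->
                System.out.println("paused while starting snapshots: " + gate.writesPaused()));
        System.out.println("paused afterwards: " + gate.writesPaused());
    }
}
```

The startSnapshots callback would be where both backup requests are triggered; it is left as a Runnable because the actual snapshot commands depend on the cluster setup.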

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐

On Monday, May 3rd, 2021 at 9:49 AM, rngcntr <florian.grieskamp@...> wrote:

Although the solution presented by Marc is also the closest to a consistent backup that I can think of, there are obviously caveats to it. Updates of values which were written after the time of the Scylla snapshot could be present in ES, corrupting the state of the index. Therefore, checking the pure existence of a vertex in Scylla may not be sophisticated enough to guarantee a consistent state. Verifying the property values explicitly can be helpful here, but that still leaves us with the question how to handle mismatches of this kind.
Just keep that in mind when using such a backup strategy in your environment.

Best regards,
Florian


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

rngcntr
 

Although the solution presented by Marc is also the closest to a consistent backup that I can think of, there are obviously caveats to it. Updates of values which were written after the time of the Scylla snapshot could be present in ES, corrupting the state of the index. Therefore, checking the pure existence of a vertex in Scylla may not be sophisticated enough to guarantee a consistent state. Verifying the property values explicitly can be helpful here, but that still leaves us with the question how to handle mismatches of this kind.
Just keep that in mind when using such a backup strategy in your environment.

Best regards,
Florian


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

florian.caesar
 

Awesome, yes, that's very similar to what I was planning!
It's not perfect and definitely needs to be tested thoroughly, but it should be much faster and reasonably scriptable.
I'll let you all know how it goes when I get to setting this up.. hopefully won't be long, a decade or so at most.

Thanks!

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐

On Monday, May 3rd, 2021 at 9:06 AM, <hadoopmarc@...> wrote:
In theory (not used in practice) the following should be possible:

  1. make a snapshot of the ScyllaDB keyspace
  2. after the ScyllaDB snapshot is written, make a snapshot of corresponding ES mixed indices
  3. restore all snapshots on separate temporary clusters (doing this manually on a production cluster is a no-go)
  4. find the latest writetime in the ScyllaDB snapshot
  5. try all ES index items later than this timestamp and remove them if the corresponding vertices cannot be retrieved from ScyllaDB
  6. make a new snapshot of the ES mixed indices
This is rather cumbersome, of course, but it would allow for a fast restore of consistent indices (this does not deal with the other issue, the partially succeeded transactions).

Best wishes,   Marc


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

hadoopmarc@...
 

In theory (not used in practice) the following should be possible:
  1. make a snapshot of the ScyllaDB keyspace
  2. after the ScyllaDB snapshot is written, make a snapshot of corresponding ES mixed indices
  3. restore all snapshots on separate temporary clusters (doing this manually on a production cluster is a no-go)
  4. find the latest writetime in the ScyllaDB snapshot
  5. try all ES index items later than this timestamp and remove them if the corresponding vertices cannot be retrieved from ScyllaDB
  6. make a new snapshot of the ES mixed indices
This is rather cumbersome, of course, but it would allow for a fast restore of consistent indices (this does not deal with the other issue, the partially succeeded transactions).
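
Step 5 above could be sketched roughly as follows, with plain in-memory collections standing in for the restored ScyllaDB and ES snapshots. IndexReconciler, IndexItem and the field names are hypothetical and chosen for this example only; how the index items, their write times and the set of stored vertex ids are actually read from the temporary clusters is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; collections stand in for the restored snapshots.
final class IndexReconciler {

    // One restored index item: the vertex it points to and when it was written.
    static final class IndexItem {
        final long vertexId;
        final long writeTimeMillis;

        IndexItem(long vertexId, long writeTimeMillis) {
            this.vertexId = vertexId;
            this.writeTimeMillis = writeTimeMillis;
        }
    }

    // Returns vertex ids of index items written after the latest storage write
    // whose vertex cannot be found in the storage snapshot.
    static List<Long> staleVertexIds(List<IndexItem> indexItems,
                                     Set<Long> storedVertexIds,
                                     long latestStorageWriteMillis) {
        List<Long> stale = new ArrayList<>();
        for (IndexItem item : indexItems) {
            boolean writtenAfterSnapshot = item.writeTimeMillis > latestStorageWriteMillis;
            if (writtenAfterSnapshot && !storedVertexIds.contains(item.vertexId)) {
                stale.add(item.vertexId);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        List<IndexItem> items = Arrays.asList(
                new IndexItem(1L, 100L),   // older than the storage snapshot: kept
                new IndexItem(2L, 250L),   // newer and missing from storage: stale
                new IndexItem(3L, 300L));  // newer but present in storage: kept
        Set<Long> stored = new HashSet<>(Arrays.asList(1L, 3L));
        System.out.println("stale index items: " + staleVertexIds(items, stored, 200L));
    }
}
```

The sketch only tests for existence. Property values that changed after the storage snapshot would need an explicit value comparison on top of this.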

Best wishes,   Marc


Re: Backup & Restore of Janusgraph Data with Mixed Index Backend (Elastisearch)

florian.caesar
 

Thanks again. Yeah, might end up doing that, but it seems like a complicated solution.. hmm.

Regarding the feature request, I'll dig into the code and ask around the janusgraph-dev group :)
