Re: How to open the same graph multiple times and not get the same object?


Mladen Marović <mladen...@...>
 

Hello Boxuan,

I need to support reindexing very large graphs. To my knowledge, the only feasible way that's supported is via the `MapReduceIndexManagement` class. This is not ideal for me as I'd like to utilise an existing Apache Spark cluster to run this job, and `MapReduceIndexManagement` is a Hadoop/MapReduce implementation. Therefore, I started writing a `SparkIndexManagement` class that's supposed to be a drop-in replacement that offers Spark support.

The basic structure of the code that processes a single partition should be something like this:

        public ScanMetrics processPartition(Iterator<Tuple2<NullWritable, VertexWritable>> vertices) {
            if (vertices.hasNext()) {
                // open the graph
                JanusGraph graph = JanusGraphFactory.open(getGraphConfiguration());

                // prepare for partition processing
                job.workerIterationStart(graph, getJobConfiguration(), metrics);
                
                // find and process each vertex
                vertices.forEachRemaining(
                    tuple -> {
                        ...
                        JanusGraphVertex vertex = ...  // load the vertex
                        job.process(vertex, metrics);
                        ...
                    }
                );
                
                // finish processing the partition
                job.workerIterationEnd(metrics);
            }

            ...
        }

At first everything seemed quite straightforward, so I implemented a quick-and-dirty solution as a proof of concept. However, after running the first buildable solution, I came upon an unexpected error: "java.lang.IllegalArgumentException: The transaction has already been closed". The confusing part was that the implementation worked when I ran the local Spark cluster as "local[1]" (which spawns only one worker thread), but when running it as "local[*]" (which spawns multiple worker threads, one per core), the error would always appear, although not always on the same task.

After some digging, I seem to have found the main cause. Loading the graph data by using `org.janusgraph.hadoop.formats.cql.CqlInputFormat` in the `SparkContext.newAPIHadoopRDD()` call returns a `JavaPairRDD<NullWritable, VertexWritable>` with several partitions, as expected. The graph used to read vertices in this input format is opened via `JanusGraphFactory.open()`. After iterating through all vertices returned by the partition, the underlying graph is closed in a final `release()` call for that partition. This makes sense because that partition is done with reading. However, when processing that partition, I need to open a graph to pass to `IndexRepairJob.workerIterationStart()`, and also create a separate read-only transaction (from that same graph) to fetch the vertex properly and pass it to `IndexRepairJob.process()`. `IndexRepairJob` also creates a write transaction to make some changes to the graph.

This would all work fine in MapReduce because there, the first `map()` step is run in its entirety first, which means that the reindexing of vertices is done only after ALL partitions have been read and the `CqlInputFormat` has finished its part. I don't have much experience in MapReduce, but that's how I understand it to work - a single `map()` result is first written to disk, and then that result is read from the disk to be the input to the subsequent `map()` call. On the other hand, Spark optimizes the map-reduce paradigm by chaining subsequent `map()` calls to keep objects in memory as much as possible. So, when this runs on a "local[*]" cluster, or a Spark executor with multiple cores, and the graph is opened via `JanusGraphFactory.open()`, all threads in that executor share the graph object. Each thread runs on a different RDD partition, but they can be at different phases of the reindexing process (different `map()` steps) at the same time. When one thread closes the graph for whatever reason (e.g. when `CqlInputFormat` finishes reading a partition), other threads simply blow up.
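To make the difference in stage ordering described above concrete, here is a small analogy in plain Java. It is not Spark or Hadoop code, and it only mirrors my understanding as stated above; `StageOrderDemo` and the "read"/"index" labels are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative analogy only: plain Java collections, no Spark or Hadoop involved.
class StageOrderDemo {

    // Stage-at-a-time: the "read" stage finishes for every element
    // before the "index" stage starts.
    static List<String> materialized(List<Integer> input) {
        List<String> log = new ArrayList<>();
        List<Integer> read = new ArrayList<>();
        for (int v : input) {
            log.add("read " + v);
            read.add(v);
        }
        for (int v : read) {
            log.add("index " + v);
        }
        return log;
    }

    // Pipelined: each element passes through both stages before the
    // next element is read (sequential stream, chained map() calls).
    static List<String> pipelined(List<Integer> input) {
        List<String> log = new ArrayList<>();
        input.stream()
             .map(v -> { log.add("read " + v); return v; })
             .map(v -> { log.add("index " + v); return v; })
             .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        List<Integer> vertices = List.of(1, 2);
        System.out.println(materialized(vertices)); // [read 1, read 2, index 1, index 2]
        System.out.println(pipelined(vertices));    // [read 1, index 1, read 2, index 2]
    }
}
```

In the pipelined case the "read" and "index" steps are interleaved, so a resource that the read side closes when it runs out of input can still be in use by the index side of another task.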

For example, if I have partitions/tasks with 300, 600 and 900 vertices and they all run on a single 3-core Spark executor, they'll be processed in parallel by three separate threads. The first thread will process 300 vertices and, upon iterating the final vertex, will close the underlying graph (as part of the `CqlInputFormat` implementation, from what I gathered). Closing the graph immediately closes all opened transactions. However, the same graph is used in other threads as well in parallel. The second thread might have only finished processing 350 vertices at the time the first closed the graph, so the next time it tries to write something, it crashes because it uses a transaction that's already closed.
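The interleaving in this example can be reproduced with a minimal stand-alone sketch. `SharedHandle` is a hypothetical stand-in for the shared graph object, not a JanusGraph class, and the latches exist only to force the "second thread is at vertex 350 when the first one closes" ordering deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a shared graph handle; not the JanusGraph API.
class SharedHandle {
    private final AtomicBoolean open = new AtomicBoolean(true);

    void write(int vertexId) {
        if (!open.get()) {
            throw new IllegalStateException("handle closed before vertex " + vertexId);
        }
    }

    void close() {
        open.set(false);
    }
}

class SharedHandleRace {

    // Returns how many vertices the slower task processed before it failed.
    static int run() {
        SharedHandle shared = new SharedHandle(); // one object used by both tasks
        CountDownLatch slowReached350 = new CountDownLatch(1);
        CountDownLatch fastClosed = new CountDownLatch(1);
        int[] processedBySlow = {0};

        // Task with 300 vertices: finishes first and closes the shared handle.
        Thread fast = new Thread(() -> {
            for (int v = 0; v < 300; v++) {
                shared.write(v);
            }
            try {
                slowReached350.await(); // force the interleaving from the example
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            shared.close();
            fastClosed.countDown();
        });

        // Task with 600 vertices: still running when the handle gets closed.
        Thread slow = new Thread(() -> {
            try {
                for (int v = 0; v < 600; v++) {
                    if (v == 350) {
                        slowReached350.countDown();
                        fastClosed.await();
                    }
                    shared.write(v);
                    processedBySlow[0]++;
                }
            } catch (IllegalStateException e) {
                // the write after close() fails, like the closed-transaction error
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        fast.start();
        slow.start();
        try {
            fast.join();
            slow.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return processedBySlow[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // 350
    }
}
```

Without the latches the failure point would vary from run to run, which matches the error not always appearing on the same task.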

The ideal solution should be to open separate graph instances of the same graph, one in `CqlInputFormat`, and the other that is passed to `IndexRepairJob.workerIterationStart()`, for each task. In that case, if one graph is closed, no other tasks or processing phases would be affected. I tried that out today by opening the graph using the `StandardJanusGraph` constructor (at least in my part of the code) and so far that worked well because in most of my test runs the job completed successfully. The runs that failed occurred during debugging, when the execution was stuck on a breakpoint for a while, so maybe there were some timeouts involved or something. This remains to be tested. I also strongly suspect that the problem still remains, at least in theory, because `CqlInputFormat` still uses the `JanusGraphFactory.open()` call, but the probability for that is reduced, at least in the environment and on the data I'm currently testing on. I haven't analyzed the `CqlInputFormat` code fully to understand how it behaves in that case yet.
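The idea of separate instances can be sketched with a small model. This is not JanusGraph code: `GraphHandle` and `HandleRegistry` are hypothetical names, where `openShared()` models a factory that returns the cached object for a graph name and `openSeparate()` models constructing a fresh instance per caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a graph handle; names are illustrative only.
class GraphHandle {
    private volatile boolean open = true;

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}

class HandleRegistry {
    private static final Map<String, GraphHandle> BY_NAME = new ConcurrentHashMap<>();

    // Models a factory that looks the graph up by name and returns the cached object.
    static GraphHandle openShared(String name) {
        return BY_NAME.computeIfAbsent(name, n -> new GraphHandle());
    }

    // Models constructing a fresh, unregistered instance for each caller.
    static GraphHandle openSeparate() {
        return new GraphHandle();
    }
}

class SeparateInstancesDemo {
    public static void main(String[] args) {
        GraphHandle reader = HandleRegistry.openShared("graph");
        GraphHandle sharedWorker = HandleRegistry.openShared("graph");
        GraphHandle separateWorker = HandleRegistry.openSeparate();

        reader.close(); // the input side finishes its partition

        System.out.println(reader == sharedWorker);  // true
        System.out.println(sharedWorker.isOpen());   // false
        System.out.println(separateWorker.isOpen()); // true
    }
}
```

Only the handle obtained outside the name-based cache survives the reader's `close()`, which is the isolation I'm after for each task.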

Admittedly, I could provide my own InputFormat class, or at least subclass it and try to hack and slash and make it work somehow, but that seriously complicates everything and defeats the purpose of everything I'm trying to do here.

Another workaround would be to limit each Spark executor to use only one core, but that seems wasteful and is definitely something I would try to avoid.

I probably missed a lot of details, but that's the general idea and my conclusions so far. Feel free to correct me if I missed anything or wrote anything wrong, as well as point me in the right direction if such an implementation already exists and I just didn't come across it. 

Best regards,

Mladen

PS An additional question here would be to see if there is any danger in opening multiple separate graph instances and using them to modify the graph, but as this is already done in the current MapReduce implementation anyway, and all my transactions are opened as read-only, I'm guessing that shouldn't pose a problem here.


On Wednesday, December 9, 2020 at 4:32:10 PM UTC+1 li...@... wrote:
Hi Mladen,

Agree with Marc, that's something you could try. If possible, could you share the reason why you have to open the same graph multiple times with different graph objects? If there is no other solution to your problem then this can be a feature request.

Best regards,
Boxuan
On Wednesday, December 9, 2020 at 2:50:48 PM UTC+8 HadoopMarc wrote:
Hi Mladen,

The constructor of StandardJanusGraph seems worth a try:


HTH,   Marc

On Tuesday, December 8, 2020 at 19:34:55 UTC+1 Mladen Marović wrote:
Hello,

I'm writing a Java program that, for various implementation details, needs to open the same graph multiple times. Currently I'm using JanusGraphFactory.open(...), but this always looks up the graph by its name in the static JanusGraphManager instance and returns the same object.

Is there a way to create two different object instances of the same JanusGraph graph? These instances need to be completely separate, so that closing one graph does not close transactions created using the other graph. I checked the documentation and inspected the code directly while debugging, but couldn't find anything useful.

Thanks in advance,

Mladen
