Sorry for the delay.
When a request comes in via REST, Quarkus creates a thread to
handle it; I believe it actually comes from a thread pool.
This code now does the following for each incoming request:
GraphTraversalSource traversal = StaticInfo.getSingleton().getGraph()
        .buildTransaction()
        .checkExternalVertexExistence(false)
        .consistencyChecks(false)
        .propertyPrefetching(true)
        .start()
        .traversal();
It then does a traversal.tx().commit() and a traversal.close() when the request is complete.
The StaticInfo class contains a JanusGraph object (getGraph()).
What I'm wondering about is that there appear to be two ways to add nodes
and edges: one uses a GraphTraversalSource, and the other uses
a JanusGraph:
GraphTraversalSource.addV()
or
JanusGraph.addVertex()
-Joe
On 6/22/2022 4:51 PM, Boxuan Li wrote:
When you say use
JanusGraph.tx().createThreadedTx() directly, what do
you mean? Can you give an example?
Thanks for all the help on this. I'm getting closer to a
solution thanks to you all.
Question: I've been using a GraphTraversalSource for all of the
vertex and edge additions to my graph. Example:
GraphTraversalSource traversal =
JanusGraph.tx().createThreadedTx().traversal();
Is it better to use JanusGraph.tx().createThreadedTx()
directly?
-Joe
On 6/17/2022 3:03 PM, Boxuan Li
wrote:
Yeah, using `newTransaction()` won't make a
difference in your use case. Based on your input, there
are a couple of things you could try:
- As suggested by
Kevin, you could use locking. See https://docs.janusgraph.org/advanced-topics/eventual-consistency/#data-consistency.
It is slow, but it will hopefully solve most of the race
conditions you have. Based on my understanding of
Cassandra's nature, I think you could still see such
inconsistencies, but the chance would certainly be much lower.
- You could periodically identify and remove the
inconsistencies using an offline pipeline.
- You could use an external locking service on the client
side, for example Redis, to make sure a
conflicting transaction won't start in the first place.
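To make the client-side locking option concrete, here is a minimal Java sketch under stated assumptions. `LockService`, `InMemoryLockService`, and `LockedGetOrCreate` are hypothetical names, and a `ConcurrentHashMap` stands in for the graph. The key point is that the lookup, the add, and (in JanusGraph) the commit all happen while the per-key lock is held, so the next thread's lookup runs only after the first thread's vertex is committed. The in-memory implementation only serializes threads inside one JVM; coordinating several JVMs would need a distributed implementation behind the same interface (for example one backed by Redis), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical client-side lock abstraction. Several JVMs would need a distributed
// implementation (e.g. Redis-backed); the in-memory one below is single-JVM only.
interface LockService {
    <T> T withLock(String key, Supplier<T> action);
}

final class InMemoryLockService implements LockService {
    // One lock per key, created atomically. Entries are never removed in this sketch.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    @Override
    public <T> T withLock(String key, Supplier<T> action) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }
}

final class LockedGetOrCreate {
    // Runs `threads` concurrent get-or-create calls for the same someID and
    // returns how many of them actually created a "vertex".
    static int countCreations(LockService lockService, int threads, String someId) {
        Map<String, String> graph = new ConcurrentHashMap<>(); // stands in for the graph
        AtomicInteger creations = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> lockService.withLock("someID:" + someId, () -> {
                // Lookup, add, and (in JanusGraph) commit all happen under the lock.
                if (!graph.containsKey(someId)) {
                    graph.put(someId, "vertex");
                    creations.incrementAndGet();
                }
                return null;
            }));
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return creations.get();
    }
}
```

Locking per key (rather than one global lock) lets requests for different IDs proceed in parallel; the price is one lock round-trip per get-or-create, which is part of the "slow" trade-off mentioned above.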
These solutions each have their own pros and cons, so it
really depends on your requirements.
Best,
Boxuan
So - unsurprisingly, Boxuan is correct.
Code like this:
GraphTraversalSource traversal =
StaticInfo.getGraph().newTransaction().traversal();
try {
datasourceVertex =
traversal.V().has("someID", id).next();
} catch (java.util.NoSuchElementException
nse) {
datasourceVertex =
traversal.addV("source").property("someID", id).next();
}
being called from multiple threads results in several
vertices with the same 'someID'.
Not sure how to fix this.
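The duplication can be reproduced without JanusGraph at all. The JDK-only Java model below is a sketch under assumptions: `LookupThenAddRace` is a hypothetical name, a synchronized list stands in for the graph, and a `CyclicBarrier` forces the worst-case interleaving in which every thread finishes its lookup before any thread adds. Under that interleaving every lookup misses and every thread adds, which mirrors several vertices ending up with the same 'someID'. In JanusGraph the window is wider than in this model, because a vertex added in one transaction is not visible to other transactions until it is committed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

final class LookupThenAddRace {
    // Returns how many "vertices" carry someId after `threads` workers each run
    // lookup-then-add concurrently against an initially empty graph.
    static int run(int threads, String someId) {
        List<String> graph = Collections.synchronizedList(new ArrayList<>());
        // Forces the worst case: no thread adds until every thread has looked up.
        CyclicBarrier allLookedUp = new CyclicBarrier(threads);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                // Like traversal.V().has("someID", id): check for an existing vertex.
                boolean found = graph.contains(someId);
                try {
                    allLookedUp.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                // Like traversal.addV("source").property("someID", id): add on a miss.
                if (!found) {
                    graph.add(someId);
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        synchronized (graph) { // iterating a synchronized list requires its lock
            return Collections.frequency(graph, someId);
        }
    }
}
```

With four threads, `LookupThenAddRace.run(4, "someID-1")` returns 4: one "vertex" per thread instead of one in total.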
-Joe
On 6/17/2022 10:28 AM, Joe
Obernberger via lists.lfaidata.foundation wrote:
Good stuff - thank you Boxuan.
Backend is Cassandra running on bare metal on 15 nodes.
Race condition is rare.
When the race condition happens, I'm seeing duplicate
nodes/edges; basically the graph becomes invalid.
Yes. This is a good idea. I could write a Spark job to
examine the graph and fix up discrepancies. Smart.
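The heart of such a cleanup job is grouping vertices by the property that should be unique and choosing one survivor per group. The plain-Java sketch below shows only that planning step over an in-memory list, not a Spark job. `VertexRecord`, `DuplicatePlan`, and `OfflineDedup` are hypothetical names, the "keep the lowest id" rule is an assumption, and actually moving edges onto the survivor and deleting the duplicates is not shown.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical flattened view of a vertex: its id and the property that should be unique.
final class VertexRecord {
    final long id;
    final String someId;

    VertexRecord(long id, String someId) {
        this.id = id;
        this.someId = someId;
    }
}

// What to do for one duplicated someID: keep survivorId, merge the others into it.
final class DuplicatePlan {
    final long survivorId;
    final List<Long> duplicateIds;

    DuplicatePlan(long survivorId, List<Long> duplicateIds) {
        this.survivorId = survivorId;
        this.duplicateIds = duplicateIds;
    }
}

final class OfflineDedup {
    static Map<String, DuplicatePlan> plan(List<VertexRecord> vertices) {
        // Group all vertices by someID (TreeMap only keeps the output ordered).
        Map<String, List<VertexRecord>> bySomeId = new TreeMap<>();
        for (VertexRecord v : vertices) {
            bySomeId.computeIfAbsent(v.someId, k -> new ArrayList<>()).add(v);
        }
        Map<String, DuplicatePlan> plans = new TreeMap<>();
        for (Map.Entry<String, List<VertexRecord>> group : bySomeId.entrySet()) {
            List<VertexRecord> members = group.getValue();
            if (members.size() < 2) {
                continue; // someID is already unique, nothing to fix
            }
            // Assumed rule: the vertex with the lowest id survives.
            members.sort(Comparator.comparingLong((VertexRecord v) -> v.id));
            List<Long> duplicateIds = new ArrayList<>();
            for (VertexRecord dup : members.subList(1, members.size())) {
                duplicateIds.add(dup.id);
            }
            plans.put(group.getKey(), new DuplicatePlan(members.get(0).id, duplicateIds));
        }
        return plans;
    }
}
```

Choosing the survivor deterministically means that repeated runs of the job over the same data agree on which vertex to keep, so a partially applied cleanup can simply be rerun.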
Not sure what a locking service is. Can you give an example?
My current plan (not tested yet) is to use a static
class that holds the JanusGraph 'graph'. On Quarkus,
when a REST call comes in, a new thread is created.
That thread will use Marc's idea of
GraphTraversalSource traversal =
StaticInfo.getGraph().newTransaction().traversal();
It will do its work and then call traversal.tx().commit().
That will be done in a loop so that if the commit fails,
it will retry X times.
At least that's my current plan. Not sure if it will
work.
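A retry loop like the one described can be written as a small generic helper. This is a minimal sketch; `Retry.withRetries` is a hypothetical helper, and it assumes each attempt opens its own transaction (for example via `newTransaction()`), repeats the lookup and the adds, commits, and rolls back or closes that transaction on failure, all inside the `Callable`. Retrying on any exception is also an assumption; a real version would probably retry only on commit failures. Note that retrying only helps when the commit actually fails: without locking, a transaction that creates a duplicate vertex may commit successfully, so the loop never sees a failure to react to.

```java
import java.util.concurrent.Callable;

final class Retry {
    // Calls `attempt` up to maxAttempts times and returns the first successful result.
    // Assumption: every failure is retryable; a real helper would be more selective.
    static <T> T withRetries(int maxAttempts, Callable<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                // In the described plan, one attempt = open a new transaction, look up
                // or add the vertices, commit, and roll back/close it on error.
                return attempt.call();
            } catch (Exception e) {
                lastFailure = e; // remember why this attempt failed, then retry
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Because every attempt starts from a fresh transaction, a retried attempt repeats the lookup and can find a vertex that another thread committed in the meantime.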
-Joe
On 6/17/2022 8:52 AM,
Boxuan Li wrote:
Hi Joe,
Unfortunately, the approach Marc suggests
won't help with your use case. To be honest, I would have
suggested the same answer as Marc before I saw
your second post. If a single JVM thread handles
multiple transactions (I'm not familiar
with Quarkus, so I'm not sure that is possible),
then one has to do what Marc suggested. But in
your use case, it won't be any different from
your current usage, because JanusGraph
automatically creates a thread-bound transaction for
each thread (using ThreadLocal) when you use the
traversal object.
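Since Quarkus requests are probably served from a thread pool, it may help to see how ThreadLocal state behaves there. The JDK-only sketch below uses a hypothetical `PooledThreadLocalDemo` class, and its string stands in for per-thread state such as an automatically opened transaction. A pool with a single worker guarantees both tasks run on the same thread, so the second "request" sees what the first one left behind. If JanusGraph's automatic transaction is bound to the thread as described above, a request that uses `graph.traversal()` without committing or rolling back could presumably leave that transaction open for the next request served by the same pooled thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PooledThreadLocalDemo {
    // Stands in for per-thread state such as an automatically opened transaction.
    private static final ThreadLocal<String> perThreadState = new ThreadLocal<>();

    // Runs two "requests" on a pool with exactly one worker thread and returns
    // what the second request finds in the ThreadLocal.
    static String whatSecondRequestSees() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Request 1 sets per-thread state and never clears it.
            pool.submit(() -> perThreadState.set("left over from request 1")).get();
            // Request 2 reuses the same worker thread and sees that state.
            Future<String> seen = pool.submit(() -> perThreadState.get());
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The usual remedy is to always end per-thread state explicitly (here, `perThreadState.remove()` in a `finally` block; for a graph, a commit or rollback), or to use explicit per-request transactions that are never stored on the thread.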
The real issue in your use case is
that you want ACID support, which really depends
on your backend storage. At least in our
officially supported Cassandra, HBase, and
BigTable adapters, this is not (yet) supported.
There are a few workarounds, though. Before
discussing them further, I would like to ask a few
questions:
- What is your backend storage, and is it
distributed?
- How often does this “race condition”
happen? Is it very rare, or is it fairly
common?
- What is your end goal? Do you want to reduce
the chance of this “race condition”, or do you
want to make sure it does not happen at all?
- Are you willing to resolve such duplicate
vertices/edges either at read time or offline?
- Are you willing to introduce a third-party
dependency, e.g. a distributed locking
service?
Best,
Boxuan
Thank you, Marc. I'm currently doing everything
with a traversal and then calling
traversal.tx().commit().
It sounds like what you suggested is what I want, but
just to be clear:
Here's what I'm trying to do.
Thread 1/JVM1 gets a request that requires adding
new vertices and edges to the graph.
Thread 2/JVM1 gets a similar request.
Some of the vertices added in Thread 1 end up having
the same attributes/name as vertices from Thread 2,
but if they have the same attributes, I only want
one vertex.
If Thread 1 has added that vertex but not yet
committed, then Thread 2 won't find it when it looks
it up, so it will add it as well.
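The visibility rule behind this can be modeled in a few lines. In the toy Java model below, `ToyStore` and `ToyTx` are hypothetical classes, not JanusGraph's: each transaction buffers its writes privately and publishes them only on commit, and the committed store is a list so that two "vertices" with the same correlationID stay distinct. No threads are needed; two transactions that merely overlap in time each look up the ID, each miss, and both commits go through.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy committed state: a list, so two vertices with the same ID remain two entries.
final class ToyStore {
    final List<String> committed = Collections.synchronizedList(new ArrayList<>());

    ToyTx begin() {
        return new ToyTx(this);
    }

    int count(String id) {
        synchronized (committed) { // iterating a synchronized list requires its lock
            return Collections.frequency(committed, id);
        }
    }
}

// Toy transaction: writes are buffered privately until commit().
final class ToyTx {
    private final ToyStore store;
    private final List<String> pending = new ArrayList<>();

    ToyTx(ToyStore store) {
        this.store = store;
    }

    // Sees its own uncommitted writes plus everything already committed.
    boolean exists(String id) {
        return pending.contains(id) || store.committed.contains(id);
    }

    void add(String id) {
        pending.add(id);
    }

    // Publishes the buffered writes; nothing checks for an existing equal ID.
    void commit() {
        store.committed.addAll(pending);
        pending.clear();
    }
}
```

Running "look up, add on a miss" in two transactions begun before either commits leaves `count("c-1") == 2` in the store, which is the duplicate described above.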
Code example (traversal is a GraphTraversalSource
obtained by calling traversal() on the JanusGraph opened with JanusGraphFactory):
try {
correlationVertex =
traversal.V().has("correlationID",
correlationID).next();
} catch (java.util.NoSuchElementException
nse) {
correlationVertex = null;
}
.
.
.
if (correlationVertex == null) {
correlationVertex =
traversal.addV("correlation").property("correlationID",
correlationID).next();
correlationVertex.property("a", blah1);
correlationVertex.property("b", blah2);
}
I do similar things with edges:
try {
dataSourceToCorrelationEdge =
traversal.E().has("edgeID", edgeID).next();
} catch (NoSuchElementException nse) {
dataSourceToCorrelationEdge = null;
}
Ultimately, I'd like to have several JVMs handling
these requests, each of which runs multiple threads.
I'll look at using a new transaction per call.
Thank you!
-Joe
Hi Joe,
By thread-safe transactions, do you mean that
requests from different client threads should be
handled independently, that is, in different
JanusGraph transactions?
In that case, I think you want to use a
GraphTraversalSource per request, like this:
g = graph.newTransaction().traversal()
Best wishes, Marc