How to replay a transaction log from the beginning


Sandeep Mishra <sandy...@...>
 

Hi All,

I have created a transaction log with HBase as the backend using the following code.

JanusGraphTransaction tx = graph.buildTransaction().logIdentifier("addedPerson").start();

JanusGraphVertex u = tx.addVertex("human");
u.property("name", "proteros");
u.property("age", 36);

JanusGraphVertex u1 = tx.addVertex("human");
u1.property("name", "sandeep");
u1.property("age", 34);
tx.commit();


and I am trying to replay it using the code below.

LogProcessorFramework logProcessor1 = JanusGraphFactory.openTransactionLog(graph);
logProcessor1.addLogProcessor("addedPerson")
        .setProcessorIdentifier("addedPersonProcessor")
        .setStartTime(Instant.EPOCH) // set once; the original chain called setStartTime twice
        .addProcessor(new ChangeProcessor() {
            @Override
            public void process(JanusGraphTransaction janusGraphTransaction, TransactionId transactionId, ChangeState changeState) {
                System.out.println("processing old log");
                for (Vertex v : changeState.getVertices(Change.ADDED)) {
                    System.out.println("vertex " + v.label());
                    // totalHumansAdded: a counter (e.g. AtomicInteger) defined elsewhere
                    if (v.label().equals("human")) totalHumansAdded.incrementAndGet();
                }
            }
        }).build();

However, my log processor doesn't process any previously saved vertices; it only detects changes made after the log processor has been built.

I tried setting the start time to Instant.EPOCH, which according to the documentation should make log processing start from the beginning, but it didn't help.
The only difference I see in the logs is that the MessagePuller starts from EPOCH.

22:59:19,039  INFO KCVSLog:744 - Loaded unidentified ReadMarker start time 2018-05-01T14:59:19.023Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@57db2b13
22:59:19,054  INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@2f953efd
22:59:19,054  INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@14bdbc74
22:59:19,054  INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@5a7fe64f
22:59:19,054  INFO KCVSLog:748 - Loaded indentified ReadMarker start time 1970-01-01T02:18:20Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@41330d4f

Another point to note: a vertex inserted from one Java instance is detected by log processor instances running in any other Java instance.

This looks like a bug, as it doesn't behave as the documentation describes:
When a log processor is built against a particular log, such as the addedPerson log in the example above, it will start reading transactional change records from the log immediately upon successful construction and initialization up to the head of the log. The start time specified in the builder marks the time point in the log where the log processor will start reading records. Optionally, one can specify an identifier for the log processor in the builder. The log processor will use the identifier to regularly persist its state of processing, i.e. it will maintain a marker on the last read log record. If the log processor is later restarted with the same identifier, it will continue reading from the last read record. This is particularly useful when the log processor is supposed to run for long periods of time and is therefore likely to fail. In such failure situations, the log processor can simply be restarted with the same identifier. It must be ensured that log processor identifiers are unique in a JanusGraph cluster in order to avoid conflicts on the persisted read markers.
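To make the documented semantics concrete, here is a toy, stdlib-only Java model. The class and method names are hypothetical and this is not JanusGraph's KCVSLog implementation; it only encodes the quoted text: a processor without a saved marker reads every record at or after its start time, while a processor whose identifier already has a persisted marker resumes after that marker, so on restart the marker rather than the start time decides where reading begins.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the documented log-processor semantics.
// Hypothetical names; NOT JanusGraph's KCVSLog implementation.
class ReadMarkerModel {
    // One log record: when it was written and what it contains.
    private static final class Entry {
        final Instant time;
        final String payload;

        Entry(Instant time, String payload) {
            this.time = time;
            this.payload = payload;
        }
    }

    // Records are assumed to be appended in time order.
    private final List<Entry> log = new ArrayList<>();
    // Persisted read markers: processor identifier -> time of the last record it read.
    private final Map<String, Instant> markers = new HashMap<>();

    void append(Instant time, String payload) {
        log.add(new Entry(time, payload));
    }

    // Returns the payloads a processor would see, up to the head of the log.
    // processorId may be null (no identifier, so no marker is persisted).
    List<String> read(String processorId, Instant startTime) {
        Instant marker = (processorId == null) ? null : markers.get(processorId);
        List<String> seen = new ArrayList<>();
        for (Entry e : log) {
            boolean unread = (marker == null)
                    ? !e.time.isBefore(startTime) // fresh processor: start time is inclusive
                    : e.time.isAfter(marker);     // restarted processor: resume after the marker
            if (unread) {
                seen.add(e.payload);
                if (processorId != null) {
                    markers.put(processorId, e.time); // remember the last read record
                }
            }
        }
        return seen;
    }
}
```

In this model a processor built with a start time later than the existing records sees nothing, which matches the symptom described in this thread.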



Any comments?

Regards,
Sandeep


abdel...@...
 

Hello, I have the same error. Did you resolve it?


Sandeep Mishra <sandy...@...>
 

Hello Abdell,

Basically, the documentation is not clear.
If you want to process a log from its beginning, you need to know the time at which the log began.
You can then use that time as the start time of the log processor.
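One way to "know the time of beginning" is to record it yourself. The sketch below is a hypothetical stdlib-only helper (LogStartTime and its methods are illustrative names, not JanusGraph API): call recordIfAbsent(...) just before the first commit made with buildTransaction().logIdentifier(...), and later pass the stored instant to setStartTime(...) instead of Instant.EPOCH. It assumes the clock of the application that writes the log is roughly in sync with the one that reads it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;

// Hypothetical helper: remembers when writing to a given log identifier began,
// so a log processor can later start from that instant instead of EPOCH.
// Assumes writer and reader clocks are roughly in sync.
final class LogStartTime {
    private LogStartTime() {}

    // Returns the stored start instant, or stores and returns `now` if none exists yet.
    // Intended to be called before the first commit with a log identifier.
    static Instant recordIfAbsent(Path file, Instant now) throws IOException {
        if (Files.exists(file)) {
            return load(file);
        }
        Files.write(file, now.toString().getBytes(StandardCharsets.UTF_8));
        return now;
    }

    // Reads back the ISO-8601 instant written by recordIfAbsent.
    static Instant load(Path file) throws IOException {
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Instant.parse(text);
    }
}
```

For example, `Instant start = LogStartTime.load(path);` followed by `logProcessor.addLogProcessor("addedPerson").setStartTime(start)...`. If the processor identifier already has a persisted read marker, the documentation quoted above says reading continues from that marker instead.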

Using Instant.EPOCH makes the log processor keep polling forward from the epoch, which gives the illusion that it is not working.
I have submitted an enhancement request to support processing from the beginning of the log.
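Sandeep's explanation can be put in rough numbers. The sketch below only measures how much log time separates a read marker from the head of the log and, under an explicitly assumed fixed per-poll window, how many polls would be needed to cover it. The window is a hypothetical parameter, not a JanusGraph setting or default, and the code does not describe how KCVSLog actually schedules its reads.

```java
import java.time.Duration;
import java.time.Instant;

// Back-of-the-envelope helpers for a log processor that starts far in the past.
// The per-poll window is a hypothetical parameter, not a JanusGraph setting.
final class CatchUpEstimate {
    private CatchUpEstimate() {}

    // Span of log time between the read marker and the head of the log.
    static Duration gap(Instant marker, Instant head) {
        return Duration.between(marker, head);
    }

    // Polls needed to cover the gap if each poll advanced the marker by at most `window`
    // (an assumption for illustration only).
    static long pollsNeeded(Instant marker, Instant head, Duration window) {
        long windowMillis = window.toMillis();
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        long gapMillis = gap(marker, head).toMillis();
        if (gapMillis <= 0) {
            return 0;
        }
        return (gapMillis + windowMillis - 1) / windowMillis; // ceiling division
    }
}
```

With the timestamps from the log output in the first message (identified marker 1970-01-01T02:18:20Z, unidentified puller at 2018-05-01T14:59:19.023Z), `gap(...).toDays()` is 17652, i.e. about 48 years of log time lie between the marker and the 2018 records.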

Cheers,
Sandeep

On Tuesday, May 29, 2018 at 12:54:21 AM UTC+8, abd...@... wrote:


ojas.dubey@...
 

Hi,
 
I was wondering whether this has been implemented.
 
I am running JanusGraph over Cassandra and was trying to work with the transaction log feature using the provided documentation.
 
So far I have managed to start the transaction with the identifier (the ulog tables are created in Cassandra), but I am still unable to get the Java callback to work. I have browsed through some threads here as well, but still cannot get it to work.
 
Any help is appreciated.
 
 
Regards,
Ojas


Boxuan Li
 

Hi Ojas,

Can you share your code and explain what you mean by "unable to work"? Is it running but not producing results as you expected, or encountering errors/exceptions?

Best,
Boxuan


ojas.dubey@...
 

Hi Boxuan,
Please find the code below:

1. Starting the transaction (the identifier value is TestBatchLogger):

public JanusGraphTransaction startJanusGraphTransaction(String identifier) {
    return janusGraphSchema.getConfiguredGraph().buildTransaction().logIdentifier(identifier).start();
}

2. Multiple add-vertex/add-edge operations on the graph, e.g.:

GraphTraversal<Vertex, Vertex> traversal = g.addV("idVertex")
        .property(id, "uuid");
return traversal.next();

Here g is the Gremlin GraphTraversalSource obtained from JanusGraphFactory.open(<graphConfigPropertiesFile>).traversal().

3. Commit the transaction object returned by the start-transaction method.

I then wanted to replay the log of this transaction, so I called the method below:

public void startLogProcessor(String identifier) {
    LogProcessorFramework logProcessor = JanusGraphFactory.openTransactionLog(graph);
    logProcessor.addLogProcessor(identifier)
            .setProcessorIdentifier("BatchTxLogger")
            .setStartTime(Instant.now())
            .addProcessor((tx, txId, changeState) -> {
                System.out.println("tx--" + tx.toString() + "  txId--" + txId.toString()
                        + "  changeState--" + changeState.toString());
                for (JanusGraphVertex v : changeState.getVertices(Change.ANY)) {
                    System.out.println(v.label());
                }
            }).build();
}

However, I never see the println output on the console. I tried different start times (Instant.EPOCH, Instant.now().minusMillis(500), etc.), with no exception or error in any case.
I also tried removing the identifier, which gave an invalid ReadMarker error. After checking the class files, I also removed the start time to resolve that error, but there is still no console output :(

Regards,
Ojas


Boxuan Li
 

Hi Ojas,

Your `startLogProcessor` method looks good to me. I suspect that you are not using the transaction returned in step 1 to do the vertex/edge operations. In step 2, you are using `g.addV`, which automatically starts a new anonymous transaction. That transaction is committed with `g.tx().commit()`, and since it was not started with your log identifier, it will not be captured by your log processor. Therefore, you need to make sure you use the transaction associated with the log identifier to do the mutations.

Try replacing `g` with `tx.traversal()` where `tx` is returned in step 1. Then, your code should look like this:

JanusGraphTransaction tx = startJanusGraphTransaction(identifier);
tx.traversal().addV("idVertex").property(id, "uuid").next();
tx.commit();

Hope this helps.

Best,
Boxuan
 


ojas.dubey@...
 

Hi Boxuan,

Thanks. This indeed helped. Initially nothing happened (or at least it appeared that way), so I changed the start time to EPOCH, left the application running for a while, and after some time the callback was executed.

So I was wondering how the log processor uses the start time to replay the log, and why it took so long to replay. Is there a way to reduce this delay by setting the start time to the correct UTC time (as I don't want to use EPOCH every time), so that the callback is executed immediately?

Also, is there a difference between the Instant.now() value used by the ReadMarker and the actual local time used by the application? The ReadMarker initialization logs show a different time, e.g.:

2021-06-30T13:21:32.003+05:30 INFO |InternalEventLogger|||||||o.j.diskstorage.log.kcvs.KCVSLog|Loaded identified ReadMarker start time 2021-06-30T04:00:00Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@4051e47b
2021-06-30T13:21:32.008+05:30 INFO |InternalEventLogger|||||||o.j.diskstorage.log.kcvs.KCVSLog|Loaded identified ReadMarker start time 2021-06-30T04:00:00Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@6a332fb7
2021-06-30T13:21:32.013+05:30 INFO |InternalEventLogger|||||||o.j.diskstorage.log.kcvs.KCVSLog|Loaded identified ReadMarker start time 2021-06-30T04:00:00Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@1f2cf847
2021-06-30T13:21:32.015+05:30 INFO |InternalEventLogger|||||||o.j.diskstorage.log.kcvs.KCVSLog|Loaded identified ReadMarker start time 2021-06-30T04:00:00Z into org.janusgraph.diskstorage.log.kcvs.KCVSLog$MessagePuller@3cc2c61b

while the application log shows another time:

2021-06-30T13:26:15.794+05:30 INFO |InternalEventLogger||c.a.o.s.b.s.i.Test|Started tx standardjanusgraphtx[0x39c4068c] for requestId 5ba073c8-68c2-4356-8097-2e62ef56299a and batchId 9632dceb-7996-4464-91d8-1b157fc8ca00


Regards,
Ojas 


Boxuan Li
 

Hi Ojas,

Ideally, if you add your log processor with Instant.now() as the start time, you should see your callback invoked as soon as the transaction commits (if you are using a single in-memory storage backend), or after a minimal delay (depending on the read latency of your storage backend).

The time difference in your log looks a bit weird to me. Can you check if there is a clock drift among your servers?
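To tell a time-zone rendering difference apart from a real gap, one can put both timestamps on the same clock. The small java.time sketch below uses only the values from Ojas's logs; ReadMarkerLag and its methods are hypothetical helper names.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Puts a ReadMarker start time (logged in UTC, "Z") and an application log
// timestamp (logged at +05:30) on the same clock. Hypothetical helper names.
final class ReadMarkerLag {
    private ReadMarkerLag() {}

    // Renders the UTC ReadMarker instant in the application's UTC offset.
    static OffsetDateTime inOffset(Instant marker, ZoneOffset offset) {
        return marker.atOffset(offset);
    }

    // How far the ReadMarker start time lies behind the given application timestamp.
    static Duration lag(Instant marker, OffsetDateTime appTime) {
        return Duration.between(marker, appTime.toInstant());
    }
}
```

Rendering 2021-06-30T04:00:00Z at +05:30 gives 2021-06-30T09:30+05:30, and the lag behind the 13:21:32.003+05:30 KCVSLog entry is PT3H51M32.003S, so the difference is not explained by the +05:30 offset alone.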

Best,
Boxuan

On Wed, Jun 30, 2021 at 10:12 PM, ojas.dubey via lists.lfaidata.foundation <ojas.dubey=amdocs.com@...> wrote:
