Atomikos is a piece of software written in Java that, among other things, implements JTA (Java Transaction API) and XA (eXtended Architecture, a standard for distributed transaction processing).
In general, a transaction is associated with the current thread, so all the methods invoked while a Java EE server handles a request can share the same transaction.
However, an interesting question on StackOverflow raised the possibility of splitting an atomic operation composed of several tasks across multiple threads, all under a single global transaction.
To bypass the default architecture, we need to invoke the Atomikos XA API directly and enlist the connections used by each thread in the main transaction.
I wrote a simple example implementing this approach. The project is available on my GitHub account.
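The default thread-bound behavior can be illustrated with a simplified model. The sketch below assumes a hypothetical ThreadBoundTxManager that keeps the current transaction in a ThreadLocal, which is one common way to implement thread association (it is not Atomikos's actual code). A task submitted to a thread pool runs on another thread, so it sees no transaction at all.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified transaction manager (NOT the Atomikos implementation):
// the current transaction is stored per thread, as thread-bound managers do.
class ThreadBoundTxManager {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static void begin(String txId) { CURRENT.set(txId); }

    // Returns null when the calling thread has no transaction.
    static String current() { return CURRENT.get(); }

    static void end() { CURRENT.remove(); }
}

class ThreadBindingDemo {
    // Returns {transaction seen by the caller, transaction seen by a pool thread}.
    static String[] observe() {
        ThreadBoundTxManager.begin("tx-1");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String inCaller = ThreadBoundTxManager.current();
            // The task runs on a pool thread, whose ThreadLocal slot was never set.
            Callable<String> task = ThreadBoundTxManager::current;
            Future<String> inWorker = pool.submit(task);
            return new String[] { inCaller, inWorker.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            ThreadBoundTxManager.end();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        // prints "caller: tx-1, worker: null"
        System.out.println("caller: " + seen[0] + ", worker: " + seen[1]);
    }
}
```

Passing the Transaction object to each task explicitly, as Processamento does, is what lets the worker threads reach the global transaction.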
Implementation
Firstly, we have the DataSource and TransactionManager initialization using the Atomikos API in the class AtomikosDataSource. Here is the relevant excerpt:
// Imports used by this excerpt
import java.util.Properties;
import javax.transaction.SystemException;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;

// initialize resources
public static void init() {
    utm = new UserTransactionManager();
    try {
        utm.init();
        adsb = new AtomikosDataSourceBean();
        adsb.setMaxPoolSize(20);
        adsb.setUniqueResourceName("postgres");
        adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
        // connection properties passed to PGXADataSource
        Properties p = new Properties();
        p.setProperty("user", "postgres");
        p.setProperty("password", "0");
        p.setProperty("serverName", "localhost");
        p.setProperty("portNumber", "5432");
        p.setProperty("databaseName", "postgres");
        adsb.setXaProperties(p);
    } catch (SystemException e) {
        // the transaction manager could not be started
        throw new RuntimeException(e);
    }
}
After that, I implemented the task class Processamento (Processing), which receives the main transaction instance (Transaction). Implementing the Callable interface makes it a task that returns an Integer value and can be run by a thread pool. Here is the code:
// Imports used by this excerpt
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.concurrent.Callable;
import javax.sql.XAConnection;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;

private static class Processamento implements Callable<Integer> {
    private final int id;
    private final boolean falhar; // fail or not?
    private final Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.id = id;
        this.falhar = falhar;
        this.transaction = transaction;
    }

    @Override
    public Integer call() throws Exception {
        if (falhar) {
            // fail unexpectedly
            throw new RuntimeException("Failed unexpectedly!");
        }
        // enlist the XA connection in the main transaction
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        XAResource xar = xac.getXAResource();
        synchronized (transaction) {
            transaction.enlistResource(xar);
        }
        // normal execution: mark this task's row as OK
        try (Connection c = xac.getConnection();
             PreparedStatement s = c.prepareStatement(
                     "update teste set processado = 'ok' where id = ?")) {
            s.setInt(1, id);
            s.executeUpdate();
        }
        // delist the XA connection; xac is not closed here because its XAResource
        // is still needed when the main thread commits or rolls back
        synchronized (transaction) {
            transaction.delistResource(xar, XAResource.TMSUCCESS);
        }
        return id;
    }
}
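Notice that, instead of letting the data source enlist its connections in the thread-bound transaction automatically, I'm working with the XA API directly.
The call to AtomikosDataSource.getDS().getXaDataSource().getXAConnection() returns an XA connection, whose XAResource is added to the main transaction by calling transaction.enlistResource() with xac.getXAResource(). This operation is called enlistment. At the end of the task, the resource is removed from the transaction with transaction.delistResource(), an operation called delistment.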
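At the XA level, the JTA specification maps enlistment to XAResource.start and delistment to XAResource.end, and the final commit drives a two-phase commit (prepare, then commit) on every enlisted branch. The sketch below models that call sequence using only the JDK's javax.transaction.xa interfaces. RecordingXAResource, SimpleXid, MiniCoordinator and XaLifecycleDemo are hypothetical classes, a heavy simplification rather than Atomikos's implementation: it ignores TMJOIN, the one-phase and read-only optimizations, logging and recovery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical XAResource that only records the calls it receives.
class RecordingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();
    private final boolean failOnPrepare;

    RecordingXAResource(boolean failOnPrepare) { this.failOnPrepare = failOnPrepare; }

    public void start(Xid xid, int flags) { calls.add("start"); }
    public void end(Xid xid, int flags) { calls.add(flags == TMSUCCESS ? "end:success" : "end:other"); }
    public int prepare(Xid xid) throws XAException {
        calls.add("prepare");
        if (failOnPrepare) {
            throw new XAException(XAException.XA_RBROLLBACK); // vote "no"
        }
        return XA_OK; // vote "yes"
    }
    public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    public void rollback(Xid xid) { calls.add("rollback"); }
    public void forget(Xid xid) { }
    public Xid[] recover(int flag) { return new Xid[0]; }
    public boolean isSameRM(XAResource other) { return false; }
    public int getTransactionTimeout() { return 0; }
    public boolean setTransactionTimeout(int seconds) { return false; }
}

// Hypothetical fixed id; a real manager gives each branch the same global id
// and its own branch qualifier.
class SimpleXid implements Xid {
    public int getFormatId() { return 1; }
    public byte[] getGlobalTransactionId() { return new byte[] { 1 }; }
    public byte[] getBranchQualifier() { return new byte[] { 1 }; }
}

// Hypothetical coordinator: the order of XA calls behind enlist, delist and commit.
class MiniCoordinator {
    private final Xid xid = new SimpleXid();
    private final List<XAResource> enlisted = new ArrayList<>();

    void enlist(XAResource r) throws XAException { r.start(xid, XAResource.TMNOFLAGS); enlisted.add(r); }

    void delist(XAResource r, int flag) throws XAException { r.end(xid, flag); }

    // Two-phase commit: commit only if every branch votes "yes" in prepare.
    boolean commit() throws XAException {
        XAResource votedNo = null;
        for (XAResource r : enlisted) {
            try {
                r.prepare(xid);
            } catch (XAException e) {
                votedNo = r;
                break;
            }
        }
        if (votedNo != null) {
            // Assumes the branch that voted XA_RBROLLBACK already rolled itself back.
            for (XAResource r : enlisted) {
                if (r != votedNo) r.rollback(xid);
            }
            return false;
        }
        for (XAResource r : enlisted) r.commit(xid, false); // second phase
        return true;
    }
}

class XaLifecycleDemo {
    // Two branches: enlist, delist with TMSUCCESS, then commit. Returns each call log.
    static List<List<String>> run(boolean secondVotesNo) {
        RecordingXAResource a = new RecordingXAResource(false);
        RecordingXAResource b = new RecordingXAResource(secondVotesNo);
        MiniCoordinator tx = new MiniCoordinator();
        try {
            tx.enlist(a);
            tx.enlist(b);
            tx.delist(a, XAResource.TMSUCCESS);
            tx.delist(b, XAResource.TMSUCCESS);
            tx.commit();
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return Arrays.asList(a.calls, b.calls);
    }
}
```

With both branches voting "yes", each log reads start, end:success, prepare, commit; if the second branch votes "no", the first one is rolled back instead of committed.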
I synchronized the enlistment and delistment calls because I got random NullPointerException errors during tests without them. I didn't investigate whether this is a bug or by design, that is, whether the Transaction object is simply not meant to be shared between threads without synchronization.
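The effect of those synchronized blocks can be illustrated without a database. The sketch below uses a hypothetical SharedTransaction whose state is a plain ArrayList, which is not thread-safe. It says nothing about how Atomikos implements Transaction internally; it only shows that serializing every call on the shared object's monitor keeps concurrent enlistments from being lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a transaction object that is not thread-safe on its own.
class SharedTransaction {
    private final List<String> resources = new ArrayList<>(); // ArrayList is not thread-safe

    void enlist(String resource) { resources.add(resource); }

    int size() { return resources.size(); }
}

class SynchronizedEnlistDemo {
    // Enlists `tasks` resources from a pool of 5 worker threads, guarding every call
    // with the shared object's monitor, as Processamento does with `transaction`.
    static int enlistConcurrently(int tasks) {
        SharedTransaction transaction = new SharedTransaction();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < tasks; i++) {
            String resource = "resource-" + i;
            executor.execute(() -> {
                synchronized (transaction) {
                    transaction.enlist(resource);
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Reading under the same monitor makes every enlistment visible here.
        synchronized (transaction) {
            return transaction.size();
        }
    }

    public static void main(String[] args) {
        System.out.println(enlistConcurrently(1000)); // prints 1000
    }
}
```

Without the synchronized block around enlist, the result becomes nondeterministic: some runs may lose additions or fail inside the workers.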
Finally, I implemented a method that creates five instances of the task above, runs them in a thread pool and collects the results. If any of them fails, the global transaction is rolled back. This is the code:
// Imports used by this excerpt
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.transaction.SystemException;
import javax.transaction.Transaction;

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    // create the thread pool
    ExecutorService executor = Executors.newFixedThreadPool(5);
    try {
        // start the transaction on the calling thread
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();
        List<Callable<Integer>> processos = new ArrayList<>();
        // create 5 tasks, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            // if falhar == true, make the fifth task fail
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }
        // run the tasks and wait for all of them
        List<Future<Integer>> futures = executor.invokeAll(processos);
        // count the results; get() throws if the task threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " succeeded!");
                ok++;
            } catch (Throwable e) {
                ex = e;
            }
        }
        if (ex != null) {
            throw ex;
        }
        // finish the transaction normally; committing through the TransactionManager
        // also dissociates the transaction from the calling thread
        AtomikosDataSource.getTM().commit();
    } catch (Throwable e) {
        e.printStackTrace();
        try {
            // try to roll back
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException | SecurityException | SystemException e1) {
            e1.printStackTrace();
        }
    } finally {
        // release the pool threads
        executor.shutdown();
    }
    return ok;
}
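The failure handling in processar() relies on two facts: invokeAll waits for every task, and Future.get() rethrows a task's exception wrapped in an ExecutionException. The sketch below reproduces only that counting logic with plain Callable tasks, without Atomikos or a database; FutureFailureDemo is a hypothetical class, and the rollback decision is reduced to a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureFailureDemo {
    // Runs five tasks; when `falhar` is true the fifth one throws, as in processar().
    // Returns {number of successes, 1 if the transaction would be rolled back else 0}.
    static int[] run(boolean falhar) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                int id = i + 1;
                boolean fail = i == 4 && falhar;
                tasks.add(() -> {
                    if (fail) {
                        throw new RuntimeException("Task " + id + " failed unexpectedly!");
                    }
                    return id;
                });
            }
            int ok = 0;
            boolean rollback = false;
            // invokeAll returns only after every task has completed, failed or not.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                try {
                    future.get();
                    ok++;
                } catch (ExecutionException e) {
                    // e.getCause() is the RuntimeException thrown inside the task.
                    rollback = true;
                }
            }
            return new int[] { ok, rollback ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

With falhar set to true this yields four successes and a rollback, the same numbers the tests check against the real implementation.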
Notice that some methods have a parameter named falhar (fail). It is used to create a scenario in which one of the threads throws an error and forces a rollback of all changes made by the other threads.
The processar() (process) method returns the number of "successes", i.e., tasks that completed without errors, regardless of whether the transaction was committed or rolled back. The tests also rely on this value.
Tests
I wrote tests for both the success and the error scenarios in order to validate the solution.
In the success scenario, each of the five threads updates a row of the TESTE table with the value ok, and then the transaction is committed.
In the error scenario, the last task created (the fifth) always throws an exception, forcing the rollback of all operations. Note that the last task created is not necessarily the last one executed.
The test code is very simple. Look:
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class AtomikosTest {

    @BeforeClass
    public static void init() {
        // create the Atomikos transaction manager and data source
        AtomikosDataSource.init();
    }

    @Before
    public void reset() {
        // recreate the data of the TESTE table
        AtomikosDAO.resetTable();
    }

    @AfterClass
    public static void shutdown() {
        // close Atomikos resources
        AtomikosDataSource.shutdown();
    }

    @Test
    public void sucesso() {
        // process 5 rows in 5 threads
        int okParcial = AtomikosDAO.processar(false);
        // should return 5 successes
        Assert.assertEquals(5, okParcial);
        // confirm in the table: 5 rows marked ok
        Assert.assertEquals(5, AtomikosDAO.countOk());
    }

    @Test
    public void fail() {
        // process 5 rows in 5 threads, one of which fails
        int okParcial = AtomikosDAO.processar(true);
        // should return 4 successes
        Assert.assertEquals(4, okParcial);
        // confirm in the table: zero rows marked ok, due to the rollback
        Assert.assertEquals(0, AtomikosDAO.countOk());
    }
}
Notes about configuration
In this project, I chose PostgreSQL as the resource modified in the distributed transaction.
It was necessary to enable the max_prepared_transactions setting in the postgresql.conf configuration file, with a value greater than the number of participants in the distributed transaction. Its default value, 0, disables prepared transactions, so otherwise PostgreSQL can't participate in distributed transactions at all. Note that changing this setting requires a server restart.
Final thoughts
Even though there's growing interest in NoSQL and even NewSQL, ACID transactions, as offered by traditional RDBMSs, are still very important in many situations. So much so that there are tutorials on how to simulate a transaction with two-phase commit in non-transactional databases like MongoDB.
Furthermore, every participant in a distributed transaction must support the XA protocol. Unfortunately, some database drivers and other data source implementations don't. So do your homework and research before you start coding.
This article is based on my answer on StackOverflow em Português!