Atomikos is a Java library that, among other things, implements JTA (Java Transaction API) and XA (eXtended Architecture, a standard for distributed transaction processing).

In general, a transaction is associated with the current thread, so the methods invoked during a request in a JEE server can share the same transaction.
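To make the thread association concrete, here is a minimal sketch that uses a plain ThreadLocal as a stand-in for the transaction manager's per-thread bookkeeping. The class name ThreadBoundContextDemo and the value "tx-1" are made up for illustration, and this is not how Atomikos is implemented internally; it only shows why a worker thread does not automatically see the caller's transaction.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration: a ThreadLocal plays the role of the
// "current transaction" that a transaction manager binds to a thread.
class ThreadBoundContextDemo {

    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Binds a fake transaction to the calling thread, then asks a worker
    // thread what it sees. Returns the worker's view as text.
    static String seenByWorker() {
        CURRENT_TX.set("tx-1");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The task runs on another thread, which has its own ThreadLocal slot.
            Callable<String> task = () -> String.valueOf(CURRENT_TX.get());
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
            CURRENT_TX.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + seenByWorker());
    }
}
```

The worker gets null because nothing was bound to its thread, which is why the transaction has to be handed to each thread explicitly.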

However, an interesting question on StackOverflow raised the possibility of splitting an atomic operation composed of different tasks across several threads, all under a single global transaction.

Well, in order to bypass the default architecture, we’ll need to invoke the Atomikos XA API directly to include the data sources used by each thread in the main transaction.

I wrote a simple example implementing that. The project is available on my GitHub account.

Implementation

Firstly, we have the DataSource and TransactionManager initialization using Atomikos API in class AtomikosDataSource. Here is the relevant excerpt:

// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;

// initialize resources
public static void init() {
    utm = new UserTransactionManager();
    try {
        utm.init();
        adsb = new AtomikosDataSourceBean();
        adsb.setMaxPoolSize(20);
        adsb.setUniqueResourceName("postgres");
        adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
        Properties p = new Properties();
        p.setProperty("user", "postgres");
        p.setProperty("password", "0");
        p.setProperty("serverName", "localhost");
        p.setProperty("portNumber", "5432");
        p.setProperty("databaseName", "postgres");
        adsb.setXaProperties(p);
    } catch (SystemException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

After that, I implemented the thread class named Processamento (Processing), which receives the main transaction instance (Transaction). It implements the Callable interface, so each instance is a task that returns an Integer value. Here is the code:

private static class Processamento implements Callable<Integer> {

    private int id;
    private boolean falhar; //fail or not?
    private Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.falhar = falhar;
        this.transaction = transaction;
        this.id = id;
    }

    public Integer call() throws Exception {
        if (falhar) {
            //fail unexpectedly
            throw new RuntimeException("Falhou inesperadamente!"); 
        }

        //enlist xa connection
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }

        //normal execution, update row with OK
        Connection c = xac.getConnection();
        Statement s = c.createStatement();
        s.executeUpdate("update teste set processado = 'ok' where id = " + id);
        s.close();
        c.close();

        //delist xa connection
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }

}

Notice that, instead of letting JTA enlist the connection implicitly, I obtain the XA connection through the Atomikos API and enlist it by hand.

The call to AtomikosDataSource.getDS().getXaDataSource().getXAConnection() gets an XA connection, which is added to the main transaction with the command transaction.enlistResource(xac.getXAResource()). This operation is called enlistment. At the end of the process, the resource is delisted with transaction.delistResource().

I synchronized the enlist and delist calls because I got random NullPointerException errors during tests. I didn’t investigate whether that is a bug or by design, that is, whether the Transaction object is simply not meant to be thread-safe.

Finally, I implemented a method that creates five instances of the thread above and then collects the results. If one of them fails, the global transaction is rolled back. This is the code:

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    try {

        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();

        //create thread pool
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();

        //create 5 threads, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            //if falhar == true, fail the fifth thread
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }

        //execute threads and wait
        List<Future<Integer>> futures = executor.invokeAll(processos);
        //all tasks are done once invokeAll returns; release the pool threads
        executor.shutdown();

        //count the result; get() will fail if thread threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " sucesso!");
                ok++; 
            } catch (Throwable e) {
                ex = e;
            }
        }

        if (ex != null) {
            throw ex;
        }

        //finish transaction normally
        transaction.commit();

    } catch (Throwable e) {

        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException e1) {
            e1.printStackTrace();
        } catch (SecurityException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }

    }
    return ok;
}

Notice that some methods have a parameter named falhar (fail). It is used to create a scenario where one of the threads generates an error and forces a rollback of all changes made by the other threads.

The processar() (process) method returns the number of “successes”, i.e., threads executed without errors, regardless of whether the transaction was committed or rolled back. This value is also used in the tests.
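The counting logic can be tried in isolation, without any database. The sketch below is a simplified stand-in for processar(): the class name SuccessCountDemo and the method countSuccesses are mine, and the tasks only return their id or throw. It shows that Future.get() reports a task's exception as an ExecutionException, so the failed task is simply not counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, database-free version of the success counting.
class SuccessCountDemo {

    // Runs five tasks; when failLast is true, the fifth one throws.
    static int countSuccesses(boolean failLast) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int id = i + 1;
                final boolean fail = failLast && i == 4;
                tasks.add(() -> {
                    if (fail) {
                        throw new IllegalStateException("task " + id + " failed");
                    }
                    return id;
                });
            }
            int ok = 0;
            // invokeAll blocks until every task has finished, one way or another
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                try {
                    future.get();
                    ok++;
                } catch (ExecutionException e) {
                    // the task threw an exception, so it is not counted
                }
            }
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countSuccesses(false));
        System.out.println(countSuccesses(true));
    }
}
```

Running main prints 5 and then 4, the same two values the tests expect from processar().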

Tests

I wrote tests for both the success and the error scenarios in order to validate the solution.

In the success scenario, each of the five threads updates a row of the TESTE table with the value ok, and then the transaction is committed.

In the error scenario, the last thread always throws an exception, forcing the rollback of all operations. Notice that the last thread created is not necessarily the last one executed.

The test code is very simple. Look:

public class AtomikosTest {

    @BeforeClass
    public static void init() {
        //create atomikos transaction manager and data source
        AtomikosDataSource.init();

    }
    @Before
    public void reset() {
        //recreate data of TESTE table
        AtomikosDAO.resetTable();
    }

    @AfterClass
    public static void shutdown() {
        //close atomikos resources
        AtomikosDataSource.shutdown();
    }

    @Test
    public void sucesso() {
        //process 5 rows in 5 threads
        int okParcial = AtomikosDAO.processar(false);
        //should return 5 successes
        Assert.assertEquals(5, okParcial);
        //confirms in table, count 5 ok's
        Assert.assertEquals(5, AtomikosDAO.countOk());
    }

    @Test
    public void fail() {
        //process 5 rows in 5 threads, one should fail
        int okParcial = AtomikosDAO.processar(true);
        //should return 4 successes
        Assert.assertEquals(4, okParcial);
        //confirms in table, count zero ok's due to rollback
        Assert.assertEquals(0, AtomikosDAO.countOk());
    }

}

Notes about configuration

In this project, I chose PostgreSQL as the resource modified by the distributed transaction.

It was necessary to enable the setting called max_prepared_transactions in the configuration file postgresql.conf, with a value greater than the number of participants in the distributed transaction. Otherwise, PostgreSQL won’t be able to participate in distributed transactions.
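As an illustration, the relevant line could look like the fragment below. The value 10 is only an assumption chosen to be larger than the five threads of this example; pick whatever fits your own workload.

```
# postgresql.conf
# The default is 0, which disables prepared transactions.
# Changing this setting requires a server restart.
max_prepared_transactions = 10
```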

Final thoughts

Even though there’s a growing interest in NoSQL and even NewSQL, ACID transactions, as available on traditional RDBMSs, are very important in many situations. So much so that there are tutorials on how to simulate a transaction with two-phase commit in non-transactional databases like MongoDB.

Furthermore, it’s important to say that each participant in a distributed transaction must support the XA protocol. Unfortunately, some database drivers and other data source implementations don’t. So do your homework and research before you start coding.
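To see why every participant has to speak the protocol, here is a toy model of two-phase commit. All names in it (TwoPhaseCommitSketch, Participant, InMemoryParticipant, complete) are hypothetical; it is not the XA or Atomikos API, and it ignores failures, timeouts and recovery. It only shows the basic contract: a participant must be able to vote in a prepare phase and then obey the coordinator’s final decision.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of two-phase commit; every name here is hypothetical.
class TwoPhaseCommitSketch {

    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2, when every vote was yes
        void rollback();     // phase 2, otherwise
    }

    static class InMemoryParticipant implements Participant {
        private final boolean voteYes;
        String state = "active";

        InMemoryParticipant(boolean voteYes) {
            this.voteYes = voteYes;
        }

        public boolean prepare() {
            state = voteYes ? "prepared" : "refused";
            return voteYes;
        }

        public void commit() {
            state = "committed";
        }

        public void rollback() {
            state = "rolled back";
        }
    }

    // Plays the coordinator. Returns true if the global transaction committed.
    static boolean complete(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break; // a single "no" vote decides the outcome
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        List<InMemoryParticipant> ps = Arrays.asList(
                new InMemoryParticipant(true),
                new InMemoryParticipant(false));
        System.out.println("committed: " + complete(ps));
        for (InMemoryParticipant p : ps) {
            System.out.println(p.state);
        }
    }
}
```

A driver without XA support has no equivalent of prepare(), so the coordinator cannot ask it to vote before the final decision is made.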


This article was based on my answer on StackOverflow in Portuguese!