distributed Atomikos é um software Java que, entre outras coisas, implementa os padrões JTA (Java Transaction API) e XA (eXtended Architecture, que suporta processamento de transações distribuídas).

Em geral, cada transação é associada à thread atual, de modo que os diversos métodos que atendem uma solicitação num servidor JEE podem compartilhá-la.

Entretanto, uma questão interessante do StackOverflow levantou a possibilidade de uma aplicação dividir uma operação atômica em tarefas delegadas a várias threads, porém compartilhando uma única transação global.

Bem, para fazer esse “desvio” da arquitetura original, a solução foi usar diretamente a API XA do Atomikos para incluir os DataSources das diferentes threads na transação principal.

Fiz um exemplo simples que implementa isso. O projeto está disponível no meu GitHub.

Implementação

Antes de mais nada, temos a inicialização do DataSource e do TransactionManager usando a API do Atomikos realizado na class AtomikosDataSource. Eis o trecho relevante:

// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;

// initialize resources
public static void init() {
    utm = new UserTransactionManager();
    try {
        utm.init();
        adsb = new AtomikosDataSourceBean();
        adsb.setMaxPoolSize(20);
        adsb.setUniqueResourceName("postgres");
        adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
        Properties p = new Properties();
        p.setProperty("user", "postgres");
        p.setProperty("password", "0");
        p.setProperty("serverName", "localhost");
        p.setProperty("portNumber", "5432");
        p.setProperty("databaseName", "postgres");
        adsb.setXaProperties(p);
    } catch (SystemException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

Depois, implementei uma thread chamada Processamento que recebe a instância da transação (Transaction) principal. A interface Callable define que a thread é um tipo de tarefa que retorna um valor Integer. Eis o código:

private static class Processamento implements Callable<Integer> {

    private int id;
    private boolean falhar;
    private Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.falhar = falhar;
        this.transaction = transaction;
        this.id = id;
    }

    public Integer call() throws Exception {
        if (falhar) {
            throw new RuntimeException("Falhou inesperadamente!");
        }

        //enlist xa connection
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }

        //normal execution, update row with OK
        Connection c = xac.getConnection();
        Statement s = c.createStatement();
        s.executeUpdate("update teste set processado = 'ok' where id = " + id);
        s.close();
        c.close();

        //delist xa connection
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }

}

Note que, ao invés de usar o JTA, estou usando diretamente a API do XA implementada pelo Atomikos.

A chamada AtomikosDataSource.getDS().getXaDataSource().getXAConnection() recupera uma conexão do XA, a qual é adicionada à transação principal com o comando transaction.enlistResource(xac.getXAResource()). Esta operação é chamada de alistamento (enlistment). Ao final do processamento da thread, o alistamento é desfeito.

Sincronizei alguns trechos pois obtive aleatoriamente alguns NullPointerException nos testes. Não cheguei a averiguar se é um bug do Atomikos ou se é by design, isto é, o objeto Transaction não é thread-safe.

Finalmente, implementei um método que inicia cinco instâncias da thread de processamento listada acima e posteriormente colhe os resultados. Se uma delas falhar, a transação global é desfeita (rollback). Veja o código abaixo:

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    try {

        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();

        //create thread pool
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();

        //create 5 threads, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }

        //execute threads and wait
        List<Future<Integer>> futures = executor.invokeAll(processos);

        //count the result; get() will fail if thread threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " sucesso!");
                ok++; 
            } catch (Throwable e) {
                ex = e;
            }
        }

        if (ex != null) {
            throw ex;
        }

        //finish transaction normally
        transaction.commit();

    } catch (Throwable e) {

        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException e1) {
            e1.printStackTrace();
        } catch (SecurityException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }

    }
    return ok;
}

Note que vários métodos possuem um parâmetro chamado falha. Ele será usado para gerar um cenário onde uma das threads irá gerar um erro e forçar o rollback das alterações das demais threads.

O método processar() retorna a quantidade de “sucessos”, isto é, threads que executaram sem erro, independentemente se a transação foi efetivada ou desfeita. Isso também será usado nos testes.

Testes

Fiz testes tanto de um cenário de sucesso quanto de falha para validar a solução.

No cenário de sucesso, cada uma das cinco threads atualiza uma linha da tabela TESTE com o valor ok e no final o método principal faz o commit da transação.

No cenário de falha, a última thread sempre lança uma exceção, forçando o rollback das demais. Note que a última thread criada não é necessariamente a última a ser executada.

O código de teste ficou muito simples. Veja:

public class AtomikosTest {

    @BeforeClass
    public static void init() {
        //create atomikos transaction manager and data source
        AtomikosDataSource.init();

    }
    @Before
    public void reset() {
        //recreate data of TEST table
        AtomikosDAO.resetTable();
    }

    @AfterClass
    public static void shutdown() {
        //close atomikos resources
        AtomikosDataSource.shutdown();
    }

    @Test
    public void sucesso() {
        //process 5 rows in 5 threads
        int okParcial = AtomikosDAO.processar(false);
        //should return 5 successes
        Assert.assertEquals(5, okParcial);
        //confirms in table, count 5 ok's
        Assert.assertEquals(5, AtomikosDAO.countOk());
    }

    @Test
    public void fail() {
        //process 5 rows in 5 threads, one should fail
        int okParcial = AtomikosDAO.processar(true);
        //should return 4 successes
        Assert.assertEquals(4, okParcial);
        //confirms in table, count zero ok's due to rollback
        Assert.assertEquals(0, AtomikosDAO.countOk());
    }

}

Notas sobre a configuração

Neste projeto, usei o servidor de banco de dados PostgreSQL como o recurso a participar da transação distribuída.

Foi necessário habilitar a configuração max_prepared_transactions no arquivo de configuração postgresql.conf com um valor maior que o número de participantes na transação distribuída. Sem isso, o PostgreSQL não será capaz de participar de transações desta natureza.

Considerações finais

Embora haja um crescente interesse sobre NoSQL e até NewSQL, transações ACID, como disponíveis nos SGBDRs tradicionais, são importantes em muitos cenários. Até por isso existem tutoriais sobre como simular uma transação com o conceito de two-phase commit em bancos de dados não transacionais como MongoDB.

Além disso, é importante ressaltar que cada participante de uma transação distribuída deve ser compatível com o protocolo XA. Infelizmente, alguns drivers de bancos de dados ou outras fontes de dados podem não ser compatíveis. Então, faça sua lição de casa e pesquise antes de sair implementando.


Este artigo foi baseado na minha resposta no StackOverflow em Português!