Atomikos é um software Java que, entre outras coisas, implementa os padrões JTA (Java Transaction API) e XA (eXtended Architecture, que suporta processamento de transações distribuídas).
Em geral, cada transação é associada à thread atual, de modo que os diversos métodos que atendem uma solicitação num servidor JEE podem compartilhá-la.
Entretanto, uma questão interessante do StackOverflow levantou a possibilidade de uma aplicação dividir uma operação atômica em tarefas delegadas a várias threads, porém compartilhando uma única transação global.
Bem, para fazer esse “desvio” da arquitetura original, a solução foi usar diretamente a API XA do Atomikos para incluir os DataSources das diferentes threads na transação principal.
Fiz um exemplo simples que implementa isso. O projeto está disponível no meu GitHub.
Implementação
Antes de mais nada, temos a inicialização do DataSource
e do TransactionManager
usando a API do Atomikos realizado na class AtomikosDataSource
. Eis o trecho relevante:
// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;
// initialize resources
public static void init() {
utm = new UserTransactionManager();
try {
utm.init();
adsb = new AtomikosDataSourceBean();
adsb.setMaxPoolSize(20);
adsb.setUniqueResourceName("postgres");
adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
Properties p = new Properties();
p.setProperty("user", "postgres");
p.setProperty("password", "0");
p.setProperty("serverName", "localhost");
p.setProperty("portNumber", "5432");
p.setProperty("databaseName", "postgres");
adsb.setXaProperties(p);
} catch (SystemException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
Depois, implementei uma thread chamada Processamento
que recebe a instância da transação (Transaction
) principal. A interface Callable
define que a thread é um tipo de tarefa que retorna um valor Integer
. Eis o código:
private static class Processamento implements Callable<Integer> {
private int id;
private boolean falhar;
private Transaction transaction;
public Processamento(int id, boolean falhar, Transaction transaction) {
this.falhar = falhar;
this.transaction = transaction;
this.id = id;
}
public Integer call() throws Exception {
if (falhar) {
throw new RuntimeException("Falhou inesperadamente!");
}
//enlist xa connection
XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
synchronized (transaction) {
transaction.enlistResource(xac.getXAResource());
}
//normal execution, update row with OK
Connection c = xac.getConnection();
Statement s = c.createStatement();
s.executeUpdate("update teste set processado = 'ok' where id = " + id);
s.close();
c.close();
//delist xa connection
synchronized (transaction) {
transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
}
return id;
}
}
Note que, ao invés de usar o JTA, estou usando diretamente a API do XA implementada pelo Atomikos.
A chamada AtomikosDataSource.getDS().getXaDataSource().getXAConnection()
recupera uma conexão do XA, a qual é adicionada à transação principal com o comando transaction.enlistResource(xac.getXAResource())
. Esta operação é chamada de alistamento (enlistment). Ao final do processamento da thread, o alistamento é desfeito.
Sincronizei alguns trechos pois obtive aleatoriamente alguns NullPointerException
nos testes. Não cheguei a averiguar se é um bug do Atomikos ou se é by design, isto é, o objeto Transaction
não é thread-safe.
Finalmente, implementei um método que inicia cinco instâncias da thread de processamento listada acima e posteriormente colhe os resultados. Se uma delas falhar, a transação global é desfeita (rollback). Veja o código abaixo:
public static int processar(boolean falhar) {
int ok = 0;
Transaction transaction = null;
try {
//start transaction
AtomikosDataSource.getTM().begin();
transaction = AtomikosDataSource.getTM().getTransaction();
//create thread pool
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();
//create 5 threads, passing the main transaction as argument
for (int i = 0; i < 5; i++) {
processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
}
//execute threads and wait
List<Future<Integer>> futures = executor.invokeAll(processos);
//count the result; get() will fail if thread threw an exception
Throwable ex = null;
for (Future<Integer> future : futures) {
try {
int threadId = future.get();
System.out.println("Thread " + threadId + " sucesso!");
ok++;
} catch (Throwable e) {
ex = e;
}
}
if (ex != null) {
throw ex;
}
//finish transaction normally
transaction.commit();
} catch (Throwable e) {
e.printStackTrace();
try {
//try to rollback
if (transaction != null) {
AtomikosDataSource.getTM().rollback();
}
} catch (IllegalStateException e1) {
e1.printStackTrace();
} catch (SecurityException e1) {
e1.printStackTrace();
} catch (SystemException e1) {
e1.printStackTrace();
}
}
return ok;
}
Note que vários métodos possuem um parâmetro chamado falha
. Ele será usado para gerar um cenário onde uma das threads irá gerar um erro e forçar o rollback das alterações das demais threads.
O método processar()
retorna a quantidade de “sucessos”, isto é, threads que executaram sem erro, independentemente se a transação foi efetivada ou desfeita. Isso também será usado nos testes.
Testes
Fiz testes tanto de um cenário de sucesso quanto de falha para validar a solução.
No cenário de sucesso, cada uma das cinco threads atualiza uma linha da tabela TESTE
com o valor ok
e no final o método principal faz o commit da transação.
No cenário de falha, a última thread sempre lança uma exceção, forçando o rollback das demais. Note que a última thread criada não é necessariamente a última a ser executada.
O código de teste ficou muito simples. Veja:
public class AtomikosTest {
@BeforeClass
public static void init() {
//create atomikos transaction manager and data source
AtomikosDataSource.init();
}
@Before
public void reset() {
//recreate data of TEST table
AtomikosDAO.resetTable();
}
@AfterClass
public static void shutdown() {
//close atomikos resources
AtomikosDataSource.shutdown();
}
@Test
public void sucesso() {
//process 5 rows in 5 threads
int okParcial = AtomikosDAO.processar(false);
//should return 5 successes
Assert.assertEquals(5, okParcial);
//confirms in table, count 5 ok's
Assert.assertEquals(5, AtomikosDAO.countOk());
}
@Test
public void fail() {
//process 5 rows in 5 threads, one should fail
int okParcial = AtomikosDAO.processar(true);
//should return 4 successes
Assert.assertEquals(4, okParcial);
//confirms in table, count zero ok's due to rollback
Assert.assertEquals(0, AtomikosDAO.countOk());
}
}
Notas sobre a configuração
Neste projeto, usei o servidor de banco de dados PostgreSQL como o recurso a participar da transação distribuída.
Foi necessário habilitar a configuração max_prepared_transactions
no arquivo de configuração postgresql.conf
com um valor maior que o número de participantes na transação distribuída. Sem isso, o PostgreSQL não será capaz de participar de transações desta natureza.
Considerações finais
Embora haja um crescente interesse sobre NoSQL e até NewSQL, transações ACID, como disponíveis nos SGBDRs tradicionais, são importantes em muitos cenários. Até por isso existem tutoriais sobre como simular uma transação com o conceito de two-phase commit em bancos de dados não transacionais como MongoDB.
Além disso, é importante ressaltar que cada participante de uma transação distribuída deve ser compatível com o protocolo XA. Infelizmente, alguns drivers de bancos de dados ou outras fontes de dados podem não ser compatíveis. Então, faça sua lição de casa e pesquise antes de sair implementando.
Este artigo foi baseado na minha resposta no StackOverflow em Português!