Tag: transação

Transações distribuídas e processamento paralelo com Atomikos

distributed Atomikos é um software Java que, entre outras coisas, implementa os padrões JTA (Java Transaction API) e XA (eXtended Architecture, que suporta processamento de transações distribuídas).

Em geral, cada transação é associada à thread atual, de modo que os diversos métodos que atendem uma solicitação num servidor JEE podem compartilhá-la.

Entretanto, uma questão interessante do StackOverflow levantou a possibilidade de uma aplicação dividir uma operação atômica em tarefas delegadas a várias threads, porém compartilhando uma única transação global.

Bem, para fazer esse “desvio” da arquitetura original, a solução foi usar diretamente a API XA do Atomikos para incluir os DataSources das diferentes threads na transação principal.

Fiz um exemplo simples que implementa isso. O projeto está disponível no meu GitHub.

Implementação

Antes de mais nada, temos a inicialização do DataSource e do TransactionManager usando a API do Atomikos realizado na class AtomikosDataSource. Eis o trecho relevante:

// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;

// initialize resources
public static void init() {
    utm = new UserTransactionManager();
    try {
        utm.init();
        adsb = new AtomikosDataSourceBean();
        adsb.setMaxPoolSize(20);
        adsb.setUniqueResourceName("postgres");
        adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
        Properties p = new Properties();
        p.setProperty("user", "postgres");
        p.setProperty("password", "0");
        p.setProperty("serverName", "localhost");
        p.setProperty("portNumber", "5432");
        p.setProperty("databaseName", "postgres");
        adsb.setXaProperties(p);
    } catch (SystemException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

Depois, implementei uma thread chamada Processamento que recebe a instância da transação (Transaction) principal. A interface Callable define que a thread é um tipo de tarefa que retorna um valor Integer. Eis o código:

private static class Processamento implements Callable<Integer> {

    private int id;
    private boolean falhar;
    private Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.falhar = falhar;
        this.transaction = transaction;
        this.id = id;
    }

    public Integer call() throws Exception {
        if (falhar) {
            throw new RuntimeException("Falhou inesperadamente!");
        }

        //enlist xa connection
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }

        //normal execution, update row with OK
        Connection c = xac.getConnection();
        Statement s = c.createStatement();
        s.executeUpdate("update teste set processado = 'ok' where id = " + id);
        s.close();
        c.close();

        //delist xa connection
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }

}

Note que, ao invés de usar o JTA, estou usando diretamente a API do XA implementada pelo Atomikos.

A chamada AtomikosDataSource.getDS().getXaDataSource().getXAConnection() recupera uma conexão do XA, a qual é adicionada à transação principal com o comando transaction.enlistResource(xac.getXAResource()). Esta operação é chamada de alistamento (enlistment). Ao final do processamento da thread, o alistamento é desfeito.

Sincronizei alguns trechos pois obtive aleatoriamente alguns NullPointerException nos testes. Não cheguei a averiguar se é um bug do Atomikos ou se é by design, isto é, o objeto Transaction não é thread-safe.

Finalmente, implementei um método que inicia cinco instâncias da thread de processamento listada acima e posteriormente colhe os resultados. Se uma delas falhar, a transação global é desfeita (rollback). Veja o código abaixo:

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    try {

        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();

        //create thread pool
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();

        //create 5 threads, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }

        //execute threads and wait
        List<Future<Integer>> futures = executor.invokeAll(processos);

        //count the result; get() will fail if thread threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " sucesso!");
                ok++; 
            } catch (Throwable e) {
                ex = e;
            }
        }

        if (ex != null) {
            throw ex;
        }

        //finish transaction normally
        transaction.commit();

    } catch (Throwable e) {

        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException e1) {
            e1.printStackTrace();
        } catch (SecurityException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }

    }
    return ok;
}

Note que vários métodos possuem um parâmetro chamado falha. Ele será usado para gerar um cenário onde uma das threads irá gerar um erro e forçar o rollback das alterações das demais threads.

O método processar() retorna a quantidade de “sucessos”, isto é, threads que executaram sem erro, independentemente se a transação foi efetivada ou desfeita. Isso também será usado nos testes.

Testes

Fiz testes tanto de um cenário de sucesso quanto de falha para validar a solução.

No cenário de sucesso, cada uma das cinco threads atualiza uma linha da tabela TESTE com o valor ok e no final o método principal faz o commit da transação.

No cenário de falha, a última thread sempre lança uma exceção, forçando o rollback das demais. Note que a última thread criada não é necessariamente a última a ser executada.

O código de teste ficou muito simples. Veja:

public class AtomikosTest {

    @BeforeClass
    public static void init() {
        //create atomikos transaction manager and data source
        AtomikosDataSource.init();

    }
    @Before
    public void reset() {
        //recreate data of TEST table
        AtomikosDAO.resetTable();
    }

    @AfterClass
    public static void shutdown() {
        //close atomikos resources
        AtomikosDataSource.shutdown();
    }

    @Test
    public void sucesso() {
        //process 5 rows in 5 threads
        int okParcial = AtomikosDAO.processar(false);
        //should return 5 successes
        Assert.assertEquals(5, okParcial);
        //confirms in table, count 5 ok's
        Assert.assertEquals(5, AtomikosDAO.countOk());
    }

    @Test
    public void fail() {
        //process 5 rows in 5 threads, one should fail
        int okParcial = AtomikosDAO.processar(true);
        //should return 4 successes
        Assert.assertEquals(4, okParcial);
        //confirms in table, count zero ok's due to rollback
        Assert.assertEquals(0, AtomikosDAO.countOk());
    }

}

Notas sobre a configuração

Neste projeto, usei o servidor de banco de dados PostgreSQL como o recurso a participar da transação distribuída.

Foi necessário habilitar a configuração max_prepared_transactions no arquivo de configuração postgresql.conf com um valor maior que o número de participantes na transação distribuída. Sem isso, o PostgreSQL não será capaz de participar de transações desta natureza.

Considerações finais

Embora haja um crescente interesse sobre NoSQL e até NewSQL, transações ACID, como disponíveis nos SGBDRs tradicionais, são importantes em muitos cenários. Até por isso existem tutoriais sobre como simular uma transação com o conceito de two-phase commit em bancos de dados não transacionais como MongoDB.

Além disso, é importante ressaltar que cada participante de uma transação distribuída deve ser compatível com o protocolo XA. Infelizmente, alguns drivers de bancos de dados ou outras fontes de dados podem não ser compatíveis. Então, faça sua lição de casa e pesquise antes de sair implementando.


Este artigo foi baseado na minha resposta no StackOverflow em Português!

O que são e como funcionam transações em SQL?

Bancos de dados relacionais que operam no padrão SQL em geral são transacionais, isto é, eles permitem a execução de uma sequência de operações como um bloco indivisível de forma a garantir a integridade dos dados em um ambiente com acesso concorrente.

O Problema

Imagine três operações sequenciais que afetam a base de dados e não usamos uma transação para controlá-las. Vamos usar como exemplo um caso de uso comum de um e-commerce:

  1. Verificar se possui o produto em estoque
  2. Inserir um novo registro da compra
  3. Debitar o estoque

Agora vamos supor que dois clientes estão tentando finalizar suas compras neste e-commerce fictício. O servidor recebe duas requisições quase simultaneamente e começa a processar os pedidos na sequência apresentada acima. Os dois pedidos estão sendo processados paralelamente em threads diferentes. Tanto o cliente A quanto o cliente B selecionaram um produto com apenas uma unidade em estoque.

Podemos acabar com a seguinte linha de execução:

  1. Thread A verifica o estoque (passo #1) e insere o registro da compra (passo #2)
  2. thread A é bloqueada e B passa a ser executada
  3. Thread B verifica o estoque (passo #1), o qual ainda não foi debitado, e insere o registro da compra (passo #2)
  4. Thread B atualiza o estoque (passo #3), que agora fica zerado
  5. Thread B é bloqueada e A passa a ser executada
  6. Thread A atualiza o estoque (passo #3), que agora fica negativo!

Apesar de verificarmos o estoque a ordem de execução dos diferentes processos é imprevisível, então neste cenário concorrente ela não traz garantia do valor no passo seguinte.

A Solução

Bancos de dados transacionais usam o conceito ACID:

  • Atomicidade: uma transação é uma sequência de operações indivisível, ou é executado como um todo, ou tudo é desfeito.
  • Consistência: ao final da transação, o estado dos dados deve ser consistente.
  • Isolamento: embora alguns sistemas permitam quebrar o isolamento, em geral, uma transação em andamento não pode ser acessada por outras transações de modo a evitar leitura de um estado inconsistente, uma “sujeira”.
  • Durabilidade: em caso de sucesso (commit) a persistência dos dados deve ser garantida

Para garantir esses conceitos, em geral, os bancos de dados usam bloqueios quando ocorrem acessos simultâneos à mesma estrutura de dados. Ou seja, se alguém já está mexendo nos dados, os demais tem que aguardar sua vez numa fila até ele acabar.

Na prática

Ao usar bancos de dados transacionais, nós podemos usufruir deste controle de gerenciamento por parte dos SGBDRs (Sistemas Gerenciadores de Bancos de Dados Relacionais).

Incluindo o conceito de transação ACID no exemplo anterior, vamos ver como fica a execução:

  1. Thread A inicia uma transação, verifica o estoque (passo #1) e insere o registro da compra (passo #2)
  2. Thread A é bloqueada e B passa a ser executada
  3. Thread B inicia uma transação, mas ao tentar verificar o estoque ela é bloqueada porque a transação de A ainda não acabou
  4. Thread A atualiza o estoque, que agora fica zerado, e faz commit na transação.
  5. Thread B é desbloqueada e passa a ser executada
  6. Thread B conclui a verificação do estoque (passo #1) e retorna um erro pois não encontra o produto disponível no estoque
  7. Thread B executa um rollback para desfazer outras alterações que tenha efetuado, por exemplo, se houve outro produto debitado na mesma transação.

O resultado final é como se somente a thread A tivesse executado e B nunca existisse.

Nem tudo é um mar de rosas

Existem alguns problemas inerentes a transações ACID, sendo o desempenho o maior deles.

Embora seja importante garantir a integridade dos dados, para muitos sistemas onde a disponibilidade é o fator mais crítico um modelo que bloqueia acessos simultâneos torna-se inviável. Este é um dos principais fatores para o surgimento e a adoção de diversos sistemas de bancos de dados não transacionais e NoSQL.

O importante é entender que o uso de transações tem um custo e em algumas ocasiões este pode ser alto demais. Uma das representações mais comuns do trade-off de persistência de dados é a seguinte (retirada deste artigo):

tradeoff-database

O gráfico demonstra que consistência, disponibilidade e particionamento (escalar o banco de dados em diversos nós) são recursos que afetam uns aos outros. Você simplesmente não pode ter o melhor dos três e isso foi provado no teorema de CAP.

Bancos de dados relacionais geralmente sacrificam o particionamento em prol da consistência e da disponibilidade, enquanto alguns sistemas NoSQL sacrificam a consistência dos dados.

Note que o termo “geralmente” ou “em geral” foi cuidadosamente utilizado no artigo porque os diferentes sistemas de bancos de dados permitem vários níveis de configuração de isolamento de transações e acesso simultâneo, possibilitando um ajuste fino do desempenho de cada funcionalidade

Este artigo foi baseado na minha resposta no StackOverflow em Português!

Creative Commons O blog State of the Art de Luiz Ricardo é licenciado sob uma Licença Creative Commons. Copie, compartihe e modifique, apenas cite a fonte.