package com.huaweicloud.dws.client.handler;

import com.huaweicloud.dws.client.DwsConfig;
import com.huaweicloud.dws.client.action.PutAction;
import com.huaweicloud.dws.client.exception.DwsClientException;
import com.huaweicloud.dws.client.exception.DwsClientRecordException;
import com.huaweicloud.dws.client.executor.CopyMergeExecutor;
import com.huaweicloud.dws.client.executor.CopyUpsertExecutor;
import com.huaweicloud.dws.client.executor.DeleteExecutor;
import com.huaweicloud.dws.client.executor.UpsertExecutor;
import com.huaweicloud.dws.client.model.OperationType;
import com.huaweicloud.dws.client.model.Record;
import com.huaweicloud.dws.client.model.TableName;
import com.huaweicloud.dws.client.model.WriteMode;
import com.huaweicloud.dws.client.util.JdbcUtil;
import com.huaweicloud.dws.client.worker.ConnectionProvider;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dws/client/handler/PutActionHandler.class */
/**
 * Handles a {@link PutAction} by writing its records through the connection supplied by the
 * {@link ConnectionProvider}, inside a single JDBC transaction.
 *
 * <p>DELETE records are executed first, then UPSERT records are written with the executor
 * selected by the table's {@link WriteMode}. The transaction is committed on success and
 * rolled back on any failure.
 */
public class PutActionHandler extends AbstractActionHandler<PutAction> {
    private static final Logger log = LoggerFactory.getLogger(PutActionHandler.class);
    private static final String HSTORE_UPSERT_AUTOCOMMIT_ON = "set enable_hstore_upsert_autocommit=on;";
    private static final String HSTORE_UPSERT_AUTOCOMMIT_OFF = "set enable_hstore_upsert_autocommit=off;";

    final DwsConfig config;
    final ConnectionProvider connectionProvider;
    final UpsertExecutor upsertExecutor = new UpsertExecutor();
    final DeleteExecutor deleteExecutor = new DeleteExecutor();
    final CopyMergeExecutor copyMergeExecutor = new CopyMergeExecutor();
    final CopyUpsertExecutor copyUpsertExecutor = new CopyUpsertExecutor();

    /**
     * Creates a handler.
     *
     * @param dwsConfig client configuration; read for the timeout, per-table settings and the
     *     optional success callback
     * @param connectionProvider source of the JDBC connection used for every write
     */
    public PutActionHandler(DwsConfig dwsConfig, ConnectionProvider connectionProvider) {
        this.config = dwsConfig;
        this.connectionProvider = connectionProvider;
    }

    /**
     * Writes the records of {@code putAction} and completes its future.
     *
     * <p>An action with no records completes its future immediately. After a successful write
     * the configured success callback, if any, is invoked with the written records.
     *
     * @param putAction the action to process
     * @throws DwsClientException as a {@link DwsClientRecordException} carrying the action's
     *     records when the write fails
     */
    @Override
    public void handle(PutAction putAction) throws DwsClientException {
        List<Record> records = putAction.getRecords();
        if (records == null || records.isEmpty()) {
            log.debug("current action records is empty.");
            putAction.getFuture().complete(null);
            return;
        }
        try {
            doHandle(putAction, records);
            Consumer<List<Record>> successFunction = this.config.getSuccessFunction();
            if (successFunction != null) {
                successFunction.accept(records);
            }
        } catch (DwsClientException e) {
            throw new DwsClientRecordException(e, records);
        }
    }

    /**
     * Obtains a connection, writes {@code list} in one transaction and completes the future.
     *
     * <p>The table (and therefore the write mode) is taken from the first record only.
     * NOTE(review): this assumes every record in the action targets the same table — confirm
     * against the code that builds {@link PutAction}.
     *
     * @param putAction action whose future is completed on success
     * @param list non-empty records to write
     * @throws DwsClientException if the connection cannot be obtained or the write fails; when
     *     the rollback also fails, that failure is attached as a suppressed exception
     */
    private void doHandle(PutAction putAction, List<Record> list) throws DwsClientException {
        TableName tableName = list.get(0).getTableSchema().getTableName();
        WriteMode writeMode = this.config.getTableConfig(tableName).getWriteMode();

        // Acquire the connection in its own try so that "get connection fail" is only logged
        // for genuine connection problems, not for every write failure.
        Connection connection;
        try {
            connection = this.connectionProvider.getOrInitConnection();
        } catch (Exception e) {
            log.error("get connection fail", e);
            throw DwsClientException.fromException(e);
        }

        try {
            writeInTransaction(connection, tableName, writeMode, list);
            putAction.getFuture().complete(null);
        } catch (Throwable failure) {
            log.error("hand put action fail.", failure);
            rollbackKeepingCause(connection, failure);
            throw DwsClientException.fromException(failure);
        }
    }

    /**
     * Runs the deletes and upserts of one batch on {@code connection} and commits.
     *
     * @throws Exception any failure raised by the connection or the executors; the caller is
     *     responsible for rolling back
     */
    private void writeInTransaction(
            Connection connection, TableName tableName, WriteMode writeMode, List<Record> records)
            throws Exception {
        if (connection.getAutoCommit()) {
            connection.setAutoCommit(false);
        }
        JdbcUtil.executeSql(connection, String.format("SET statement_timeout = %s", this.config.getTimeOutMs()));

        this.deleteExecutor.execute(recordsOfType(records, OperationType.DELETE), connection, this.config);

        List<Record> upserts = recordsOfType(records, OperationType.UPSERT);
        // In AUTO mode, batches smaller than the table's copy-write batch size use plain upsert.
        boolean smallAutoBatch = writeMode == WriteMode.AUTO
                && upserts.size() < this.config.getTableConfig(tableName).getCopyWriteBatchSize();
        if (writeMode == WriteMode.UPSERT || smallAutoBatch) {
            upsert(connection, tableName, upserts);
        } else {
            switch (writeMode) {
                case COPY_UPSERT:
                case AUTO:
                    this.copyUpsertExecutor.execute(upserts, connection, this.config);
                    break;
                case COPY_MERGE:
                    this.copyMergeExecutor.execute(upserts, connection, this.config);
                    break;
                default:
                    // Committing here would acknowledge records that were never written,
                    // so fail the batch instead of silently dropping them.
                    if (!upserts.isEmpty()) {
                        throw new IllegalStateException("unknown executor write mode " + writeMode);
                    }
                    log.error("unknown executor write mode {}", writeMode);
                    break;
            }
        }
        connection.commit();
    }

    /**
     * Executes a plain upsert, bracketing it with {@code enable_hstore_upsert_autocommit} on/off
     * when the table configuration enables that option.
     *
     * <p>The option is read once so the "on" and "off" statements are always paired. A failure
     * while switching the option off never replaces an upsert failure; it is attached to it as
     * a suppressed exception.
     */
    private void upsert(Connection connection, TableName tableName, List<Record> upserts) throws Exception {
        boolean hstoreAutocommit = this.config.getTableConfig(tableName).isEnableHstoreUpsertAutocommit();
        if (!hstoreAutocommit) {
            this.upsertExecutor.execute(upserts, connection, this.config);
            return;
        }

        JdbcUtil.executeSql(connection, HSTORE_UPSERT_AUTOCOMMIT_ON);
        Throwable upsertFailure = null;
        try {
            this.upsertExecutor.execute(upserts, connection, this.config);
        } catch (Throwable t) {
            upsertFailure = t;
            throw t;
        } finally {
            try {
                JdbcUtil.executeSql(connection, HSTORE_UPSERT_AUTOCOMMIT_OFF);
            } catch (Exception resetFailure) {
                if (upsertFailure == null) {
                    throw resetFailure;
                }
                upsertFailure.addSuppressed(resetFailure);
            }
        }
    }

    /**
     * Rolls back {@code connection}; a rollback failure is logged and attached to {@code cause}
     * as a suppressed exception so that the original failure is the one that propagates.
     */
    private void rollbackKeepingCause(Connection connection, Throwable cause) {
        try {
            connection.rollback();
        } catch (Exception rollbackFailure) {
            // Broad catch on purpose: nothing thrown while rolling back may hide the cause.
            log.error("rollback fail.", rollbackFailure);
            cause.addSuppressed(rollbackFailure);
        }
    }

    /** Returns the records whose operation type is {@code type}, in their original order. */
    private static List<Record> recordsOfType(List<Record> records, OperationType type) {
        return records.stream()
                .filter(record -> record.getType() == type)
                .collect(Collectors.toList());
    }
}
