package com.huaweicloud.dws.connectors.flink.source;

import com.huaweicloud.dws.client.DwsConfig;
import com.huaweicloud.dws.client.exception.ExceptionCode;
import com.huaweicloud.dws.client.model.TableName;
import com.huaweicloud.dws.client.model.TableSchema;
import com.huaweicloud.dws.client.util.JdbcUtil;
import com.huaweicloud.dws.client.worker.ConnectionProvider;
import com.huaweicloud.dws.connectors.flink.config.DwsConnectionOptions;
import com.huaweicloud.dws.connectors.flink.config.DwsReadOptions;
import com.huaweicloud.dws.connectors.flink.exception.InternalException;
import com.huaweicloud.dws.connectors.flink.utils.ConverUtils;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dws/connectors/flink/source/DwsRowDataLookupFunction.class */
public class DwsRowDataLookupFunction extends TableFunction<RowData> {
    private static final Logger log = LoggerFactory.getLogger(DwsRowDataLookupFunction.class);
    public static final long RETRY_INTERVAL_TIME = 1000;
    private final String[] keyNames;
    private RowData.FieldGetter[] getters;
    private final String query;
    private final RowType rowType;
    private final ConnectionProvider connectionProvider;
    private PreparedStatement statement;
    private final DwsConnectionOptions connectionOptions;
    private final DwsReadOptions readOptions;
    private TableSchema tableSchema;

    public DwsRowDataLookupFunction(DwsConnectionOptions dwsConnectionOptions, DwsReadOptions dwsReadOptions, String[] strArr, DataType[] dataTypeArr, String[] strArr2, RowType rowType) {
        this.readOptions = dwsReadOptions;
        this.rowType = rowType;
        this.connectionOptions = dwsConnectionOptions;
        this.connectionProvider = new ConnectionProvider(DwsConfig.builder().withUrl(this.connectionOptions.getUrl()).withPassword(this.connectionOptions.getPassword()).withUsername(this.connectionOptions.getUsername()).withDriverName(this.connectionOptions.getDriver()).build());
        this.keyNames = strArr2;
        List asList = Arrays.asList(strArr);
        DataType[] dataTypeArr2 = (DataType[]) Arrays.stream(strArr2).map(str -> {
            Preconditions.checkArgument(asList.contains(str), "keyName %s can't find in fieldNames %s.", new Object[]{str, asList});
            return dataTypeArr[asList.indexOf(str)];
        }).toArray(i -> {
            return new DataType[i];
        });
        if (strArr2.length > 0) {
            this.getters = new RowData.FieldGetter[strArr2.length];
            for (int i2 = 0; i2 < this.getters.length; i2++) {
                this.getters[i2] = RowData.createFieldGetter(dataTypeArr2[i2].getLogicalType(), i2);
            }
        }
        this.query = JdbcUtil.getSelectFromStatement(TableName.valueOf(this.connectionOptions.getTableName()).getFullName(), Arrays.asList(strArr), Arrays.asList(strArr2));
    }

    public void open(FunctionContext functionContext) throws Exception {
        try {
            if (this.tableSchema == null) {
                this.tableSchema = JdbcUtil.getTableSchema(this.connectionProvider.getOrInitConnection(), TableName.valueOf(this.connectionOptions.getTableName()));
            }
            establishConnectionAndStatement();
        } catch (SQLException e) {
            throw new IllegalArgumentException("open failed.", e);
        }
    }

    private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
        this.statement = this.connectionProvider.getOrInitConnection().prepareStatement(this.query);
    }

    private void setParams(RowData rowData) throws SQLException {
        for (int i = 0; i < this.keyNames.length; i++) {
            JdbcUtil.fillPreparedStatement(this.statement, i + 1, this.getters[i].getFieldOrNull(rowData), this.tableSchema.getColumn(this.keyNames[i]));
        }
    }

    public void eval(Object... objArr) {
        GenericRowData of = GenericRowData.of(objArr);
        for (int i = 0; i <= this.readOptions.getMaxRetryTimes(); i++) {
            try {
                this.statement.clearParameters();
                setParams(of);
                ResultSet executeQuery = this.statement.executeQuery();
                Throwable th = null;
                while (executeQuery.next()) {
                    try {
                        try {
                            collect(ConverUtils.resultSetToRowData(executeQuery, this.rowType));
                        } catch (Throwable th2) {
                            th = th2;
                            throw th2;
                            break;
                        }
                    } finally {
                    }
                }
                if (executeQuery != null) {
                    if (0 != 0) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
                return;
            } catch (SQLException e) {
                log.error("JDBC executeBatch error, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.readOptions.getMaxRetryTimes()) {
                    throw new InternalException(ExceptionCode.INTERNAL_ERROR, e);
                }
                try {
                    if (!this.connectionProvider.isConnectionValid()) {
                        this.statement.close();
                        this.connectionProvider.close();
                        establishConnectionAndStatement();
                    }
                    try {
                        Thread.sleep(1000 * i);
                    } catch (InterruptedException e2) {
                        throw new InternalException(ExceptionCode.INTERNAL_ERROR, e2);
                    }
                } catch (ClassNotFoundException | SQLException e3) {
                    log.error("JDBC connection is not valid, and reestablish connection failed", e3);
                    throw new InternalException(ExceptionCode.INTERNAL_ERROR, "Reestablish JDBC connection failed", e3);
                }
            }
        }
    }

    public void close() throws IOException {
        if (this.statement != null) {
            try {
                this.statement.close();
            } catch (SQLException e) {
                log.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                this.statement = null;
            }
        }
        this.connectionProvider.close();
    }
}
