Fixing the "table does not exist" error in flink-jdbc for schema-qualified tables

2022-08-01

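Background

Recently, while using Flink to read a PostgreSQL table, my code kept failing with an error saying the table did not exist. Debugging showed that the problem lies in flink-jdbc's JDBCDialect interface, which quotes a schema-qualified table name as a single identifier. I am recording the cause and the fix here.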

Environment

Flink version: 1.9.1
Scala version: 2.12
PostgreSQL table layout:
ipark_datacenter (database)
    ods (schema)
        t_test (id, name)

Test code

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

/**
 * @Author: maozl
 * @Date: 2019/10/17 17:14
 * @Description:
 */
public class FlinkJdbcDialectBugShow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // JDBC connection options; note the schema-qualified table name "ods.t_test"
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:65432/ipark_datacenter")
                .setUsername("root")
                .setPassword("admin")
                .setTableName("ods.t_test")
                .build();
        TableSchema tableSchema = TableSchema.builder()
                .fields(new String[]{"id", "name"}, new DataType[]{TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.INT_TYPE_INFO), TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.STRING_TYPE_INFO)})
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();

        tEnv.registerTableSource("userInfo", jdbcTableSource);

        tEnv.scan("userInfo").printSchema();
        Table query = tEnv.sqlQuery("select * from userInfo");
        DataStream<Row> rs = tEnv.toAppendStream(query, Row.class);
        rs.print();

        env.execute();
    }

}

Running the code fails with the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at FlinkJdbcDialectBugShow.main(FlinkJdbcDialectBugShow.java:53)
Caused by: java.lang.Exception: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
  位置:26
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
  位置:26
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:250)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.postgresql.util.PSQLException: ERROR: relation "ods.t_test" does not exist
  位置:26
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:106)
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:247)
    ... 4 more

The error says relation "ods.t_test" does not exist, even though I had already created the table t_test under the schema ods.

Debugging the root cause

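  1. First, I traced the query that JDBCTableSource builds. The statement it generates is SELECT "id", "name" FROM "ods.t_test". Both the column names and the table name are wrapped in an extra layer of double quotes. Running this statement directly against the database produces the same "does not exist" error.
  2. Following the method that builds the SQL leads to the JDBCDialect interface. Its quoteIdentifier method is the one applied to the table name, as quoteIdentifier(tableName).
  3. According to its Javadoc, quoteIdentifier exists for identifiers that are reserved keywords or that contain characters requiring quotes (such as spaces); it simply wraps the whole string in double quotes. Our table name ods.t_test therefore becomes "ods.t_test". PostgreSQL treats a double-quoted string as one identifier, so it looks for a table literally named ods.t_test instead of the table t_test in the schema ods.
  4. This works for tables without a schema, but a schema-qualified name fails with the table-not-found error. Once the cause is known, the fix is straightforward: quote each dot-separated part of the table name separately, and call this new method wherever the table name was previously passed to quoteIdentifier.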
    // Quote each dot-separated part on its own, so that ods.t_test becomes "ods"."t_test".
    default String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            String[] strs = tableName.split("\\.");
            return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
        } else {
            return quoteIdentifier(tableName);
        }
    }
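
To make the difference concrete, here is a minimal standalone sketch that compares the two quoting strategies outside of Flink. The class name QuoteDemo is hypothetical and exists only for this illustration; the two static methods are my own copies of the quoting logic described above, not the Flink classes themselves.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical demo class (not part of Flink): reproduces the quoting logic in isolation.
public class QuoteDemo {

    // Same behaviour as the default quoteIdentifier: wrap the whole string in double quotes.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Same idea as the quoteTablename fix: quote every dot-separated part on its own.
    static String quoteTablename(String tableName) {
        return Arrays.stream(tableName.split("\\."))
                .map(QuoteDemo::quoteIdentifier)
                .collect(Collectors.joining("."));
    }

    public static void main(String[] args) {
        System.out.println(quoteIdentifier("ods.t_test")); // "ods.t_test"
        System.out.println(quoteTablename("ods.t_test"));  // "ods"."t_test"
        System.out.println(quoteTablename("t_test"));      // "t_test"
    }
}
```

A name without a dot goes through unchanged apart from the quotes, so tables without a schema should behave as before. Note that this simple split assumes no part of the name itself contains a dot.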
    

Complete JDBCDialect code

package org.apache.flink.api.java.io.jdbc.dialect;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * @Author: maozl01
 * @Date: 2020/7/17 17:14
 * @Description: JDBCDialect bug fix
 */
public interface JDBCDialect extends Serializable {

    /**
     * Check if this dialect instance can handle a certain jdbc url.
     * @param url the jdbc url.
     * @return True if the dialect can be applied on the given jdbc url.
     */
    boolean canHandle(String url);

    /**
     * @return the default driver class name, if user not configure the driver class name,
     * then will use this one.
     */
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    /**
     * Quotes the identifier. This is used to put quotes around the identifier in case the column
     * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
     * Default using double quotes {@code "} to quote.
     */
    default String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    /**
     * Get dialect upsert statement, the database has its own upsert syntax, such as MySQL
     * using DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE SET..
     *
     * @return None if dialect does not support upsert statement, the writer will degrade to
     * the use of select + update/insert, this performance is poor.
     */
    default Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

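    /**
     * Quotes a table name that may carry a schema prefix. Each dot-separated part is quoted
     * on its own, so that {@code ods.t_test} becomes {@code "ods"."t_test"}.
     */
    default String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            String[] strs = tableName.split("\\.");
            return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
        } else {
            return quoteIdentifier(tableName);
        }
    }
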
    /**
     * Get row exists statement by condition fields. Default use SELECT.
     */
    default String getRowExistsStatement(String tableName, String[] conditionFields) {
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));

        return "SELECT 1 FROM " + quoteTablename(tableName) + " WHERE " + fieldExpressions;
    }

    /**
     * Get insert into statement.
     */
    default String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteTablename(tableName) +
                "(" + columns + ")" + " VALUES (" + placeholders + ")";
    }

    /**
     * Get update one row statement by condition fields, default not use limit 1,
     * because limit 1 is a sql dialect.
     */
    default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        String setClause = Arrays.stream(fieldNames)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(", "));
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "UPDATE " + quoteTablename(tableName) +
                " SET " + setClause +
                " WHERE " + conditionClause;
    }

    /**
     * Get delete one row statement by condition fields, default not use limit 1,
     * because limit 1 is a sql dialect.
     */
    default String getDeleteStatement(String tableName, String[] conditionFields) {
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quoteTablename(tableName) + " WHERE " + conditionClause;
    }

    /**
     * Get select fields statement by condition fields. Default use SELECT.
     */
    default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        String selectExpressions = Arrays.stream(selectFields)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "SELECT " + selectExpressions + " FROM " +
                quoteTablename(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
    }
}
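
As a quick sanity check of the patched dialect, the sketch below rebuilds the SELECT statement both ways without any Flink dependency. The class SelectStatementDemo and its selectFrom helper are hypothetical names used only for this illustration; they mirror getSelectFromStatement with no condition fields. The last line prints the 1-based position at which the quoted table name starts in the failing statement, which appears to line up with the position 26 (位置:26) reported in the stack trace.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical demo class (not part of Flink): compares the generated SELECT before and after the fix.
public class SelectStatementDemo {

    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    static String quoteTablename(String tableName) {
        return Arrays.stream(tableName.split("\\."))
                .map(SelectStatementDemo::quoteIdentifier)
                .collect(Collectors.joining("."));
    }

    // Mirrors getSelectFromStatement without condition fields.
    // quotePerPart = false imitates the original behaviour, true imitates the patched one.
    static String selectFrom(String tableName, String[] fields, boolean quotePerPart) {
        String columns = Arrays.stream(fields)
                .map(SelectStatementDemo::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String table = quotePerPart ? quoteTablename(tableName) : quoteIdentifier(tableName);
        return "SELECT " + columns + " FROM " + table;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name"};
        String before = selectFrom("ods.t_test", fields, false);
        String after = selectFrom("ods.t_test", fields, true);
        System.out.println(before); // SELECT "id", "name" FROM "ods.t_test"
        System.out.println(after);  // SELECT "id", "name" FROM "ods"."t_test"
        // 1-based position of the quoted table name in the failing statement
        System.out.println(before.indexOf("\"ods.t_test\"") + 1); // 26
    }
}
```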

Original article: https://blog.csdn.net/u014730001/article/details/107356393
