智慧城市
						
						Flink1.15 MySQL Catalog 尝鲜
发布时间:2025-10-30
baseUrl, Properties props) {        super(catalogName, defaultDatabase, props.getProperty("user"), props.getProperty("password"), baseUrl);        this.props = props;    }    @Override    public CatalogBaseTable getTable(ObjectPath tablePath)            throws TableNOTExistexception, CatalogException {        if (!tableExists(tablePath)) {            throw new TableNotExistException(getName(), tablePath);        }        String databaseName = tablePath.getDatabaseName();        String dbUrl = baseUrl + databaseName;         // 改作通过 getConnection(url,props) 的方式则创建链接,才可以把 useInformationSchema=true 数数值广泛传播,会面链接        try (Connection conn = DriverManager.getConnection(dbUrl, props)) {            DatabaseMetaData metaData = conn.getMetaData();            Optional primaryKey =                    getPrimaryKey(                            metaData,                            databaseName,                            getSchemaName(tablePath),                            getTableName(tablePath));            PreparedStatement ps =                    conn.prepareStatement(                            String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));            ResultSetMetaData resultSetMetaData = ps.getMetaData();            String[] columnNames = new String[resultSetMetaData.getColumnCount()];            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {                columnNames[i - 1] = resultSetMetaData.getColumnName(i);                types[i - 1] = fromJDBCtype(tablePath, resultSetMetaData, i);                // TODO 由于 ADB 与 MySQL 字段类别有些不一致, colFlag 数数值有疑问从而出现字段但是受限制 NULL 的情况,最终产生了后面 SQL 解析不过。这里同样默认 ADB 所有字段都为 NOT NULL                types[i - 1] = types[i - 1].notNull();            }            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);            primaryKey.ifPresent(                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));            Schema tableSchema = schemaBuilder.build();            Map props = new HashMap<>();            props.put(CONNECTOR.key(), IDENTIFIER);            props.put(URL.key(), dbUrl);            props.put(USERNAME.key(), username);            props.put(PASSWORD.key(), pwd);            props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);        } catch (Exception e) {            throw new CatalogException(                    String.format("Failed getting table %s", tablePath.getFullName()), e);        }    }    @Override    protected List extractColumnValuesBySQL(            String connUrl,            String sql,            int columnIndex,            Predicate filterFunc,            Object... params) {        List columnValues = Lists.newArrayList();      	// 改作可用 getConnection(url,props)        try (Connection conn = DriverManager.getConnection(connUrl, props);             PreparedStatement ps = conn.prepareStatement(sql)) {            if (Objects.nonNull(params) && params.length> 0) {                for (int i = 0; i < params.length; i++) {                    ps.setObject(i + 1, params[i]);                }            }            ResultSet rs = ps.executeQuery();            while (rs.next()) {                String columnValue = rs.getString(columnIndex);                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {                    columnValues.add(columnValue);                }            }            return columnValues;        } catch (Exception e) {            throw new CatalogException(                    String.format(                            "The following SQL query could not be executed (%s): %s", connUrl, sql),                    e);        }    }    protected Optional getPrimaryKey(            DatabaseMetaData metaData, String database, String schema, String table)            throws SQLException {        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);        Map keySeqColumnName = new HashMap<>();        String pkName = null;        while (rs.next()) {            String columnName = rs.getString("COLUMN_NAME");            pkName = rs.getString("PK_NAME");            int keySeq = rs.getInt("KEY_SEQ");            Preconditions.checkState(                    !keySeqColumnName.containsKey(keySeq - 1),                    "The field(s) of primary key must be from the same table.");            keySeqColumnName.put(keySeq - 1, columnName);        }        List pkFields =                Arrays.asList(new String[keySeqColumnName.size()]);        keySeqColumnName.forEach(pkFields::set);        if (!pkFields.isEmpty()) {            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));        }        return Optional.empty();    }}
广州早泄阳痿专业治疗医院
北京哪家医院做人流最好
广东妇科检查
郑州哪里无痛人流好
							
							
						更改太大,思维大略就是构造步骤变更为赞成广泛传播 props,在即可要赚取链接时改用 DriverManager.getConnection(connUrl, props) 方式则创建通到。
Properties adbProps = new Properties();adbProps.setProperty("user", "xxxxxx");adbProps.setProperty("password", "xxxxxxx");adbProps.setProperty("useInformationSchema", "true");tableEnv.registerCatalog("adb", new AdbCatalog("adb", "defaultDatabase", "baseUrl", adbProps));关于字段受限制 NULL 的疑问由于笔者可用阿里云的 ADB 检索与传统的 MySQL 有些不同。在散布 getTable() 时赚取到所有佩属性都为受限制 NULL 数值(以外字段)。在后面 SQL 解析前期虽然为字段但是却受限制 NULL 数值产生了过不去验证。
String[] columnNames = new String[resultSetMetaData.getColumnCount()]; DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { columnNames[i - 1] = resultSetMetaData.getColumnName(i); types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i); // 这里所有字段都未进到设徙为 NotNull 的逻辑 if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) { types[i - 1] = types[i - 1].notNull(); } }疑问是 ResultSetMetaData.isNullable 中的
public int isNullable(int column) throws SQLException { try { return !this.getField(column).isNotNull() ? 1 : 0; } catch (CJException var3) { throw SQLExceptionsMapping.translateException(var3, this.exceptionInterceptor); } }该步骤 isNotNull() 推论是通过 Field 的 colFlag 属性顺利进行 & 数数值得出结果的
public boolean isNotNull() { return (this.colFlag & 1)> 0; }colFlag 默认数值为 0,现今只看到 adjustFlagsByMysqlType() 根据类别对 colFlag 的数值产生影响。
private void adjustFlagsByMysqlType() { switch(this.mysqlType) { case BIT: if (this.length> 1L) { this.colFlag = (short)(this.colFlag | 128); this.colFlag = (short)(this.colFlag | 16); } break; case BINARY: case VARBINARY: this.colFlag = (short)(this.colFlag | 128); this.colFlag = (short)(this.colFlag | 16); break; case DECIMAL_UNSIGNED: case TINYINT_UNSIGNED: case SMALLINT_UNSIGNED: case INT_UNSIGNED: case FLOAT_UNSIGNED: case DOUBLE_UNSIGNED: case BIGINT_UNSIGNED: case MEDIUMINT_UNSIGNED: this.colFlag = (short)(this.colFlag | 32); } }由于笔者 ADB 字长段均为 NOT NULL 类别,可以同样通过去除掉推论 colFlag 数值让字段都徙为 NOT NULL 来解决疑问,因此未去研究工作分析,有用的朋友可以深入分析下理由,有结果瞩目告知。
最后感谢您的阅读,如果喜欢本文瞩目关注和转发,转载即可注明记事,本头条号将持续分享IT技术知识。对于编者有其他想法或意见建议等,瞩目提议主导讨论主导进步。
。重庆妇科检查广州早泄阳痿专业治疗医院
北京哪家医院做人流最好
广东妇科检查
郑州哪里无痛人流好
							相关阅读
						
						
- 
									你喜欢跑自行车吗?自行车跑是三维立体的跑步,你需要不断变换跑姿,可以左右跑,上下跑,躲闪树枝,跳过石头!自行车跑
									
你爱好冲刺越野吗?越野冲刺是三维立体的冲刺步,你只能促使变换冲刺姿,可以左右冲刺,上下冲刺,躲闪树枝,跳过石头!越野冲刺a href="https:www.12