您当前的位置:首页 >> 智慧城市
智慧城市

Flink1.15 MySQL Catalog 尝鲜

发布时间:2025-10-30

baseUrl, Properties props) { super(catalogName, defaultDatabase, props.getProperty("user"), props.getProperty("password"), baseUrl); this.props = props; } @Override public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNOTExistexception, CatalogException { if (!tableExists(tablePath)) { throw new TableNotExistException(getName(), tablePath); } String databaseName = tablePath.getDatabaseName(); String dbUrl = baseUrl + databaseName; // 改作通过 getConnection(url,props) 的方式则创建链接,才可以把 useInformationSchema=true 数数值广泛传播,会面链接 try (Connection conn = DriverManager.getConnection(dbUrl, props)) { DatabaseMetaData metaData = conn.getMetaData(); Optional primaryKey = getPrimaryKey( metaData, databaseName, getSchemaName(tablePath), getTableName(tablePath)); PreparedStatement ps = conn.prepareStatement( String.format("SELECT * FROM %s;", getSchemaTableName(tablePath))); ResultSetMetaData resultSetMetaData = ps.getMetaData(); String[] columnNames = new String[resultSetMetaData.getColumnCount()]; DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { columnNames[i - 1] = resultSetMetaData.getColumnName(i); types[i - 1] = fromJDBCtype(tablePath, resultSetMetaData, i); // TODO 由于 ADB 与 MySQL 字段类别有些不一致, colFlag 数数值有疑问从而出现字段但是受限制 NULL 的情况,最终产生了后面 SQL 解析不过。这里同样默认 ADB 所有字段都为 NOT NULL types[i - 1] = types[i - 1].notNull(); } Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types); primaryKey.ifPresent( pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); Schema tableSchema = schemaBuilder.build(); Map props = new HashMap<>(); props.put(CONNECTOR.key(), IDENTIFIER); props.put(URL.key(), dbUrl); props.put(USERNAME.key(), username); props.put(PASSWORD.key(), pwd); props.put(TABLE_NAME.key(), getSchemaTableName(tablePath)); return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props); } catch (Exception e) { throw new CatalogException( String.format("Failed getting table %s", tablePath.getFullName()), e); } } @Override protected List extractColumnValuesBySQL( String connUrl, String sql, int columnIndex, Predicate filterFunc, Object... params) { List columnValues = Lists.newArrayList(); // 改作可用 getConnection(url,props) try (Connection conn = DriverManager.getConnection(connUrl, props); PreparedStatement ps = conn.prepareStatement(sql)) { if (Objects.nonNull(params) && params.length> 0) { for (int i = 0; i < params.length; i++) { ps.setObject(i + 1, params[i]); } } ResultSet rs = ps.executeQuery(); while (rs.next()) { String columnValue = rs.getString(columnIndex); if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { columnValues.add(columnValue); } } return columnValues; } catch (Exception e) { throw new CatalogException( String.format( "The following SQL query could not be executed (%s): %s", connUrl, sql), e); } } protected Optional getPrimaryKey( DatabaseMetaData metaData, String database, String schema, String table) throws SQLException { ResultSet rs = metaData.getPrimaryKeys(database, schema, table); Map keySeqColumnName = new HashMap<>(); String pkName = null; while (rs.next()) { String columnName = rs.getString("COLUMN_NAME"); pkName = rs.getString("PK_NAME"); int keySeq = rs.getInt("KEY_SEQ"); Preconditions.checkState( !keySeqColumnName.containsKey(keySeq - 1), "The field(s) of primary key must be from the same table."); keySeqColumnName.put(keySeq - 1, columnName); } List pkFields = Arrays.asList(new String[keySeqColumnName.size()]); keySeqColumnName.forEach(pkFields::set); if (!pkFields.isEmpty()) { pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName; return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields)); } return Optional.empty(); }}

更改太大,思维大略就是构造步骤变更为赞成广泛传播 props,在即可要赚取链接时改用 DriverManager.getConnection(connUrl, props) 方式则创建通到。

Properties adbProps = new Properties();adbProps.setProperty("user", "xxxxxx");adbProps.setProperty("password", "xxxxxxx");adbProps.setProperty("useInformationSchema", "true");tableEnv.registerCatalog("adb", new AdbCatalog("adb", "defaultDatabase", "baseUrl", adbProps));关于字段受限制 NULL 的疑问

由于笔者可用阿里云的 ADB 检索与传统的 MySQL 有些不同。在散布 getTable() 时赚取到所有佩属性都为受限制 NULL 数值(以外字段)。在后面 SQL 解析前期虽然为字段但是却受限制 NULL 数值产生了过不去验证。

String[] columnNames = new String[resultSetMetaData.getColumnCount()]; DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { columnNames[i - 1] = resultSetMetaData.getColumnName(i); types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i); // 这里所有字段都未进到设徙为 NotNull 的逻辑 if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) { types[i - 1] = types[i - 1].notNull(); } }

疑问是 ResultSetMetaData.isNullable 中的

public int isNullable(int column) throws SQLException { try { return !this.getField(column).isNotNull() ? 1 : 0; } catch (CJException var3) { throw SQLExceptionsMapping.translateException(var3, this.exceptionInterceptor); } }

该步骤 isNotNull() 推论是通过 Field 的 colFlag 属性顺利进行 & 数数值得出结果的

public boolean isNotNull() { return (this.colFlag & 1)> 0; }

colFlag 默认数值为 0,现今只看到 adjustFlagsByMysqlType() 根据类别对 colFlag 的数值产生影响。

private void adjustFlagsByMysqlType() { switch(this.mysqlType) { case BIT: if (this.length> 1L) { this.colFlag = (short)(this.colFlag | 128); this.colFlag = (short)(this.colFlag | 16); } break; case BINARY: case VARBINARY: this.colFlag = (short)(this.colFlag | 128); this.colFlag = (short)(this.colFlag | 16); break; case DECIMAL_UNSIGNED: case TINYINT_UNSIGNED: case SMALLINT_UNSIGNED: case INT_UNSIGNED: case FLOAT_UNSIGNED: case DOUBLE_UNSIGNED: case BIGINT_UNSIGNED: case MEDIUMINT_UNSIGNED: this.colFlag = (short)(this.colFlag | 32); } }

由于笔者 ADB 字长段均为 NOT NULL 类别,可以同样通过去除掉推论 colFlag 数值让字段都徙为 NOT NULL 来解决疑问,因此未去研究工作分析,有用的朋友可以深入分析下理由,有结果瞩目告知。

最后

感谢您的阅读,如果喜欢本文瞩目关注和转发,转载即可注明记事,本头条号将持续分享IT技术知识。对于编者有其他想法或意见建议等,瞩目提议主导讨论主导进步。

重庆妇科检查
广州早泄阳痿专业治疗医院
北京哪家医院做人流最好
广东妇科检查
郑州哪里无痛人流好

上一篇: DAY3|探秘531兵工厂,小记者变身小勇士上演真人CS“热血战”

下一篇: 真正聪明的人:暑假底下,朋友圈有“三不晒”

友情链接