本文将介绍如何在FlinkSQL中使用基于jdbc的自定义Catalog。
Catalog是一个用于管理数据库和表的元数据存储系统。在FlinkSQL中,Catalog用于描述Flink集群中的数据库和表。FlinkSQL支持多种类型的Catalog,如默认基于内存的GenericInMemoryCatalog,基于Hive的HiveCatalog、基于JDBC的MySqlCatalog和PostgresCatalog等。
GenericInMemoryCatalog:基于内存,所有数据库和表信息都存储在内存中,重启后即消失HiveCatalog:数据存储在hive的元数据中,需要部署hive服务才能用MySqlCatalog和PostgresCatalog: 基于jdbc,只能读取表结构,对应的jdbc数据库中有哪些表就只能用哪些表,无法创建。在FlinkSQL中,除了使用默认的Catalog外,我们还可以通过实现自定义的Catalog来管理数据源和表。自定义Catalog可以更灵活地管理数据库和表,自定义持久化逻辑,满足不同场景下的需求。本文将介绍如何使用基于jdbc的自定义Catalog。
首先,我们需要实现一个继承自JdbcCatalog的自定义Catalog。JdbcCatalog是Flink SQL中内置的一个基于JDBC的Catalog实现,我们可以通过继承该类,重写其中的方法来实现自定义的Catalog。
自定义Catalog需要继承JdbcCatalog类,并重写其中的方法来实现自定义的逻辑。在打开和关闭Catalog时,我们可以实现自定义的初始化和资源释放逻辑。在listTables和getTable方法中,我们需要实现自定义的表列表查询和表查询逻辑。除此之外,还可以重写其他方法来满足不同的需求。
基于mysql,如果要使用其他数据库相应调整ddl即可
数据库(Database):
CREATE TABLE `metadata_database`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`database_name` varchar(100) DEFAULT NULL COMMENT '数据库名',
`comment` varchar(200) DEFAULT NULL COMMENT '备注信息',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uni_database_name` (`database_name`, `type`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据库
数据表(Table):
CREATE TABLE `metadata_table`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`database_name` varchar(100) DEFAULT NULL COMMENT '数据库名',
`database_id` bigint(20) DEFAULT NULL COMMENT '数据库id',
`table_name` varchar(100) NOT NULL,
`comment` varchar(200) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `idx_table_name` (`table_name`, `database_name`, `type`),
KEY `idx_database_id` (`database_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据表';
字段信息(Column)
CREATE TABLE `metadata_column`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`column_name` varchar(100) DEFAULT NULL COMMENT '字段名',
`column_type` varchar(100) DEFAULT NULL COMMENT '字段类型',
`column_length` int(11) DEFAULT NULL COMMENT '字段长度',
`precision` int(11) DEFAULT NULL COMMENT '字段精度',
`table_id` bigint(20) DEFAULT NULL COMMENT '表id',
`database_id` bigint(20) DEFAULT NULL COMMENT '数据库id',
`nullable` tinyint(1) DEFAULT NULL COMMENT '是否可空',
`scale` int(11) DEFAULT NULL COMMENT '小数位数',
`comment` varchar(200) DEFAULT NULL COMMENT '备注信息',
`primary_key` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否是主键',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_database_id` (`database_id`),
KEY `idx_table_id` (`table_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC COMMENT ='元数据_字段';
属性信息(Properties)
CREATE TABLE `metadata_properties`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`key` varchar(100) DEFAULT NULL COMMENT 'key',
`value` varchar(300) DEFAULT NULL COMMENT 'value',
`data_id` bigint(20) DEFAULT NULL COMMENT '表id',
`data_type` varchar(20) DEFAULT NULL COMMENT '数据类型',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uni_data_key` (`data_id`, `key`, `data_type`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据表属性';
水印信息(Watermark)
CREATE TABLE `metadata_watermark`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`column_name` varchar(100) DEFAULT NULL COMMENT '字段名',
`expression` varchar(500) DEFAULT NULL COMMENT '表达式',
`table_id` bigint(20) DEFAULT NULL COMMENT '表id',
`database_id` bigint(20) NOT NULL COMMENT '数据库id',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='元数据_数据表_水印配置';
函数信息(Function)
CREATE TABLE `metadata_function`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`create_person` varchar(100) DEFAULT '' COMMENT '创建人',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_person` varchar(100) DEFAULT '' COMMENT '最后更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`dr` tinyint(2) NOT NULL DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除',
`class_name` varchar(300) NOT NULL COMMENT '函数类名',
`function_name` varchar(100) NOT NULL COMMENT '函数名',
`description` varchar(500) DEFAULT NULL COMMENT '函数描述',
`detail_description` varchar(500) DEFAULT NULL COMMENT '函数详细描述',
`is_generic` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否泛型',
`function_language` varchar(100) NOT NULL COMMENT '函数语言',
`database_name` varchar(100) NOT NULL COMMENT '数据库名',
`database_id` bigint(20) NOT NULL COMMENT '数据库id',
PRIMARY KEY (`id`),
UNIQUE KEY `uni_idx_function_name` (`function_name`, `database_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='元数据_函数'