Catalog(目录)提供了关于数据库、表格和访问数据所需的信息的元数据,以及统一的 API 来管理元数据,验证连接,让元数据对 Sources(数据源)、Sinks(数据汇)和 Web 可访问。
01
Catalog功能的重要性
02
Catalog API
Java
public interface CatalogFactory extends Factory { String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable { void open() throws CatalogException; void close() throws CatalogException; }
java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 数据库 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List<String> listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }
java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }
03
MySQL catalog
conf
[source/sink] { [connector-factory-id] { catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table" ] } } }
sql
env { "job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"="" } source { MySQL-CDC { parallelism = 1 catalog { factory = "MySQL" # 默认情况下,Catalog 将使用与连接器同名的选项 } username = "mysqluser" password = "mysqlpw" database-names = ["seatunnel-test"] table-pattern = "seatunnel-test\.orders_\d+" base-url = "jdbc:mysql://localhost:54508/seatunnel-test" } } sink { jdbc { url = "jdbc:mysql://localhost:4000/test" driver = "com.mysql.cj.jdbc.Driver" catalog { factory = "MySQL" username = "root" password = "" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\.orders_\d+" } user = "root" password = "" query = "insert into sink(age, name) values(?,?)" } }
04
未来规划
Apache SeaTunnel