Documentation
¶
Index ¶
- Constants
- func RegisterFlags()
- type Config
  - func LoadConfig() *Config
- type DebeziumData
- type DebeziumOperation
- type DebeziumRowData
- type DebeziumRowSource
- type Nats
- type NatsConfig
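- type PgSchemaColumn
  - func NewPgSchemaColumn(config *Config) *PgSchemaColumn
  - func (pgSchemaColumn *PgSchemaColumn) ToIcebergSchemaColumn() *common.IcebergSchemaColumn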
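- type PgSchemaTable
  - func (pgSchemaTable PgSchemaTable) IcebergParentPartitionedTableName() string
  - func (pgSchemaTable PgSchemaTable) IcebergTableName() string
  - func (pgSchemaTable PgSchemaTable) String() string
  - func (pgSchemaTable PgSchemaTable) ToConfigArg() string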
- type Postgres
- func (postgres *Postgres) Close()
- func (postgres *Postgres) CreateReplicationSlot(slotName string)
- func (postgres *Postgres) PgSchemaColumns(pgSchemaTable PgSchemaTable, retryCount ...int) []PgSchemaColumn
- func (postgres *Postgres) Reconnect()
- func (postgres *Postgres) ReplicationSlotExists(slotName string) bool
- func (postgres *Postgres) SchemaTables(schema string) []PgSchemaTable
- func (postgres *Postgres) Schemas() []string
- type SyncMode
- type Syncer
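- type SyncerFullRefresh
  - func NewSyncerFullRefresh(config *Config, utils *SyncerUtils, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerFullRefresh
  - func (syncer *SyncerFullRefresh) Sync(postgres *Postgres, pgSchemaTables []PgSchemaTable)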
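- type SyncerUtils
  - func NewSyncerUtils(config *Config, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerUtils
  - func (utils *SyncerUtils) DeleteOldTables(keepIcebergTableNames common.Set[string])
  - func (utils *SyncerUtils) ShouldSyncTable(pgSchemaTable PgSchemaTable) bool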
Constants ¶
View Source
const (
	SyncModeFullRefresh SyncMode = "FULL_REFRESH"
	SyncModeCDC         SyncMode = "CDC"
	SyncModeIncremental SyncMode = "INCREMENTAL"

	ENV_DESTINATION_SCHEMA_NAME = "DESTINATION_SCHEMA_NAME"

	ENV_DATABASE_URL          = "SOURCE_POSTGRES_DATABASE_URL"
	ENV_SYNC_MODE             = "SOURCE_POSTGRES_SYNC_MODE"
	ENV_INCLUDE_TABLES        = "SOURCE_POSTGRES_INCLUDE_TABLES"
	ENV_EXCLUDE_TABLES        = "SOURCE_POSTGRES_EXCLUDE_TABLES"
	ENV_CURSOR_COLUMNS        = "SOURCE_POSTGRES_CURSOR_COLUMNS"        // Incremental sync
	ENV_REPLICATION_SLOT      = "SOURCE_POSTGRES_REPLICATION_SLOT"      // CDC sync
	ENV_IGNORE_UPDATE_COLUMNS = "SOURCE_POSTGRES_IGNORE_UPDATE_COLUMNS" // CDC sync

	// CDC sync
	ENV_NATS_URL                       = "NATS_URL"
	ENV_NATS_STREAM                    = "NATS_JETSTREAM_STREAM"
	ENV_NATS_SUBJECT                   = "NATS_JETSTREAM_SUBJECT"
	ENV_NATS_CONSUMER_NAME             = "NATS_JETSTREAM_CONSUMER_NAME"
	ENV_NATS_FETCH_TIMEOUT_SECONDS     = "NATS_FETCH_TIMEOUT_SECONDS"
	DEFAULT_NATS_FETCH_TIMEOUT_SECONDS = 30
)
View Source
const (
	PG_TRUE            = "YES"
	PG_DATA_TYPE_ARRAY = "ARRAY"
)
View Source
const (
	PG_SCHEMA_PUBLIC         = "public"
	PG_SCHEMA_PG_CATALOG     = "pg_catalog"
	PG_CONNECTION_TIMEOUT    = 30 * time.Second
	PG_SESSION_TIMEOUT       = "2h"
	POSTGRES_MAX_RETRY_COUNT = 1
)
Variables ¶
This section is empty.
Functions ¶
func RegisterFlags ¶
func RegisterFlags()
Types ¶
type Config ¶
type Config struct {
CommonConfig *common.CommonConfig
DestinationSchemaName string
SyncMode SyncMode
DatabaseUrl string
IncludeTables common.Set[string]
ExcludeTables common.Set[string]
CursorColumnNameByTableName map[string]string // Incremental sync
ReplicationSlot string // CDC sync
IgnoreUpdateColumns common.Set[string] // CDC sync
Nats NatsConfig // CDC sync
}
func LoadConfig ¶
func LoadConfig() *Config
type DebeziumData ¶
type DebeziumData struct {
Operation string `json:"op"`
}
type DebeziumOperation ¶
type DebeziumOperation string
const (
	InsertOperation   DebeziumOperation = "c"
	UpdateOperation   DebeziumOperation = "u"
	DeleteOperation   DebeziumOperation = "d"
	MessageOperation  DebeziumOperation = "m"
	SnapshotOperation DebeziumOperation = "r"
)
type DebeziumRowData ¶
type DebeziumRowData struct {
Operation string `json:"op"`
Before map[string]interface{} `json:"before"`
After map[string]interface{} `json:"after"`
Source DebeziumRowSource `json:"source"`
}
type DebeziumRowSource ¶
type NatsConfig ¶
type PgSchemaColumn ¶
type PgSchemaColumn struct {
ColumnName string
DataType string
UdtName string
IsNullable string
OrdinalPosition string
NumericPrecision string
NumericScale string
DatetimePrecision string
Namespace string
IsPartOfUniqueIndex bool
Config *Config
}
func NewPgSchemaColumn ¶
func NewPgSchemaColumn(config *Config) *PgSchemaColumn
func (*PgSchemaColumn) ToIcebergSchemaColumn ¶
func (pgSchemaColumn *PgSchemaColumn) ToIcebergSchemaColumn() *common.IcebergSchemaColumn
type PgSchemaTable ¶
func (PgSchemaTable) IcebergParentPartitionedTableName ¶
func (pgSchemaTable PgSchemaTable) IcebergParentPartitionedTableName() string
func (PgSchemaTable) IcebergTableName ¶
func (pgSchemaTable PgSchemaTable) IcebergTableName() string
Examples:
  public.table           -> table
  CustomSchema.TableName -> CustomSchemaTableName
func (PgSchemaTable) String ¶
func (pgSchemaTable PgSchemaTable) String() string
func (PgSchemaTable) ToConfigArg ¶
func (pgSchemaTable PgSchemaTable) ToConfigArg() string
type Postgres ¶
type Postgres struct {
PostgresClient *common.PostgresClient
Config *Config
}
func NewPostgres ¶
func (*Postgres) CreateReplicationSlot ¶
func (postgres *Postgres) CreateReplicationSlot(slotName string)
func (*Postgres) PgSchemaColumns ¶
func (postgres *Postgres) PgSchemaColumns(pgSchemaTable PgSchemaTable, retryCount ...int) []PgSchemaColumn
func (*Postgres) ReplicationSlotExists ¶
func (postgres *Postgres) ReplicationSlotExists(slotName string) bool
func (*Postgres) SchemaTables ¶
func (postgres *Postgres) SchemaTables(schema string) []PgSchemaTable
type Syncer ¶
type Syncer struct {
Config *Config
Utils *SyncerUtils
StorageS3 *common.StorageS3
DuckdbClient *common.DuckdbClient
}
type SyncerFullRefresh ¶
type SyncerFullRefresh struct {
Config *Config
Utils *SyncerUtils
StorageS3 *common.StorageS3
DuckdbClient *common.DuckdbClient
}
func NewSyncerFullRefresh ¶
func NewSyncerFullRefresh(config *Config, utils *SyncerUtils, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerFullRefresh
func (*SyncerFullRefresh) Sync ¶
func (syncer *SyncerFullRefresh) Sync(postgres *Postgres, pgSchemaTables []PgSchemaTable)
type SyncerUtils ¶
type SyncerUtils struct {
Config *Config
StorageS3 *common.StorageS3
DuckdbClient *common.DuckdbClient
}
func NewSyncerUtils ¶
func NewSyncerUtils(config *Config, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerUtils
func (*SyncerUtils) DeleteOldTables ¶
func (utils *SyncerUtils) DeleteOldTables(keepIcebergTableNames common.Set[string])
func (*SyncerUtils) ShouldSyncTable ¶
func (utils *SyncerUtils) ShouldSyncTable(pgSchemaTable PgSchemaTable) bool
Click to show internal directories.
Click to hide internal directories.