postgres

package module
v0.0.0-...-67ab311 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Sync mode values and configuration key names for the Postgres source.
//
// NOTE(review): the ENV_* values look like environment variable names,
// presumably read by LoadConfig — confirm against the implementation.
const (
	// The three declared SyncMode values.
	SyncModeFullRefresh SyncMode = "FULL_REFRESH"
	SyncModeCDC         SyncMode = "CDC"
	SyncModeIncremental SyncMode = "INCREMENTAL"

	// Unlike the keys below, this one carries no SOURCE_POSTGRES_ prefix.
	ENV_DESTINATION_SCHEMA_NAME = "DESTINATION_SCHEMA_NAME"

	// Source settings; every value shares the SOURCE_POSTGRES_ prefix.
	ENV_DATABASE_URL          = "SOURCE_POSTGRES_DATABASE_URL"
	ENV_SYNC_MODE             = "SOURCE_POSTGRES_SYNC_MODE"
	ENV_INCLUDE_TABLES        = "SOURCE_POSTGRES_INCLUDE_TABLES"
	ENV_EXCLUDE_TABLES        = "SOURCE_POSTGRES_EXCLUDE_TABLES"
	ENV_CURSOR_COLUMNS        = "SOURCE_POSTGRES_CURSOR_COLUMNS"        // Incremental sync
	ENV_REPLICATION_SLOT      = "SOURCE_POSTGRES_REPLICATION_SLOT"      // CDC sync
	ENV_IGNORE_UPDATE_COLUMNS = "SOURCE_POSTGRES_IGNORE_UPDATE_COLUMNS" // CDC sync

	// CDC sync: NATS settings. The names mirror the fields of NatsConfig
	// (Url, Stream, Subject, ConsumerName, FetchTimeoutSeconds).
	ENV_NATS_URL                   = "NATS_URL"
	ENV_NATS_STREAM                = "NATS_JETSTREAM_STREAM"
	ENV_NATS_SUBJECT               = "NATS_JETSTREAM_SUBJECT"
	ENV_NATS_CONSUMER_NAME         = "NATS_JETSTREAM_CONSUMER_NAME"
	ENV_NATS_FETCH_TIMEOUT_SECONDS = "NATS_FETCH_TIMEOUT_SECONDS"

	// Untyped integer constant, in seconds per its name.
	// NOTE(review): presumably the fallback when ENV_NATS_FETCH_TIMEOUT_SECONDS
	// is not provided — confirm in LoadConfig.
	DEFAULT_NATS_FETCH_TIMEOUT_SECONDS = 30
)
View Source
// String literals used when interpreting Postgres column metadata.
//
// NOTE(review): "YES" and "ARRAY" match the is_nullable and data_type values
// reported by information_schema.columns; presumably they are compared against
// PgSchemaColumn.IsNullable and PgSchemaColumn.DataType — confirm at the call
// sites.
const (
	PG_TRUE            = "YES"
	PG_DATA_TYPE_ARRAY = "ARRAY"
)
View Source
// Postgres schema names, timeouts and retry limit.
const (
	// Well-known Postgres schema names.
	PG_SCHEMA_PUBLIC     = "public"
	PG_SCHEMA_PG_CATALOG = "pg_catalog"

	// PG_CONNECTION_TIMEOUT is a time.Duration (30 seconds), whereas
	// PG_SESSION_TIMEOUT is a plain string. How each is applied is not
	// visible on this page.
	PG_CONNECTION_TIMEOUT = 30 * time.Second
	PG_SESSION_TIMEOUT    = "2h"

	// NOTE(review): presumably bounds the variadic retryCount accepted by
	// Postgres.PgSchemaColumns — confirm in the implementation.
	POSTGRES_MAX_RETRY_COUNT = 1
)

Variables

This section is empty.

Functions

func RegisterFlags

func RegisterFlags()

Types

type Config

// Config holds the settings of the Postgres source. LoadConfig returns a
// pointer to one, and the other types in this package (Nats, Postgres, Syncer,
// SyncerFullRefresh, SyncerUtils, PgSchemaColumn) each keep a *Config.
//
// Fields marked "Incremental sync" or "CDC sync" are annotated as belonging to
// that sync mode only.
type Config struct {
	// Shared settings declared in the common package.
	CommonConfig          *common.CommonConfig
	DestinationSchemaName string

	// One of the SyncMode values declared in this package.
	SyncMode                    SyncMode
	DatabaseUrl                 string
	IncludeTables               common.Set[string]
	ExcludeTables               common.Set[string]
	CursorColumnNameByTableName map[string]string  // Incremental sync
	ReplicationSlot             string             // CDC sync
	IgnoreUpdateColumns         common.Set[string] // CDC sync
	Nats                        NatsConfig         // CDC sync
}

func LoadConfig

func LoadConfig() *Config

type DebeziumData

// DebeziumData decodes only the "op" field of a JSON message.
//
// NOTE(review): presumably used to inspect the operation code before decoding
// the full DebeziumRowData — confirm at the call sites.
type DebeziumData struct {
	// Operation is a plain string, not a DebeziumOperation.
	Operation string `json:"op"`
}

type DebeziumOperation

// DebeziumOperation is a string-typed operation code.
//
// NOTE(review): presumably compared against the "op" value carried by
// DebeziumData.Operation and DebeziumRowData.Operation, which are declared as
// plain strings — confirm at the call sites.
type DebeziumOperation string

// Operation codes, one single-letter value per kind of event.
const (
	InsertOperation   DebeziumOperation = "c"
	UpdateOperation   DebeziumOperation = "u"
	DeleteOperation   DebeziumOperation = "d"
	MessageOperation  DebeziumOperation = "m"
	SnapshotOperation DebeziumOperation = "r"
)

type DebeziumRowData

// DebeziumRowData is the JSON shape of a row-level change message: the
// operation code, the row state before and after the change, and the source
// metadata.
type DebeziumRowData struct {
	// Operation is decoded from "op" as a plain string.
	Operation string                 `json:"op"`
	// Before and After map keys to arbitrary decoded JSON values.
	// NOTE(review): presumably keyed by column name, and either may be nil for
	// some operations — confirm against the producer.
	Before    map[string]interface{} `json:"before"`
	After     map[string]interface{} `json:"after"`
	Source    DebeziumRowSource      `json:"source"`
}

type DebeziumRowSource

// DebeziumRowSource is the "source" object of a change message, identifying
// the schema and table a row belongs to along with its "ts_ns" and "lsn"
// values.
type DebeziumRowSource struct {
	Schema        string `json:"schema"`
	Table         string `json:"table"`
	// Decoded from "ts_ns"; nanoseconds per the field and key names.
	CommittedAtNs int64  `json:"ts_ns"`
	// Decoded from "lsn".
	// NOTE(review): presumably the Postgres log sequence number — confirm.
	Position      int64  `json:"lsn"`
}

type Nats

// Nats carries the package Config. It is built with NewNats, and its Stream
// method returns a js.Stream for a given context.
type Nats struct {
	Config *Config
}

func NewNats

func NewNats(config *Config) *Nats

func (*Nats) Stream

func (nats *Nats) Stream(ctx context.Context) js.Stream

type NatsConfig

// NatsConfig groups the NATS settings stored in Config.Nats, which Config
// annotates as used for CDC sync.
//
// NOTE(review): the fields presumably correspond one-to-one to the ENV_NATS_*
// keys, with FetchTimeoutSeconds falling back to
// DEFAULT_NATS_FETCH_TIMEOUT_SECONDS — confirm in LoadConfig.
type NatsConfig struct {
	Url                 string
	Stream              string
	Subject             string
	ConsumerName        string
	FetchTimeoutSeconds int
}

type PgSchemaColumn

// PgSchemaColumn describes one Postgres column. NewPgSchemaColumn constructs
// it from a Config, and ToIcebergSchemaColumn converts it to a
// common.IcebergSchemaColumn.
//
// All metadata fields except IsPartOfUniqueIndex are strings, including the
// numeric-looking ones (OrdinalPosition, NumericPrecision, NumericScale,
// DatetimePrecision).
type PgSchemaColumn struct {
	ColumnName          string
	DataType            string
	UdtName             string
	// NOTE(review): a string rather than a bool; presumably holds "YES"/"NO"
	// and is compared with PG_TRUE — confirm.
	IsNullable          string
	OrdinalPosition     string
	NumericPrecision    string
	NumericScale        string
	DatetimePrecision   string
	Namespace           string
	IsPartOfUniqueIndex bool
	Config              *Config
}

func NewPgSchemaColumn

func NewPgSchemaColumn(config *Config) *PgSchemaColumn

func (*PgSchemaColumn) ToIcebergSchemaColumn

func (pgSchemaColumn *PgSchemaColumn) ToIcebergSchemaColumn() *common.IcebergSchemaColumn

type PgSchemaTable

// PgSchemaTable identifies a Postgres table by schema and table name. Its
// methods (IcebergTableName, IcebergParentPartitionedTableName, String,
// ToConfigArg) all use value receivers and return strings.
type PgSchemaTable struct {
	Schema                 string
	Table                  string
	// NOTE(review): presumably empty when the table is not a partition of
	// another table — confirm where the value is populated.
	ParentPartitionedTable string
}

func (PgSchemaTable) IcebergParentPartitionedTableName

func (pgSchemaTable PgSchemaTable) IcebergParentPartitionedTableName() string

func (PgSchemaTable) IcebergTableName

func (pgSchemaTable PgSchemaTable) IcebergTableName() string

Examples: public.table -> table; CustomSchema.TableName -> CustomSchemaTableName

func (PgSchemaTable) String

func (pgSchemaTable PgSchemaTable) String() string

func (PgSchemaTable) ToConfigArg

func (pgSchemaTable PgSchemaTable) ToConfigArg() string

type Postgres

// Postgres pairs a common.PostgresClient with the package Config. It is built
// with NewPostgres and exposes Close, Reconnect, schema and table listing
// (Schemas, SchemaTables, PgSchemaColumns) and replication-slot helpers
// (ReplicationSlotExists, CreateReplicationSlot).
type Postgres struct {
	PostgresClient *common.PostgresClient
	Config         *Config
}

func NewPostgres

func NewPostgres(config *Config) *Postgres

func (*Postgres) Close

func (postgres *Postgres) Close()

func (*Postgres) CreateReplicationSlot

func (postgres *Postgres) CreateReplicationSlot(slotName string)

func (*Postgres) PgSchemaColumns

func (postgres *Postgres) PgSchemaColumns(pgSchemaTable PgSchemaTable, retryCount ...int) []PgSchemaColumn

func (*Postgres) Reconnect

func (postgres *Postgres) Reconnect()

func (*Postgres) ReplicationSlotExists

func (postgres *Postgres) ReplicationSlotExists(slotName string) bool

func (*Postgres) SchemaTables

func (postgres *Postgres) SchemaTables(schema string) []PgSchemaTable

func (*Postgres) Schemas

func (postgres *Postgres) Schemas() []string

type SyncMode

// SyncMode is a string-typed sync mode. The values declared in this package
// are SyncModeFullRefresh, SyncModeCDC and SyncModeIncremental.
type SyncMode string

type Syncer

// Syncer bundles the Config, SyncerUtils, S3 storage and DuckDB client. It is
// built with NewSyncer from a Config alone, and its Sync method takes no
// arguments.
//
// NOTE(review): presumably the top-level entry point that dispatches on
// Config.SyncMode — confirm in the implementation.
type Syncer struct {
	Config       *Config
	Utils        *SyncerUtils
	StorageS3    *common.StorageS3
	DuckdbClient *common.DuckdbClient
}

func NewSyncer

func NewSyncer(config *Config) *Syncer

func (*Syncer) Sync

func (syncer *Syncer) Sync()

type SyncerFullRefresh

// SyncerFullRefresh has the same fields as Syncer, but NewSyncerFullRefresh
// receives every one of them from the caller. Its Sync method takes a
// *Postgres and the list of PgSchemaTable values to process.
type SyncerFullRefresh struct {
	Config       *Config
	Utils        *SyncerUtils
	StorageS3    *common.StorageS3
	DuckdbClient *common.DuckdbClient
}

func NewSyncerFullRefresh

func NewSyncerFullRefresh(config *Config, utils *SyncerUtils, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerFullRefresh

func (*SyncerFullRefresh) Sync

func (syncer *SyncerFullRefresh) Sync(postgres *Postgres, pgSchemaTables []PgSchemaTable)

type SyncerUtils

// SyncerUtils holds the Config, S3 storage and DuckDB client behind the shared
// helpers ShouldSyncTable and DeleteOldTables. NewSyncerUtils receives all
// three fields from the caller.
type SyncerUtils struct {
	Config       *Config
	StorageS3    *common.StorageS3
	DuckdbClient *common.DuckdbClient
}

func NewSyncerUtils

func NewSyncerUtils(config *Config, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *SyncerUtils

func (*SyncerUtils) DeleteOldTables

func (utils *SyncerUtils) DeleteOldTables(keepIcebergTableNames common.Set[string])

func (*SyncerUtils) ShouldSyncTable

func (utils *SyncerUtils) ShouldSyncTable(pgSchemaTable PgSchemaTable) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL