Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made initial table synchronization parallel. #198

Open
wants to merge 2 commits into
base: REL2_x_STABLE
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 221 additions & 2 deletions expected/column_filter.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ CREATE TABLE public.basic_dml (
data text,
something interval
);
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml |
(1 row)

INSERT INTO basic_dml(other, data, something)
VALUES (5, 'foo', '1 minute'::interval),
(4, 'bar', '12 weeks'::interval),
Expand All @@ -17,28 +24,69 @@ VALUES (5, 'foo', '1 minute'::interval),
(1, NULL, NULL);
\c :subscriber_dsn
-- create table on subscriber to receive replicated filtered data from provider
-- there are some extra columns too.
-- there are some extra columns too, and we omit 'other' as a non-replicated
-- table on upstream only.
CREATE TABLE public.basic_dml (
id serial primary key,
data text,
something interval,
subonly integer,
subonly_def integer DEFAULT 99
);
SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml', ARRAY['default']);
SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+-----------+-----------------------------------------+----------------
public | basic_dml | {id,data,something,subonly,subonly_def} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml |
(1 row)

\c :provider_dsn
-- Fails: the column filter list must include the key
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something}');
ERROR: REPLICA IDENTITY columns must be replicated
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml |
(1 row)

-- Fails: the column filter list may not include cols that are not in the table
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something, nosuchcol}');
ERROR: table public.basic_dml does not have column nosuchcol
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml |
(1 row)

-- At provider, add table to replication set, with filtered columns
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data, something}');
replication_set_add_table
---------------------------
t
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+-----------+---------------------+----------------
public | basic_dml | {id,data,something} | f
(1 row)

SELECT id, data, something FROM basic_dml ORDER BY id;
id | data | something
----+------+------------------
Expand Down Expand Up @@ -71,6 +119,13 @@ SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_tab
public | basic_dml | {id,data,something,subonly,subonly_def} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml |
(1 row)

-- data should get replicated to subscriber
SELECT id, data, something FROM basic_dml ORDER BY id;
id | data | something
Expand All @@ -90,14 +145,49 @@ CREATE TABLE public.basic_oids_dml (
data text,
something interval
) with oids ;
SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------------+----------------
public | basic_oids_dml | {id,other,data,something} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_oids_dml'::regclass;
nspname | relname | set_name
---------+----------------+----------
public | basic_oids_dml |
(1 row)

-- Fails: cannot use system column 'oid' explicitly
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{oid, id, data, something}');
ERROR: table public.basic_oids_dml does not have column oid
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_oids_dml'::regclass;
nspname | relname | set_name
---------+----------------+----------
public | basic_oids_dml |
(1 row)

-- WITH OIDS table OK
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{id, data, something}');
replication_set_add_table
---------------------------
t
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_oids_dml'::regclass;
nspname | relname | set_name
---------+----------------+----------
public | basic_oids_dml | default
(1 row)

SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
Expand All @@ -120,6 +210,12 @@ VALUES (5, 'foo', '1 minute'::interval),
(3, 'baz', '2 years 1 hour'::interval),
(2, 'qux', '8 months 2 days'::interval),
(1, NULL, NULL);
SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

UPDATE basic_oids_dml SET other = '40', data = NULL, something = '3 days'::interval WHERE id = 4;
SELECT * from basic_oids_dml ORDER BY id;
id | other | data | something
Expand Down Expand Up @@ -149,6 +245,129 @@ SELECT id, data, something FROM basic_oids_dml ORDER BY id;
(5 rows)

\c :provider_dsn
-- Adding a table that's already selectively replicated fails
\set VERBOSITY terse
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true);
ERROR: duplicate key value violates unique constraint "replication_set_table_pkey"
\set VERBOSITY default
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

-- So does trying to re-add to change the column set
\set VERBOSITY terse
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data}');
ERROR: duplicate key value violates unique constraint "replication_set_table_pkey"
\set VERBOSITY default
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

-- Shouldn't be able to drop a replicated col in a rel
-- but due to RM#5916 you can
BEGIN;
ALTER TABLE public.basic_dml DROP COLUMN data;
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

ROLLBACK;
-- Even when wrapped (RM#5916)
BEGIN;
SELECT pglogical.replicate_ddl_command($$
ALTER TABLE public.basic_dml DROP COLUMN data;
$$);
replicate_ddl_command
-----------------------
t
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

ROLLBACK;
-- CASCADE should be allowed though
BEGIN;
ALTER TABLE public.basic_dml DROP COLUMN data CASCADE;
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

ROLLBACK;
BEGIN;
SELECT pglogical.replicate_ddl_command($$
ALTER TABLE public.basic_dml DROP COLUMN data CASCADE;
$$);
replicate_ddl_command
-----------------------
t
(1 row)

SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']);
nspname | relname | att_list | has_row_filter
---------+----------------+---------------------+----------------
public | basic_oids_dml | {id,data,something} | f
(1 row)

SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relid = 'public.basic_dml'::regclass;
nspname | relname | set_name
---------+-----------+----------
public | basic_dml | default
(1 row)

ROLLBACK;
-- We can drop a non-replicated col. We must not replicate this DDL because in
-- this case the downstream doesn't have the 'other' column and apply will
-- fail.
ALTER TABLE public.basic_dml DROP COLUMN other;
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------

(1 row)

\set VERBOSITY terse
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.basic_dml CASCADE;
Expand Down
48 changes: 47 additions & 1 deletion expected/replication_set.out
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', repli
ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY
SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true);
ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY
-- Adding already-added fails
\set VERBOSITY terse
SELECT * FROM pglogical.replication_set_add_table('repset_replicate_all', 'public.test_publicschema');
ERROR: duplicate key value violates unique constraint "replication_set_table_pkey"
\set VERBOSITY default
-- check the replication sets
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
Expand Down Expand Up @@ -157,6 +162,48 @@ SELECT nspname, relname, set_name FROM pglogical.tables
--too short
SELECT pglogical.create_replication_set('');
ERROR: replication set name cannot be empty
-- Can't drop table while it's in a repset
DROP TABLE public.test_publicschema;
ERROR: cannot drop table test_publicschema because other objects depend on it
DETAIL: table test_publicschema membership in replication set default_insert_only depends on table test_publicschema
table test_publicschema membership in replication set repset_replicate_all depends on table test_publicschema
HINT: Use DROP ... CASCADE to drop the dependent objects too.
-- Can't drop table while it's in a repset
BEGIN;
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_publicschema;
$$);
ERROR: cannot drop table public.test_publicschema because other objects depend on it
DETAIL: table public.test_publicschema membership in replication set default_insert_only depends on table public.test_publicschema
table public.test_publicschema membership in replication set repset_replicate_all depends on table public.test_publicschema
HINT: Use DROP ... CASCADE to drop the dependent objects too.
CONTEXT: during execution of queued SQL statement:
DROP TABLE public.test_publicschema;

ROLLBACK;
-- Can CASCADE though, even outside ddlrep
BEGIN;
DROP TABLE public.test_publicschema CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table test_publicschema membership in replication set default_insert_only
drop cascades to table test_publicschema membership in replication set repset_replicate_all
ROLLBACK;
-- ... and can drop after repset removal
SELECT pglogical.replication_set_remove_table('repset_replicate_all', 'public.test_publicschema');
replication_set_remove_table
------------------------------
t
(1 row)

SELECT pglogical.replication_set_remove_table('default_insert_only', 'public.test_publicschema');
replication_set_remove_table
------------------------------
t
(1 row)

BEGIN;
DROP TABLE public.test_publicschema;
ROLLBACK;
\set VERBOSITY terse
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_publicschema CASCADE;
Expand All @@ -165,7 +212,6 @@ SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_nopkey CASCADE;
DROP TABLE public.test_unlogged CASCADE;
$$);
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to table normalschema.test_normalschema
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to table "strange.schema-IS".test_strangeschema
Expand Down
Loading