Link Search Menu Expand Document

Загрузка данных

Содержание раздела
  1. Подготовка к загрузке
  2. Загрузка данных в логическую таблицу
  3. Загрузка данных в standalone-таблицу
  4. Примеры
    1. Загрузка в логическую таблицу
    2. Загрузка в standalone-таблицу

Система позволяет параллельно загружать большие объемы данных.

Можно загружать данные в логические таблицы и standalone-таблицы. Загрузка данных в логические и материализованные представления недоступна.

Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.

Подготовка к загрузке

Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka не настроено автоматическое создание топиков, нужно создать топики вручную. Чтобы создать топик, следуйте любой из инструкций в документации Kafka:

Рекомендации о разделении данных по топикам см. в разделе Внешняя таблица.

Перед загрузкой данных подготовьте данные и логические сущности:

  1. Загрузите данные из внешней информационной системы в топик Kafka.
    Данные должны соответствовать формату загрузки данных.
  2. Создайте логическую таблицу или standalone-таблицу, если она еще не создана.
  3. Создайте внешнюю таблицу загрузки, если она еще не создана.
  4. Если нужно загрузить данные в standalone-таблицу, создайте внешнюю writable-таблицу.

Загрузка данных в логическую таблицу

Чтобы загрузить данные из внешней информационной системы в логическую таблицу:

  1. Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
  2. Выполните запрос INSERT SELECT FROM upload_external_table на загрузку данных. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
  3. Если необходимо, загрузите или обновите другие данные.
    В открытой дельте можно выполнять множество запросов на обновление и загрузку данных. При этом в каждую логическую таблицу в одной дельте можно добавлять записи с разными значениями первичного ключа или полные дубликаты.
  4. Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.

При успешном выполнении действий состояние данных в логических таблицах обновляется, как описано в разделе Версионирование данных.

Пока дельта не закрыта, внесенные изменения можно отменить запросом ROLLBACK DELTA.

Незавершенную операцию по загрузке данных можно перезапустить или отменить. Подробнее о способах обработки незавершенных операций см. в разделе Управление операциями записи.

Созданные внешние таблицы загрузки можно использовать повторно или удалить.

Загрузка данных в standalone-таблицу

Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните запрос INSERT SELECT FROM upload_external_table на загрузку данных. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.

При загрузке данных в standalone-таблицу нужно учитывать ограничения таблицы в конкретной СУБД.

Примеры

Загрузка в логическую таблицу

-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;

-- создание логической таблицы
CREATE TABLE sales (
  id INT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units INT NOT NULL,
  store_id INT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  id INT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units INT,
  store_id INT,
  description VARCHAR(256)
)
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- открытие новой (горячей) дельты
BEGIN DELTA;

-- запуск загрузки данных в логическую таблицу
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты (фиксация изменений)
COMMIT DELTA;

Загрузка в standalone-таблицу

Загрузка в standalone-таблицу ADP, созданную через систему (см. параметр auto.create.table.enable=true):

-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в ADP
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
  id INT NOT NULL,
  client_id INT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
  id INT NOT NULL,
  client_id INT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR
) 
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')

-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Подробнее об опции auto.create.table.enable см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.

Загрузка в существующую standalone-таблицу ADG:

-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу ADG
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adg (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR,
bucket_id INT NOT NULL
)
LOCATION 'core:adg://dtm__marketing__payments';

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION  'kafka://$kafka/payments'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');

-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adg
INSERT INTO marketing.payments_ext_write_adg SELECT *, 0 as bucket_id FROM marketing.payments_ext_upload;