Загрузка данных
Для загрузки данных нужен существующий топик Kafka. Если в брокере сообщений Kafka настроено автоматическое создание топиков, то дополнительные действия не требуются. Иначе необходимо создать топик, если он еще не создан. Подробнее о создании топиков см. в документации Kafka:
- раздел Quick Start,
- раздел Adding and removing topics.
Примечание: загрузка данных возможна только в логическую таблицу. Загрузка данных в логические и материализованные представления недоступна.
Чтобы загрузить данные из внешней информационной системы в логическую таблицу:
- Загрузите данные из внешней информационной системы в топик Kafka.
Данные должны иметь формат, описанный в разделе Формат загрузки данных. - Создайте логическую таблицу, если она еще не создана.
- Создайте внешнюю таблицу загрузки, если она еще не создана.
- Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
- Выполните запрос INSERT INTO logical_table на загрузку данных из топика в логическую таблицу. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
После успешного окончания загрузки данных система вернет ответ с пустым объектом ResultSet. - Если необходимо, загрузите другие данные, например в другие логические таблицы.
В рамках одной открытой дельты можно выполнять произвольное количество запросов INSERT INTO logical_table, при этом не допускается загрузка различных состояний одного и того же объекта. - Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.
При успешном выполнении последовательности действий загруженные данные сохраняются в качестве актуальных, а предыдущая версия данных, если такая была, становится архивной. Подробнее о версионировании см. в разделе Версионирование данных.
Пока дельта не закрыта, все изменения данных, выполненные в рамках нее, можно отменить (см. ROLLBACK DELTA). Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Пример
-- выбор логической базы данных sales в качестве базы данных по умолчанию
USE sales
-- создание логической таблицы sales
CREATE TABLE sales (
identification_number INT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units INT NOT NULL,
store_id INT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (identification_number)
)
DISTRIBUTED BY (identification_number)
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
identification_number INT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units INT,
store_id INT,
description VARCHAR(256)
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000
-- открытие новой (горячей) дельты
BEGIN DELTA
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales.sales_ext_upload
-- закрытие дельты (фиксация изменений)
COMMIT DELTA