/
Инкрементальная загрузка

Инкрементальная загрузка

Тип статьи

Пример загрузки данных в ViQube на основании даты модификации записи

Компетенции

Python разработчик, аналитик со знанием Python

Необходимые права

Права на запись в таблицу ViQube

Версия платформы

2.26

Статус

бета

Сложность

надо попотеть

Полезные ссылки

Дополнительные сведения

Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0

 

Подробное описание

При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова. Не всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

 

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

import pandas as pd import requests from sqlalchemy import create_engine postgreHost = 'localhost' postgreDB = 'testdb' postgreUser = 'postgres' postgrePort = '5432' postgrePass = '******' psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}" viUrl = "https://your_url" viUser = "admin" viPass = "*****" viDB = "DB" viTable = "testload" viColumn = "modifiedDate" chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных #Запрос версии ViQube def apiGetViQubeVersion(url): response = requests.get(url+"/viqube/version").json() return response.get("apiStable") #Запрос токена def apiGetAuthToken(url, user, pwd): payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}" headers = { 'content-type': "application/x-www-form-urlencoded", 'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY=" } response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json() return response.get("access_token") #Псевдо SQL запрос к ViQube def apiGetQuery(url, headers, dbname, query): response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers) return response.json() #Удаление записей таблицы по списку идентификаторов первичного ключа def apiDeleteRecords(url, headers, dbname, tblname, recordIdList): response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=recordIdList, headers=headers) return response #Загрузка списка значений в таблицу def apiStoreRecords(url, headers, dbname, tblname, records): response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers) return response #Подготавка соединения с базой SQL engine = create_engine(psqlConnectionString) conn = engine.connect().execution_options(stream_results=True) #Подготовка к API запросам к ViQube _version = apiGetViQubeVersion(viUrl) _token = apiGetAuthToken(viUrl, viUser, viPass) _headers={ "Authorization": f"Bearer {_token}", "X-API-VERSION": f"{_version}" } #Запрос на выбор старшей даты в таблице в ViQube queryMax={ "from": viTable, "limit": 1, "orderby": [ { "column": viColumn, "function": "OFF", "order": "DESC" } ] } rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax) #Определение даты, с которой начнётся обновление данных lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z' #Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\'' #Постраничое добавление записей for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize): #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str) cdfl = chunk_df.values.tolist() #Определение список Id, которые необходимо обновить listToDelete=chunk_df['id'].tolist() #удаление имеющиеся записи с в соответсвие списку идентификаторов if len(listToDelete): print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete)) #Формирование список для дабавления ds = {"columns":["id", "value", "modifiedDate"], "values": cdfl} #Загрузка данных в ViQube if len(cdfl): print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds)) conn.close()

Related content

Инкрементальная загрузка
Инкрементальная загрузка
More like this
Инкрементальная загрузка
Инкрементальная загрузка
More like this
Инкрементальная загрузка
Инкрементальная загрузка
More like this
Инкрементальная загрузка
Инкрементальная загрузка
More like this
Инкрементальная загрузка
Инкрементальная загрузка
More like this
Фильтрация числовых показателей
Фильтрация числовых показателей
More like this