Инкрементальная загрузка

Тип статьи

Пример загрузки данных в ViQube на основании даты модификации записи

Компетенции

Python разработчик, аналитик со знанием Python

Необходимые права

Права на запись в таблицу ViQube

Версия платформы

2.26

Статус

бета

Сложность

надо попотеть

Полезные ссылки

Дополнительные сведения

Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0

 

Подробное описание

При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова. Не всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

 

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

import pandas as pd import requests from sqlalchemy import create_engine postgreHost = 'localhost' postgreDB = 'testdb' postgreUser = 'postgres' postgrePort = '5432' postgrePass = '******' psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}" viUrl = "https://your_url" viUser = "admin" viPass = "*****" viDB = "DB" viTable = "testload" viColumn = "modifiedDate" chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных #Запрос версии ViQube def apiGetViQubeVersion(url): response = requests.get(url+"/viqube/version").json() return response.get("apiStable") #Запрос токена def apiGetAuthToken(url, user, pwd): payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}" headers = { 'content-type': "application/x-www-form-urlencoded", 'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY=" } response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json() return response.get("access_token") #Псевдо SQL запрос к ViQube def apiGetQuery(url, headers, dbname, query): response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers) return response.json() #Удаление записей таблицы по списку идентификаторов первичного ключа def apiDeleteRecords(url, headers, dbname, tblname, recordIdList): response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=recordIdList, headers=headers) return response #Загрузка списка значений в таблицу def apiStoreRecords(url, headers, dbname, tblname, records): response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers) return response #Подготавка соединения с базой SQL engine = create_engine(psqlConnectionString) conn = engine.connect().execution_options(stream_results=True) #Подготовка к API запросам к ViQube _version = apiGetViQubeVersion(viUrl) _token = apiGetAuthToken(viUrl, viUser, viPass) _headers={ "Authorization": f"Bearer {_token}", "X-API-VERSION": f"{_version}" } #Запрос на выбор старшей даты в таблице в ViQube queryMax={ "from": viTable, "limit": 1, "orderby": [ { "column": viColumn, "function": "OFF", "order": "DESC" } ] } rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax) #Определение даты, с которой начнётся обновление данных lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z' #Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\'' #Постраничое добавление записей for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize): #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str) cdfl = chunk_df.values.tolist() #Определение список Id, которые необходимо обновить listToDelete=chunk_df['id'].tolist() #удаление имеющиеся записи с в соответсвие списку идентификаторов if len(listToDelete): print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete)) #Формирование список для дабавления ds = {"columns":["id", "value", "modifiedDate"], "values": cdfl} #Загрузка данных в ViQube if len(cdfl): print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds)) conn.close()