Инкрементальная загрузка
Тип статьи | Пример загрузки данных в ViQube на основании даты модификации записи |
---|---|
Компетенции | Python разработчик, аналитик со знанием Python |
Необходимые права | Права на запись в таблицу ViQube |
Версия платформы | 2.26 |
Статус | бета |
Сложность | надо попотеть |
Полезные ссылки | |
Дополнительные сведения | Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0 |
Подробное описание
При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова. Не всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.
Исходные данные
Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)
Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)
Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.
import pandas as pd
import requests
from sqlalchemy import create_engine
postgreHost = 'localhost'
postgreDB = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"
viUrl = "https://your_url"
viUser = "admin"
viPass = "*****"
viDB = "DB"
viTable = "testload"
viColumn = "modifiedDate"
chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных
#Запрос версии ViQube
def apiGetViQubeVersion(url):
response = requests.get(url+"/viqube/version").json()
return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
headers = {
'content-type': "application/x-www-form-urlencoded",
'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
}
response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
return response.get("access_token")
#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
json=recordIdList, headers=headers)
return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
return response
#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube
_version = apiGetViQubeVersion(viUrl)
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
"Authorization": f"Bearer {_token}",
"X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
"from": viTable,
"limit": 1,
"orderby": [
{
"column": viColumn,
"function": "OFF",
"order": "DESC"
}
]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
#Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
cdfl = chunk_df.values.tolist()
#Определение список Id, которые необходимо обновить
listToDelete=chunk_df['id'].tolist()
#удаление имеющиеся записи с в соответсвие списку идентификаторов
if len(listToDelete):
print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
#Формирование список для дабавления
ds = {"columns":["id", "value", "modifiedDate"],
"values": cdfl}
#Загрузка данных в ViQube
if len(cdfl):
print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))
conn.close()