При использовании загрузчика в веб-интерфейсе ViQube-Admin, таблица на ViQube удаляется полностью и загружается снова. Это не всегда приемлемо, т.к. во время этой процедуры могут, например, пропасть данные на дашборде, или загрузка может занять много времени, если таблица содержит большое количество данных. Выходом из данной ситуации является использование ViQube API для выполнения инкрементальной загрузки. Ниже приведен один из возможных вариантов решения данной проблемы.
Для выполнения инкрементальной загрузки:
Создайте загрузчик. Например:
curl --request POST \ --url https://URL/viqube/loaders \ --header 'Authorization: Bearer TOKEN' \ --header 'Content-Type: application/json' \ --header 'X-API-VERSION: 3.10' \ --data '{ "database": "ID_БАЗЫ", "options": { "query": "select ПОЛЕ1, ПОЛЕ2, ... FROM ТАБЛИЦА", "type": "JDBC", "connection": "jdbc CONNECTION STRING", "driver": "com.mysql.jdbc.Driver" }, "table": "ВИКУБ_ТАБЛИЦА_С_НАСТРОЕННЫМИ_ПОЛЯМИ_ПО_ЧИСЛУ_В_ЗАПРОСЕ" }'
Перед отправкой запроса необходимо получить токен аутентификации.
В ответе на запрос, среди прочего, вы получите идентификатор загрузчика. Например:
"id": 12
.Теперь, для обновления данных, вы можете выполнять загрузку в назначенное время:
curl --request POST \ --url https://URL/viqube/loaders/ID_ЗАГРУЗЧИКА/execute \ --header 'Authorization: Bearer ТОКЕН' \ --header 'Content-Type: application/json' \ --header 'X-API-VERSION: 3.10' \ --data '{ "command": "start" }'
Исходные данные
Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)
Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)
Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.
import pandas as pd import requests from sqlalchemy import create_engine postgreHost = 'localhost' postgreDB = 'testdb' postgreUser = 'postgres' postgrePort = '5432' postgrePass = '******' psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}" viUrl = "https://your_url" viUser = "admin" viPass = "*****" viDB = "DB" viTable = "testload" viColumn = "modifiedDate" chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных #Запрос версии ViQube def apiGetViQubeVersion(url): response = requests.get(url+"/viqube/version").json() return response.get("apiStable") #Запрос токена def apiGetAuthToken(url, user, pwd): payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}" headers = { 'content-type': "application/x-www-form-urlencoded", 'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY=" } response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json() return response.get("access_token") #Псевдо SQL запрос к ViQube def apiGetQuery(url, headers, dbname, query): response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers) return response.json() #Удаление записей таблицы по списку идентификаторов первичного ключа def apiDeleteRecords(url, headers, dbname, tblname, recordIdList): response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=recordIdList, headers=headers) return response #Загрузка списка значений в таблицу def apiStoreRecords(url, headers, dbname, tblname, records): response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers) return response #Подготавка соединения с базой SQL engine = create_engine(psqlConnectionString) conn = engine.connect().execution_options(stream_results=True) #Подготовка к API запросам к ViQube _version = apiGetViQubeVersion(viUrl) _token = apiGetAuthToken(viUrl, viUser, viPass) _headers={ "Authorization": f"Bearer {_token}", "X-API-VERSION": f"{_version}" } #Запрос на выбор старшей даты в таблице в ViQube queryMax={ "from": viTable, "limit": 1, "orderby": [ { "column": viColumn, "function": "OFF", "order": "DESC" } ] } rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax) #Определение даты, с которой начнётся обновление данных lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z' #Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\'' #Постраничое добавление записей for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize): #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str) cdfl = chunk_df.values.tolist() #Определение список Id, которые необходимо обновить listToDelete=chunk_df['id'].tolist() #удаление имеющиеся записи с в соответсвие списку идентификаторов if len(listToDelete): print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete)) #Формирование список для дабавления ds = {"columns":["id", "value", "modifiedDate"], "values": cdfl} #Загрузка данных в ViQube if len(cdfl): print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds)) conn.close()