Тип статьи

Пример загрузки данных в ViQube на основании даты модификации записи

Компетенции

Python разработчик, аналитик со знанием Python

Необходимые права

Права на запись в таблицу ViQube

Версия платформы

2.26

Статус

Сложность(синяя звезда)

надо попотеть

Полезные ссылки

Ссылки на похожие или дополняющие статьи будь то в Confluence или просто в Интернете.

Дополнительные сведения

Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0

(синяя звезда) Подробное описание

При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова. Не всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

note

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

import pandas as pd
import requests
from sqlalchemy import create_engine

postgreHost = 'localhost'
postgreDB   = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"

viUrl    = "https://your_url"
viUser   = "admin"
viPass   = "*****"
viDB     = "DB"
viTable  = "testload"
viColumn = "modifiedDate"

chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных

#Запрос версии ViQube
def apiGetViQubeVersion(url):
    response = requests.get(url+"/viqube/version").json()
    return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
    payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
    }
    response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
    return response.get("access_token")

#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
    response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
    return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
    response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
           json=recordIdList, headers=headers)
    return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
    response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
    return response

#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube 
_version = apiGetViQubeVersion(viUrl)  
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
        "Authorization": f"Bearer {_token}",
        "X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
  "from": viTable,
  "limit": 1,
  "orderby": [
    {
      "column": viColumn,
      "function": "OFF",
      "order": "DESC"
    }
  ]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
    #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
    chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
    cdfl = chunk_df.values.tolist()
    #Определение список Id, которые необходимо обновить
    listToDelete=chunk_df['id'].tolist()
    #удаление имеющиеся записи с в соответсвие списку идентификаторов
    if len(listToDelete):
        print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
    #Формирование список для дабавления
    ds = {"columns":["id", "value", "modifiedDate"],
          "values": cdfl}
    #Загрузка данных в ViQube
    if len(cdfl):
        print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))

conn.close()