Перейти к концу метаданных
Переход к началу метаданных

You are viewing an old version of this content. View the current version.

Сравнить с текущим View Version History

« Предыдущий Версия 5 Следующий »

При использовании загрузчика в веб-интерфейсе ViQube-Admin, таблица на ViQube удаляется полностью и загружается снова. Это не всегда приемлемо, т.к. во время этой процедуры могут, например, пропасть данные на дашборде, или загрузка может занять много времени, если таблица содержит большое количество данных. Выходом из данной ситуации является использование ViQube API для выполнения инкрементальной загрузки. Ниже приведен один из возможных вариантов решения данной проблемы.

Для выполнения инкрементальной загрузки:

  1. Создайте загрузчик. Например:

    curl --request POST \
      --url https://URL/viqube/loaders \
      --header 'Authorization: Bearer TOKEN' \
      --header 'Content-Type: application/json' \
      --header 'X-API-VERSION: 3.10' \
      --data '{
            "database": "ID_БАЗЫ",
            "options": {
                "query": "select ПОЛЕ1, ПОЛЕ2, ... FROM ТАБЛИЦА",
                "type": "JDBC",
                "connection": "jdbc CONNECTION STRING",
                "driver": "com.mysql.jdbc.Driver"
            },
            "table": "ВИКУБ_ТАБЛИЦА_С_НАСТРОЕННЫМИ_ПОЛЯМИ_ПО_ЧИСЛУ_В_ЗАПРОСЕ"
    }'

Перед отправкой запроса необходимо получить токен аутентификации.

  1. В ответе на запрос, среди прочего, вы получите идентификатор загрузчика. Например: "id": 12.

  2. Теперь, для обновления данных, вы можете выполнять загрузку в назначенное время:

    curl --request POST \
      --url https://URL/viqube/loaders/ID_ЗАГРУЗЧИКА/execute \
      --header 'Authorization: Bearer ТОКЕН' \
      --header 'Content-Type: application/json' \
      --header 'X-API-VERSION: 3.10' \
      --data '{
      "command": "start"
    }'

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

import pandas as pd
import requests
from sqlalchemy import create_engine

postgreHost = 'localhost'
postgreDB   = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"

viUrl    = "https://your_url"
viUser   = "admin"
viPass   = "*****"
viDB     = "DB"
viTable  = "testload"
viColumn = "modifiedDate"

chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных

#Запрос версии ViQube
def apiGetViQubeVersion(url):
    response = requests.get(url+"/viqube/version").json()
    return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
    payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
    }
    response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
    return response.get("access_token")

#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
    response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
    return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
    response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
           json=recordIdList, headers=headers)
    return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
    response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
    return response

#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube 
_version = apiGetViQubeVersion(viUrl)  
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
        "Authorization": f"Bearer {_token}",
        "X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
  "from": viTable,
  "limit": 1,
  "orderby": [
    {
      "column": viColumn,
      "function": "OFF",
      "order": "DESC"
    }
  ]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
    #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
    chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
    cdfl = chunk_df.values.tolist()
    #Определение список Id, которые необходимо обновить
    listToDelete=chunk_df['id'].tolist()
    #удаление имеющиеся записи с в соответсвие списку идентификаторов
    if len(listToDelete):
        print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
    #Формирование список для дабавления
    ds = {"columns":["id", "value", "modifiedDate"],
          "values": cdfl}
    #Загрузка данных в ViQube
    if len(cdfl):
        print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))

conn.close()

  • Нет меток