Сравнение версий

Ключ

  • Эта строка добавлена.
  • Эта строка удалена.
  • Изменено форматирование.

При использовании загрузчика в веб-интерфейсе ViQube-Admin, таблица на ViQube удаляется полностью и загружается снова. Это не всегда приемлемо, т.к. во время этой процедуры могут, например, пропасть данные на дашборде, или загрузка может занять много времени, если таблица содержит большое количество данных. Выходом из данной ситуации является использование ViQube API для выполнения инкрементальной загрузки. Ниже приведен один из возможных вариантов решения данной проблемы.

Для выполнения инкрементальной загрузки:

  1. Создайте загрузчик. Например:

    Блок кода
    curl --request POST \
      --url https://URL/viqube/loaders \
      --header 'Authorization: Bearer TOKEN' \
      --header 'Content-Type: application/json' \
      --header 'X-API-VERSION: 3.10' \
      --data '{
            "database": "ID_БАЗЫ",
            "options": {
                "query": "select ПОЛЕ1, ПОЛЕ2, ... FROM ТАБЛИЦА",
                "type": "JDBC",
                "connection": "jdbc CONNECTION STRING",
                "driver": "com.mysql.jdbc.Driver" (замените на нужный, в зависимости от внешней базы данных)
            },
            "table": "ВИКУБ_ТАБЛИЦА_С_НАСТРОЕННЫМИ_ПОЛЯМИ_ПО_ЧИСЛУ_В_ЗАПРОСЕ"
    }'
Информация

Перед отправкой запроса необходимо получить токен аутентификации.

  1. В ответе на запрос, среди прочего, вы получите идентификатор загрузчика. Например: "id": 12.

  2. Теперь, для обновления данных, вы можете выполнять загрузку в назначенное время:

    Блок кода
    curl --request POST \
      --url https://URL/viqube/loaders/ID_ЗАГРУЗЧИКА/execute \
      --header 'Authorization: Bearer ТОКЕН' \
      --header 'Content-Type: application/json' \
      --header 'X-API-VERSION: 3.10' \
      --data '{
      "command": "start"
    }'
Выборка
hiddentrue

Тип статьи

Пример загрузки данных в ViQube на основании даты модификации записи

Компетенции

...

Python разработчик, аналитик со знанием Python

...

Необходимые права

Права на запись в таблицу ViQube

Версия платформы

2.26

Статус

Статус
colourYellow
title

...

бета

Сложность(синяя звезда)

...

надо попотеть

Полезные ссылки

Дополнительные сведения

Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0

📋 Подробное описание

При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова.

...

wide

Не всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.

Подсказка

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

View file
nametestload.csv

Блок кода
breakoutMode
import pandas as pd
import requests
from sqlalchemy import create_engine

postgreHost = 'localhost'
postgreDB   = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"

viUrl    = "https://your_url"
viUser   = "admin"
viPass   = "*****"
viDB     = "DB"
viTable  = "testload"
viColumn = "modifiedDate"

chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных

#Запрос версии ViQube
def apiGetViQubeVersion(url):
    response = requests.get(url+"/viqube/version").json()
    return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
    payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
    }
    response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
    return response.get("access_token")

#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
    response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
    return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
    response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
           json=recordIdList, headers=headers)
    return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
    response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
    return response

#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube 
_version = apiGetViQubeVersion(viUrl)  
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
        "Authorization": f"Bearer {_token}",
        "X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
  "from": viTable,
  "limit": 1,
  "orderby": [
    {
      "column": viColumn,
      "function": "OFF",
      "order": "DESC"
    }
  ]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
    #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
    chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
    cdfl = chunk_df.values.tolist()
    #Определение список Id, которые необходимо обновить
    listToDelete=chunk_df['id'].tolist()
    #удаление имеющиеся записи с в соответсвие списку идентификаторов
    if len(listToDelete):
        print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
    #Формирование список для дабавления
    ds = {"columns":["id", "value", "modifiedDate"],
          "values": cdfl}
    #Загрузка данных в ViQube
    if len(cdfl):
        print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))

conn.close()