Сравнение версий

Ключ

  • Эта строка добавлена.
  • Эта строка удалена.
  • Изменено форматирование.
просто

Тип статьи

Пример загрузки данных в ViQube на основании даты модификации записи

Компетенции

АналитикPython разработчик, аналитик со знанием Python программист

Необходимые права

Права на запись в таблицу ViQube

Версия платформы

2.26

Статус

Статус
colourYellow
titleЧерновикбета

Сложность(синяя звезда)

надо попотеть

Полезные ссылки

Дополнительные сведения

Ubuntu 18.04, Python 3.8, Pandas 1.3.4, Sqlalchemy 1.4.27, Requests 2.26.0

...

При использовании загрузчика в веб интерфейсе ViQube-Admin таблица на Викубе удаляется полностью и загружается снова. Иногда это неоправданно и стоит использовать инкрементальную загрузку новых и измененных записейНе всегда это приемлемо, например, во время этой процедуры пропадают данные на дашборде, или загрузка может занимать долгое время, если таблица очень тяжелая. Выход - использования Viqube API для реализации своей инкрементальной загрузки. Приведённый пример демонстрирует один из возможных вариантов решения на основе Python- скрипта.

Подсказка

Исходные данные

Таблица-приёмник ViQube testload
(id : длинное целое
modifiedDate : ДатаВремя,
value : целое)

Таблица-источник в postgres testload
(id : integer,
modifiedDate : timestamp without timezone,
value : integer)

Для воспроизведения работы скрипта у себя необходимо создать такую же таблицу в своей тестовой СУБД SQL и загрузить в нее данные из приложенной CSV.

View file
nametestload.csv

Блок кода
breakoutModewide
import pandas as pd
import requests
from sqlalchemy import create_engine

postgreHost = 'localhost'
postgreDB   = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"

viUrl    = "https://your_url"
viUser   = "admin"
viPass   = "*****"
viDB     = "DB"
viTable  = "testload"
viColumn = "modifiedDate"

chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных

#Запрос версии ViQube
def apiGetViQubeVersion(url):
    response = requests.get(url+"/viqube/version").json()
    return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
    payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
    }
    response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
    return response.get("access_token")

#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
    response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
    return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
    response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
           json=recordIdList, headers=headers)
    return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
    response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
    return response

#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube 
_version = apiGetViQubeVersion(viUrl)  
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
        "Authorization": f"Bearer {_token}",
        "X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
  "from": viTable,
  "limit": 1,
  "orderby": [
    {
      "column": viColumn,
      "function": "OFF",
      "order": "DESC"
    }
  ]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
    #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
    chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
    cdfl = chunk_df.values.tolist()
    #Определение список Id, которые необходимо обновить
    listToDelete=chunk_df['id'].tolist()
    #удаление имеющиеся записи с в соответсвие списку идентификаторов
    if len(listToDelete):
        print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
    #Формирование список для дабавления
    ds = {"columns":["id", "value", "modifiedDate"],
          "values": cdfl}
    #Загрузка данных в ViQube
    if len(cdfl):
        print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))

conn.close()