Сравнение версий

Ключ

  • Эта строка добавлена.
  • Эта строка удалена.
  • Изменено форматирование.

...

Подсказка

Исходные данные

Таблица-приёмник ViQube testload(id:длинное целое

modifiedDate: ДатаВремя, value:целое)

Таблица-источник в postgres testload(id:integer,

modifiedDate:timestamp without timezone, value:integer)

View file
nametestload.csv

Блок кода
breakoutModewide
import pandas as pd
import requests
from sqlalchemy import create_engine

postgreHost = 'localhost'
postgreDB   = 'testdb'
postgreUser = 'postgres'
postgrePort = '5432'
postgrePass = '******'
psqlConnectionString = f"postgresql://{postgreUser}:{postgrePass}@{postgreHost}:{postgrePort}/{postgreDB}"

viUrl    = "https://your_url"
viUser   = "admin"
viPass   = "*****"
viDB     = "DB"
viTable  = "testload"
viColumn = "modifiedDate"

chunkSize = 100 #строк из запроса на странице при работе с большими объемами данных

#Запрос версии ViQube
def apiGetViQubeVersion(url):
    response = requests.get(url+"/viqube/version").json()
    return response.get("apiStable")
#Запрос токена
def apiGetAuthToken(url, user, pwd):
    payload = f"grant_type=password&scope=openid+profile+email+roles+viqube_api+viqubeadmin_api+core_logic_facade+dashboards_export_service+script_service+migration_service_api+data_collection&response_type=id_token+token&username={user}&password={pwd}"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'authorization': "Basic cHVibGljX3JvX2NsaWVudDpAOVkjbmckXXU+SF4zajY="
    }
    response = requests.post(url+"/idsrv/connect/token", data=payload, headers=headers).json()
    return response.get("access_token")

#Псевдо SQL запрос к ViQube
def apiGetQuery(url, headers, dbname, query):
    response = requests.post(url+f"/viqube/databases/{dbname}/query", json=query, headers=headers)
    return response.json()
#Удаление записей таблицы по списку идентификаторов первичного ключа
def apiDeleteRecords(url, headers, dbname, tblname, recordIdList):
    response = requests.delete(url+f"/viqube/databases/{dbname}/tables/{tblname}/records",
           json=recordIdList, headers=headers)
    return response
#Загрузка списка значений в таблицу
def apiStoreRecords(url, headers, dbname, tblname, records):
    response = requests.post(url+f"/viqube/databases/{dbname}/tables/{tblname}/records", json=records, headers=headers)
    return response

#Подготавка соединения с базой SQL
engine = create_engine(psqlConnectionString)
conn = engine.connect().execution_options(stream_results=True)
#Подготовка к API запросам к ViQube 
_version = apiGetViQubeVersion(viUrl)  
_token = apiGetAuthToken(viUrl, viUser, viPass)
_headers={
        "Authorization": f"Bearer {_token}",
        "X-API-VERSION": f"{_version}"
}
#Запрос на выбор старшей даты в таблице в ViQube
queryMax={
  "from": viTable,
  "limit": 1,
  "orderby": [
    {
      "column": viColumn,
      "function": "OFF",
      "order": "DESC"
    }
  ]
}
rQuery = apiGetQuery(viUrl, _headers, viDB, queryMax)
#Определение даты, с которой начнётся обновление данных
lastDate = rQuery['values'][0][1] if len(rQuery['values']) else '2022-01-02T00:00:00Z'
#Запрос в SQL базе на выборку обновлённых записей с момента запрашиваемой даты
sqlQuery = 'SELECT "id","value", "modifiedDate" FROM "public"."testload" WHERE "modifiedDate">\''+lastDate+'\''
#Постраничое добавление записей
for chunk_df in pd.read_sql(sqlQuery, conn, chunksize=chunkSize):
    #Тип данных преобразуется к строке, а ViQube самостоятельно конвертирует в ДатаВремя
    chunk_df['modifiedDate'] = chunk_df['modifiedDate'].astype(str)
    cdfl = chunk_df.values.tolist()
    #Определение список Id, которые необходимо обновить
    listToDelete=chunk_df['id'].tolist()
    #удаление имеющиеся записи с в соответсвие списку идентификаторов
    if len(listToDelete):
        print(apiDeleteRecords(viUrl, _headers, viDB, viTable, listToDelete))
    #Формирование список для дабавления
    ds = {"columns":["id", "value", "modifiedDate"],
          "values": cdfl}
    #Загрузка данных в ViQube
    if len(cdfl):
        print(apiStoreRecords(viUrl, _headers, viDB, viTable, ds))

conn.close()