Перейти до основного вмісту

Керування обчислювальними та інформаційними ресурсами Qiskit Serverless

Версії пакетів

Код на цій сторінці розроблявся з використанням таких залежностей. Рекомендуємо використовувати ці або новіші версії.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

За допомогою Qiskit Serverless можна керувати обчислювальними та інформаційними ресурсами у межах свого патерну Qiskit, включаючи CPU, QPU та інші обчислювальні прискорювачі.

Встановлення детальних статусів

Робочі навантаження Serverless проходять кілька етапів у рамках робочого процесу. За замовчуванням доступні такі статуси через job.status():

  • QUEUED: робоче навантаження стоїть у черзі на класичні ресурси
  • INITIALIZING: робоче навантаження налаштовується
  • RUNNING: робоче навантаження зараз виконується на класичних ресурсах
  • DONE: робоче навантаження успішно завершено

Також можна встановлювати власні статуси, що детальніше описують конкретний етап робочого процесу, як показано нижче.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

Після успішного завершення цього робочого навантаження (викликом save_result()) статус автоматично зміниться на DONE.

Паралельні робочі процеси

Для класичних задач, що піддаються розпаралелюванню, використовуй декоратор @distribute_task, щоб визначити необхідні обчислювальні ресурси для виконання задачі. Для початку пригадай приклад transpile_remote.py з теми Напиши свою першу програму Qiskit Serverless і скористайся таким кодом.

Наведений нижче код передбачає, що ти вже зберіг свої облікові дані.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

У цьому прикладі функція transpile_remote() декорована @distribute_task(target={"cpu": 1}). При запуску це створює асинхронний паралельний воркер-таск з одним ядром CPU і повертає посилання для відстеження воркера. Щоб отримати результат, передай посилання у функцію get(). Це дозволяє запускати кілька паралельних задач:

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Дослідження різних конфігурацій задач

За допомогою @distribute_task() можна гнучко розподіляти CPU, GPU та пам'ять для своїх задач. Для Qiskit Serverless на IBM Quantum® Platform кожна програма оснащена 16 ядрами CPU та 32 ГБ оперативної пам'яті, які можна динамічно розподіляти за потребою.

Ядра CPU можна виділяти як цілими одиницями, так і у вигляді дробових часток, як показано нижче.

Пам'ять виділяється у байтах. Нагадаємо: 1024 байти = 1 кілобайт, 1024 кілобайти = 1 мегабайт, 1024 мегабайти = 1 гігабайт. Щоб виділити 2 ГБ пам'яті для воркера, потрібно вказати "mem": 2 * 1024 * 1024 * 1024.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Керування даними у програмі

Qiskit Serverless дозволяє керувати файлами в каталозі /data для всіх твоїх програм. Є кілька обмежень:

  • Підтримуються лише файли tar та h5
  • Це лише плоске сховище /data, підкаталоги типу /data/folder/ не підтримуються

Нижче показано, як завантажувати файли. Переконайся, що ти авторизований у Qiskit Serverless за допомогою свого облікового запису IBM Quantum (інструкції — у розділі Розгортання на IBM Quantum Platform).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

Далі можна переглянути всі файли у каталозі data. Ці дані доступні для всіх програм.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

Це можна зробити з програми: використай file_download() для завантаження файлу в середовище програми та розпакуй tar.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

На цьому етапі програма може взаємодіяти з файлами так само, як із локальним експериментом. file_upload(), file_download() та file_delete() можна викликати як із локального експерименту, так і з завантаженої програми — для зручного та гнучкого керування даними.

Наступні кроки

Рекомендації