Comment on page
Example script: Polling for a job status and creating a Docker package
Polls for the status of a data generation job every minute while the job is still running or in the queue.
If the job fails or is canceled, a message appears accordingly.
If the job completed successfully, it creates a SQL dump and a Dockerfile (if it doesn't already exist in the directory) to help distribute the destination database via Docker.
Note: The SQL dump is specific to PostgreSQL databases and is created via pg_dump.
Python
tonic_status_poll.py
# Note that we do not guarantee backwards compatibility for our API.
# Please look at the Swagger API documentation that comes with your instance
# for your exact version's endpoints and parameters. If you are using hosted
# Tonic, you can find the API documentation here:
# https://app.tonic.ai/apidocs/index.html
# Tested 2021.04.12 with Tonic API v199
import os
import pathlib
import collections
import subprocess
import sys
import time
# Be sure to install requests via pip, pipenv, or poetry before running.
import requests
# Tonic Parameters
TONIC_BASE_URL = "http://<<TONIC HOSTNAME>>/"
TONIC_WORKSPACE_ID = "<<TONIC WORKSPACE ID>>"
TONIC_APIKEY = "<<TONIC APIKEY>>"
SOURCE_DB_PASSWORD = "<<DESTINATION DB PASSWORD>>"
# Provide the Tonic Job ID as a command line argument
# (ex. "pip3 tonic_status_poll.py TONIC_JOB_ID")
DatabaseInfo = collections.namedtuple(
"DatabaseInfo", ["server", "port", "username", "database"]
)
class TonicSession:
def __init__(self, base_url, apikey):
self._base_url = base_url
self._session = requests.Session()
self._api_key = apikey
self._session.headers.update({"Authorization": "Apikey {}".format(apikey)})
# Poll for a status of a Tonic job
def get_status(self, job_id):
print("Grabbing job status for job {jobid}...".format(jobid=job_id))
status_url = "{url}api/GenerateData/jobs/{job_id}".format(
url=self._base_url, job_id=job_id
)
while True:
resp = self._session.get(status_url)
if resp.ok:
resp_json = resp.json()
status = resp_json.get("status")
message = resp_json.get("errorMessages")
if status and status in ("Running", "Queued"):
print(
"Job {job_id} is {status}. Waiting 1 minute before "
"checking again".format(job_id=job_id, status=status)
)
time.sleep(60)
print("Checking for job status again... ")
else:
if status and status in ("Failed", "Canceled"):
print(
"Job {job_id} {status} with the following "
"message: {message}".format(
job_id=job_id, status=status, message=message
)
)
if status and status == "Completed":
print("Job {job_id} completed.".format(job_id=job_id))
self.packaging_for_docker(job_id)
break
else:
return resp.raise_for_status()
# Get destination DB connection details from Tonic
def get_db_info(self, workspace_id):
print("Grabbing destination database connection details...")
db_info_url = "{url}api/DataSource?workspaceId={workspace_id}".format(
url=self._base_url, workspace_id=workspace_id
)
resp = self._session.get(db_info_url)
if resp.ok:
db_json = resp.json()
destination_db = DatabaseInfo(
server=db_json["destinationDatabase"]["server"],
port=db_json["destinationDatabase"]["port"],
username=db_json["destinationDatabase"]["username"],
database=db_json["destinationDatabase"]["database"],
)
else:
return resp.raise_for_status()
return destination_db
# Get a SQL dump and generate a Dockerfile for packaging via Docker
# (https://www.tonic.ai/blog/using-docker-to-manage-your-test-database)
# Need to specify destination DB password at the top
def packaging_for_docker(self, job_id):
db_info = self.get_db_info(TONIC_WORKSPACE_ID)
with open("pg_dump_{jobid}.sql".format(jobid=job_id), "wb") as fobj:
os.environ["PGPASSWORD"] = SOURCE_DB_PASSWORD
os.environ["PGHOST"] = "localhost"
os.environ["PGPORT"] = str(db_info.port)
os.environ["PGUSER"] = db_info.username
os.environ["PGDATABASE"] = db_info.database
print("Dump started for {dbname}...".format(dbname=db_info.database))
pgdump_proc = subprocess.Popen(
"pg_dump", stdout=subprocess.PIPE, universal_newlines=True
)
for stdout_line in iter(pgdump_proc.stdout.readline, ""):
fobj.write(stdout_line.encode("utf-8"))
pgdump_proc.stdout.close()
directory = pathlib.Path(fobj.name).parent.absolute()
if "Dockerfile" not in os.listdir(directory):
with open("Dockerfile", "w") as dfile_obj:
dfile_obj.writelines(
[
"FROM postgres:13\n",
"COPY sql/*.sql /docker-entrypoint-initdb.d/",
]
)
dfile_obj.close()
print(
"A SQL dump of the destination DB can be found here: {dir}".format(
dir=directory
)
)
def main():
tonic_job_id = sys.argv[1]
session = TonicSession(TONIC_BASE_URL, TONIC_APIKEY)
session.get_status(tonic_job_id)
print("\nRun this script against another Tonic job ID to poll for its status.")
if __name__ == "__main__":
main()
Last modified 4mo ago