Example script: Polling for a job status and creating a Docker package

Polls the status of a data generation job once a minute for as long as the job is running or queued.

If the job fails or is canceled, the script prints the final status together with the job's error messages.

If the job completes successfully, the script creates a SQL dump and a Dockerfile (if one doesn't already exist in the directory) to help distribute the destination database via Docker.

You can read more in our blog about using Docker to manage your databases.

Note: The SQL dump is created with pg_dump, so this script works only with PostgreSQL databases.

tonic_status_poll.py
# Note that we do not guarantee backwards compatibility for our API.
# Please look at the Swagger API documentation that comes with your instance
# for your exact version's endpoints and parameters. If you are using hosted
# Tonic, you can find the API documentation here:

# https://app.tonic.ai/apidocs/index.html

# Tested 2021.04.12 with Tonic API v199

import os
import pathlib
import collections
import subprocess
import sys
import time

# Be sure to install requests via pip, pipenv, or poetry before running.
import requests


# Tonic Parameters
# Replace every <<...>> placeholder below before running the script.
# Base URL of the Tonic instance. Keep the trailing slash: API paths such as
# "api/GenerateData/jobs/..." are appended directly to this value.
TONIC_BASE_URL = "http://<<TONIC HOSTNAME>>/"
# Workspace whose destination database connection details are looked up.
TONIC_WORKSPACE_ID = "<<TONIC WORKSPACE ID>>"
# API key sent in the "Authorization: Apikey ..." header of every request.
TONIC_APIKEY = "<<TONIC APIKEY>>"
# Password handed to pg_dump through PGPASSWORD. Despite the SOURCE_ prefix,
# the placeholder asks for the *destination* database password.
SOURCE_DB_PASSWORD = "<<DESTINATION DB PASSWORD>>"

# Provide the Tonic Job ID as a command line argument
# (ex. "python3 tonic_status_poll.py TONIC_JOB_ID")

# Immutable record of the destination database connection details that
# get_db_info() reads from the Tonic DataSource response.
DatabaseInfo = collections.namedtuple(
    "DatabaseInfo", "server port username database"
)


class TonicSession:
    """Small client for the Tonic REST endpoints used by this script.

    Wraps a ``requests.Session`` that sends the API key in the
    ``Authorization`` header with every request.
    """

    # Statuses for which polling keeps going.
    _PENDING_STATUSES = ("Running", "Queued")
    # Terminal statuses that are reported together with the job's messages.
    _STOPPED_STATUSES = ("Failed", "Canceled")

    def __init__(self, base_url, apikey):
        """Create a session for the Tonic instance at *base_url*.

        Args:
            base_url: Base URL of the Tonic instance, including the trailing
                slash (API paths are appended directly to it).
            apikey: Tonic API key used to authenticate every request.
        """
        self._base_url = base_url
        self._session = requests.Session()
        self._api_key = apikey
        self._session.headers.update({"Authorization": "Apikey {}".format(apikey)})

    # Poll for a status of a Tonic job
    def get_status(self, job_id):
        """Poll a data generation job until it leaves the running/queued state.

        The job is checked once a minute while it is "Running" or "Queued".
        A "Failed" or "Canceled" job is reported together with its error
        messages; a "Completed" job triggers ``packaging_for_docker``.

        Args:
            job_id: ID of the Tonic data generation job to poll.

        Returns:
            The last status value reported by the API (``None`` if the
            response carried no "status" field).

        Raises:
            requests.HTTPError: If the API answers with an error status code.
        """
        print("Grabbing job status for job {jobid}...".format(jobid=job_id))
        status_url = "{url}api/GenerateData/jobs/{job_id}".format(
            url=self._base_url, job_id=job_id
        )

        while True:
            resp = self._session.get(status_url)
            # Raises for any 4xx/5xx answer; a no-op for successful responses.
            resp.raise_for_status()

            resp_json = resp.json()
            status = resp_json.get("status")
            message = resp_json.get("errorMessages")

            if status in self._PENDING_STATUSES:
                print(
                    "Job {job_id} is {status}. Waiting 1 minute before "
                    "checking again".format(job_id=job_id, status=status)
                )
                time.sleep(60)
                print("Checking for job status again... ")
                continue

            if status in self._STOPPED_STATUSES:
                print(
                    "Job {job_id} {status} with the following "
                    "message: {message}".format(
                        job_id=job_id, status=status, message=message
                    )
                )
            elif status == "Completed":
                print("Job {job_id} completed.".format(job_id=job_id))
                self.packaging_for_docker(job_id)
            else:
                # Previously the loop ended silently here; tell the user why
                # polling stopped without any further action.
                print(
                    "Job {job_id} reported an unexpected status: "
                    "{status}".format(job_id=job_id, status=status)
                )
            return status

    # Get destination DB connection details from Tonic
    def get_db_info(self, workspace_id):
        """Fetch the destination database connection details of a workspace.

        Args:
            workspace_id: ID of the Tonic workspace to look up.

        Returns:
            A ``DatabaseInfo`` built from the "destinationDatabase" entry of
            the DataSource response.

        Raises:
            requests.HTTPError: If the API answers with an error status code.
            KeyError: If the response lacks one of the expected fields.
        """
        print("Grabbing destination database connection details...")
        db_info_url = "{url}api/DataSource?workspaceId={workspace_id}".format(
            url=self._base_url, workspace_id=workspace_id
        )
        resp = self._session.get(db_info_url)
        resp.raise_for_status()

        destination = resp.json()["destinationDatabase"]
        return DatabaseInfo(
            server=destination["server"],
            port=destination["port"],
            username=destination["username"],
            database=destination["database"],
        )

    # Get a SQL dump and generate a Dockerfile for packaging via Docker
    # (https://www.tonic.ai/blog/using-docker-to-manage-your-test-database)
    # Need to specify destination DB password at the top
    def packaging_for_docker(self, job_id):
        """Dump the destination database and make sure a Dockerfile exists.

        Runs ``pg_dump`` against the destination database of
        ``TONIC_WORKSPACE_ID`` and writes the result to
        ``pg_dump_<job_id>.sql`` in the current working directory. A minimal
        Dockerfile is created next to it unless one is already present.

        Args:
            job_id: ID of the completed Tonic job; used in the dump file name.

        Raises:
            requests.HTTPError: If the connection details cannot be fetched.
            FileNotFoundError: If ``pg_dump`` is not installed / not on PATH.
            subprocess.CalledProcessError: If ``pg_dump`` exits with a
                non-zero status (the dump file may then be incomplete).
        """
        db_info = self.get_db_info(TONIC_WORKSPACE_ID)

        dump_path = pathlib.Path("pg_dump_{jobid}.sql".format(jobid=job_id))
        directory = dump_path.parent.absolute()

        # Hand the connection settings to pg_dump through a private copy of
        # the environment so the password does not linger in os.environ.
        pg_env = dict(os.environ)
        pg_env.update(
            # Despite its name, this constant holds the destination DB password.
            PGPASSWORD=SOURCE_DB_PASSWORD,
            # NOTE(review): the host is hard-coded and db_info.server is
            # unused; confirm the destination DB is always reachable on
            # localhost from where this script runs.
            PGHOST="localhost",
            PGPORT=str(db_info.port),
            PGUSER=db_info.username,
            PGDATABASE=db_info.database,
        )

        print("Dump started for {dbname}...".format(dbname=db_info.database))

        # Stream pg_dump's raw bytes straight into the file (no decode and
        # re-encode round trip), wait for the process to finish, and fail
        # loudly if it did not succeed.
        with open(str(dump_path), "wb") as fobj:
            subprocess.run(["pg_dump"], stdout=fobj, env=pg_env, check=True)

        try:
            # Mode "x" creates the file only if it does not exist yet, so an
            # existing Dockerfile is never overwritten.
            with open(str(directory / "Dockerfile"), "x") as dfile_obj:
                dfile_obj.writelines(
                    [
                        "FROM postgres:13\n",
                        "COPY sql/*.sql /docker-entrypoint-initdb.d/",
                    ]
                )
        except FileExistsError:
            pass

        print(
            "A SQL dump of the destination DB can be found here: {dir}".format(
                dir=directory
            )
        )


def main():
    """Poll the Tonic job whose ID is given as the first CLI argument.

    Exits with a usage message (non-zero exit status) when no job ID was
    passed, instead of failing with an IndexError.
    """
    if len(sys.argv) < 2:
        script = sys.argv[0] if sys.argv else "tonic_status_poll.py"
        sys.exit("Usage: python3 {script} TONIC_JOB_ID".format(script=script))

    tonic_job_id = sys.argv[1]
    session = TonicSession(TONIC_BASE_URL, TONIC_APIKEY)
    session.get_status(tonic_job_id)
    print("\nRun this script against another Tonic job ID to poll for its status.")


if __name__ == "__main__":
    main()

Last updated