Example script: Polling for a job status and creating a Docker package

Polls for the status of a data generation job every minute while the job is running or in the queue.

If the job fails or is canceled, an appropriate message is displayed.

If the job completes successfully, it creates a SQL dump and, if it does not already exist in the directory, a Dockerfile. The Dockerfile helps you to use Docker to distribute the destination database.

For more information, read our blog about using Docker to manage your databases.

Note that the SQL dump is specific to PostgreSQL databases and uses pg_dump.

Text of the script

tonic_status_poll.py
# Note that our API does not guarantee backward compatibility.
# For the endpoints and parameters for your exact version, view the 
# Swagger API documentation that comes with your instance.
# If you use Structural Cloud, the API documentation is at:

# https://app.tonic.ai/apidocs/index.html

# Tested 2021.04.12 with Tonic API v199

import os
import pathlib
import collections
import subprocess
import sys
import time

# Before you run the script, use pip, pipenv, or poetry to install the requests package.
import requests

# Tonic Parameters
# Replace every <<...>> placeholder below before you run the script.

# Base URL of the Tonic instance. API paths are appended directly to this
# value, so keep the trailing slash.
TONIC_BASE_URL = "http://<<TONIC HOSTNAME>>/"
# Workspace whose destination database is dumped after the job completes.
TONIC_WORKSPACE_ID = "<<TONIC WORKSPACE ID>>"
# API key sent in the Authorization header of every request.
TONIC_APIKEY = "<<TONIC APIKEY>>"
# NOTE: despite its name, this holds the password of the DESTINATION
# database. It is passed to pg_dump through the PGPASSWORD variable.
SOURCE_DB_PASSWORD = "<<DESTINATION DB PASSWORD>>"

# Provide the Structural job identifier as a command line argument
# (ex. "python3 tonic_status_poll.py TONIC_JOB_ID")

# Connection details of the destination database, as read from the
# "destinationDatabase" section of the DataSource API response.
DatabaseInfo = collections.namedtuple(
    "DatabaseInfo", ["server", "port", "username", "database"]
)

class TonicSession:
    """Small client for the Structural API calls that this script needs.

    It polls a data generation job until the job leaves the queue/running
    states and, when the job completed, dumps the destination PostgreSQL
    database with pg_dump and writes a Dockerfile next to the dump.
    """

    # Seconds to wait for a single API response. Without a timeout, requests
    # blocks forever when the server stops answering.
    _REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, base_url, apikey):
        """Create a session that authenticates every request with *apikey*.

        Args:
            base_url: Base URL of the instance, including the trailing slash.
            apikey: API key that is sent in the Authorization header.
        """
        self._base_url = base_url
        self._session = requests.Session()
        self._api_key = apikey
        self._session.headers.update({"Authorization": "Apikey {}".format(apikey)})

    def get_status(self, job_id):
        """Poll the status of a Structural job until it stops running.

        While the job is "Running" or "Queued", the status is checked again
        every minute. A "Failed" or "Canceled" job is reported together with
        its error messages. A "Completed" job triggers packaging_for_docker().
        Any other status is reported and ends the polling.

        Args:
            job_id: Identifier of the data generation job.

        Raises:
            requests.HTTPError: If the API answers with an error status code.
        """
        print("Grabbing job status for job {jobid}...".format(jobid=job_id))
        status_url = "{url}api/GenerateData/jobs/{job_id}".format(
            url=self._base_url, job_id=job_id
        )

        while True:
            resp = self._session.get(
                status_url, timeout=self._REQUEST_TIMEOUT_SECONDS
            )
            # Raises for 4xx/5xx responses and does nothing otherwise.
            resp.raise_for_status()

            resp_json = resp.json()
            status = resp_json.get("status")

            if status in ("Running", "Queued"):
                print(
                    "Job {job_id} is {status}. Waiting 1 minute before "
                    "checking again".format(job_id=job_id, status=status)
                )
                time.sleep(60)
                print("Checking for job status again... ")
                continue

            if status in ("Failed", "Canceled"):
                print(
                    "Job {job_id} {status} with the following "
                    "message: {message}".format(
                        job_id=job_id,
                        status=status,
                        message=resp_json.get("errorMessages"),
                    )
                )
            elif status == "Completed":
                print("Job {job_id} completed.".format(job_id=job_id))
                self.packaging_for_docker(job_id)
            else:
                # Do not end silently when the API reports something this
                # script does not know how to handle.
                print(
                    "Job {job_id} returned an unexpected status: "
                    "{status}".format(job_id=job_id, status=status)
                )
            return

    def get_db_info(self, workspace_id):
        """Fetch the destination database connection details of a workspace.

        Args:
            workspace_id: Identifier of the workspace to look up.

        Returns:
            A DatabaseInfo built from the "destinationDatabase" section of
            the API response.

        Raises:
            requests.HTTPError: If the API answers with an error status code.
        """
        print("Grabbing destination database connection details...")
        db_info_url = "{url}api/DataSource?workspaceId={workspace_id}".format(
            url=self._base_url, workspace_id=workspace_id
        )
        resp = self._session.get(
            db_info_url, timeout=self._REQUEST_TIMEOUT_SECONDS
        )
        resp.raise_for_status()

        destination = resp.json()["destinationDatabase"]
        return DatabaseInfo(
            server=destination["server"],
            port=destination["port"],
            username=destination["username"],
            database=destination["database"],
        )

    def packaging_for_docker(self, job_id):
        """Dump the destination database and make sure a Dockerfile exists.

        Writes pg_dump_<job_id>.sql to the current directory and, if there
        is no Dockerfile in that directory yet, creates one that loads the
        dump into a postgres:13 image. See
        https://www.tonic.ai/blog/using-docker-to-manage-your-test-database

        The destination database password must be set at the top of the
        script.

        Args:
            job_id: Identifier of the completed job, used in the dump name.

        Raises:
            subprocess.CalledProcessError: If pg_dump exits with an error.
                In that case no Dockerfile is written and no success
                message is printed.
        """
        db_info = self.get_db_info(TONIC_WORKSPACE_ID)
        db_dumpfile = "pg_dump_{jobid}.sql".format(jobid=job_id)
        directory = pathlib.Path(db_dumpfile).parent.absolute()

        # Hand the connection settings to pg_dump only, instead of writing
        # the password into the environment of this whole process.
        pg_env = os.environ.copy()
        pg_env["PGPASSWORD"] = SOURCE_DB_PASSWORD
        # NOTE(review): the host is fixed to localhost and db_info.server is
        # not used. Confirm that this is intended before relying on it.
        pg_env["PGHOST"] = "localhost"
        pg_env["PGPORT"] = str(db_info.port)
        pg_env["PGUSER"] = db_info.username
        pg_env["PGDATABASE"] = db_info.database

        print("Dump started for {dbname}...".format(dbname=db_info.database))

        # pg_dump writes its bytes straight into the file, so the dump is
        # never decoded and re-encoded. check=True waits for pg_dump to
        # finish and raises when it reports a failure.
        with open(db_dumpfile, "wb") as fobj:
            subprocess.run(["pg_dump"], stdout=fobj, env=pg_env, check=True)

        dockerfile = directory / "Dockerfile"
        if not dockerfile.exists():
            with open(str(dockerfile), "w") as dfile_obj:
                dfile_obj.writelines(
                    [
                        "FROM postgres:13\n",
                        "COPY {db_dumpfile} /docker-entrypoint-initdb.d/".format(
                            db_dumpfile=db_dumpfile
                        ),
                    ]
                )

        print(
            "A SQL dump of the destination DB can be found here: {dir}/{db_dumpfile}".format(
                dir=directory, db_dumpfile=db_dumpfile
            )
        )

def main():
    """Poll the job whose ID is given as the first command line argument.

    Exits with a usage message and a non-zero status when no job ID is
    supplied, instead of failing with an IndexError.
    """
    if len(sys.argv) < 2:
        sys.exit("Usage: python3 tonic_status_poll.py TONIC_JOB_ID")

    tonic_job_id = sys.argv[1]
    session = TonicSession(TONIC_BASE_URL, TONIC_APIKEY)
    session.get_status(tonic_job_id)
    print("\nRun this script against another Structural job ID to poll for its status.")


if __name__ == "__main__":
    main()

Building a Docker image from the Dockerfile

To use the resulting Dockerfile to build a Docker image:

docker build -t <image_name:tag> .

Running the Docker image

To run the image, expose the database on a local port, and, if needed, add a superuser password:

docker run -d -p <local_port>:5432 --name <container_name> -e POSTGRES_PASSWORD=mysecretpassword <image_name:tag>

Connecting to the database

To connect to the database:

psql postgres -p <local_port> -h 127.0.0.1 -U postgres

You are prompted for the superuser password.

Last updated