Tonic
Example: Polling for a job status and creating a Docker package
Polls the status of a data generation job every minute while the job is still running or queued. If the job fails or is canceled, a corresponding message appears. If the job completes successfully, the script creates a SQL dump and a Dockerfile (if one does not already exist in the directory) to help distribute the destination DB via Docker. You can read more in our blog about using Docker to manage your databases.
Note: The SQL dump is specific for Postgres databases via pg_dump.
Python
tonic_status_poll.py
1
# Note that we do not guarantee backwards compatibility for our API.
2
# Please look at the Swagger API documentation that comes with your instance
3
# for your exact version's endpoints and parameters. If you are using hosted
4
# Tonic, you can find the API documentation here:
5
6
# https://app.tonic.ai/apidocs/index.html
7
8
# Tested 2021.04.12 with Tonic API v199
9
10
import os
11
import pathlib
12
import collections
13
import subprocess
14
import sys
15
import time
16
17
# Be sure to install requests via pip, pipenv, or poetry before running.
18
import requests
19
20
21
# Tonic Parameters
TONIC_BASE_URL = "http://<<TONIC HOSTNAME>>/"
TONIC_WORKSPACE_ID = "<<TONIC WORKSPACE ID>>"
TONIC_APIKEY = "<<TONIC APIKEY>>"
# NOTE(review): despite the name, this value is used as the *destination*
# database password (it is exported as PGPASSWORD for the pg_dump of the
# destination DB in packaging_for_docker) — consider renaming.
SOURCE_DB_PASSWORD = "<<DESTINATION DB PASSWORD>>"

# Provide the Tonic Job ID as a command line argument
# (ex. "python3 tonic_status_poll.py TONIC_JOB_ID")

# Connection details for the destination database, as returned by the
# Tonic DataSource API.
DatabaseInfo = collections.namedtuple(
    "DatabaseInfo", ["server", "port", "username", "database"]
)
33
34
35
class TonicSession:
    """A thin authenticated wrapper around the Tonic REST API.

    Holds a requests.Session with the Apikey Authorization header applied,
    so every call made through this object is authenticated.
    """

    def __init__(self, base_url, apikey):
        self._base_url = base_url
        self._session = requests.Session()
        self._api_key = apikey
        # Every request on this session carries the Tonic API key.
        self._session.headers.update({"Authorization": "Apikey {}".format(apikey)})

    # Poll for a status of a Tonic job
    def get_status(self, job_id):
        """Poll the data-generation job every minute until it reaches a
        terminal state.

        On "Completed" the destination DB is packaged for Docker; on
        "Failed"/"Canceled" the job's error messages are printed. Raises
        requests.HTTPError if the status endpoint returns an error.
        """
        print("Grabbing job status for job {jobid}...".format(jobid=job_id))
        status_url = "{url}api/GenerateData/jobs/{job_id}".format(
            url=self._base_url, job_id=job_id
        )

        while True:
            resp = self._session.get(status_url)
            if not resp.ok:
                return resp.raise_for_status()

            resp_json = resp.json()
            status = resp_json.get("status")
            message = resp_json.get("errorMessages")

            if status in ("Running", "Queued"):
                print(
                    "Job {job_id} is {status}. Waiting 1 minute before "
                    "checking again".format(job_id=job_id, status=status)
                )
                time.sleep(60)
                print("Checking for job status again... ")
            elif status in ("Failed", "Canceled"):
                print(
                    "Job {job_id} {status} with the following "
                    "message: {message}".format(
                        job_id=job_id, status=status, message=message
                    )
                )
                # BUGFIX: previously there was no break here, so a terminal
                # Failed/Canceled job was re-polled in a tight loop forever.
                break
            elif status == "Completed":
                print("Job {job_id} completed.".format(job_id=job_id))
                self.packaging_for_docker(job_id)
                break
            else:
                # BUGFIX: an unknown/missing status used to busy-loop the
                # API with no sleep; stop and report it instead.
                print(
                    "Job {job_id} returned unexpected status: {status}".format(
                        job_id=job_id, status=status
                    )
                )
                break

    # Get destination DB connection details from Tonic
    def get_db_info(self, workspace_id):
        """Return a DatabaseInfo for the workspace's destination database.

        Raises requests.HTTPError when the DataSource API call fails.
        """
        print("Grabbing destination database connection details...")
        db_info_url = "{url}api/DataSource?workspaceId={workspace_id}".format(
            url=self._base_url, workspace_id=workspace_id
        )
        resp = self._session.get(db_info_url)
        if not resp.ok:
            return resp.raise_for_status()

        destination = resp.json()["destinationDatabase"]
        return DatabaseInfo(
            server=destination["server"],
            port=destination["port"],
            username=destination["username"],
            database=destination["database"],
        )

    # Get a SQL dump and generate a Dockerfile for packaging via Docker
    # (https://www.tonic.ai/blog/using-docker-to-manage-your-test-database)
    # Need to specify destination DB password at the top
    def packaging_for_docker(self, job_id):
        """Dump the destination Postgres DB via pg_dump and write a
        Dockerfile (if one is not already present) so the DB can be
        distributed as a Docker image.

        The destination DB password must be set in SOURCE_DB_PASSWORD at
        the top of this file.
        """
        db_info = self.get_db_info(TONIC_WORKSPACE_ID)

        with open("pg_dump_{jobid}.sql".format(jobid=job_id), "wb") as fobj:
            # pg_dump reads its connection parameters from the environment.
            os.environ["PGPASSWORD"] = SOURCE_DB_PASSWORD
            # BUGFIX: use the server reported by the Tonic API rather than a
            # hard-coded "localhost" (the fetched server was never used).
            os.environ["PGHOST"] = db_info.server
            os.environ["PGPORT"] = str(db_info.port)
            os.environ["PGUSER"] = db_info.username
            os.environ["PGDATABASE"] = db_info.database

            print("Dump started for {dbname}...".format(dbname=db_info.database))

            pgdump_proc = subprocess.Popen(
                "pg_dump", stdout=subprocess.PIPE, universal_newlines=True
            )
            # Stream the dump to disk line by line instead of buffering it
            # all in memory.
            for stdout_line in iter(pgdump_proc.stdout.readline, ""):
                fobj.write(stdout_line.encode("utf-8"))
            pgdump_proc.stdout.close()
            # BUGFIX: reap the child so we don't leave a zombie process and
            # the dump is known to be finished before we report success.
            pgdump_proc.wait()

        directory = pathlib.Path(fobj.name).parent.absolute()

        if "Dockerfile" not in os.listdir(directory):
            # The image loads every .sql file in sql/ at first startup via
            # the postgres image's docker-entrypoint-initdb.d mechanism.
            with open("Dockerfile", "w") as dfile_obj:
                dfile_obj.writelines(
                    [
                        "FROM postgres:13\n",
                        "COPY sql/*.sql /docker-entrypoint-initdb.d/",
                    ]
                )

        print(
            "A SQL dump of the destination DB can be found here: {dir}".format(
                dir=directory
            )
        )
139
140
141
def main():
    """Entry point: poll the Tonic job ID given as the first CLI argument."""
    # BUGFIX: a missing argument used to surface as a raw IndexError;
    # print a usage hint and exit non-zero instead.
    if len(sys.argv) < 2:
        print("Usage: python3 tonic_status_poll.py TONIC_JOB_ID")
        sys.exit(1)

    tonic_job_id = sys.argv[1]
    session = TonicSession(TONIC_BASE_URL, TONIC_APIKEY)
    session.get_status(tonic_job_id)
    print("\nRun this script against another Tonic job ID to poll for its status.")


if __name__ == "__main__":
    main()
Copied!
Copy link