Seamless migration of a machine learning codebase from local or EC2 to Databricks PySpark Clusters (Part III: Job management for all-purpose clusters via CLI and API)

Punchh Technology Blog
Apr 9, 2020 · 7 min read

Author: Tian Lan

After you have transformed your codebase into a PySpark-adaptable form and prepared an all-purpose cluster, it is time to create and submit your jobs for development and testing.

To review the previous sections of this topic, here is the Table of Contents for the related blogs:

  1. Write the Spark User-Defined Function and codebase adaptor (Part I link)
  2. Manage Databricks all-purpose clusters via CLI and API (Part II link)
  3. Build, synchronize and submit job scripts locally (Part III, this article)
  4. Manage Databricks job clusters exclusively via CLI and API (Part IV link)

Create a Spark Notebook locally

If you are familiar with the Databricks UI, you may have noticed that its development environment relies heavily on notebook-style coding. A notebook is a fantastic development interface because it is highly interactive; however, when it comes to submitting a production job, it can be quite inconvenient. For example, it is difficult for a notebook to accept runtime arguments the way a Python script can. You can easily write a Python script that accepts runtime arguments like this:

create_job.py

from spark_mypythonML.spark_data_gen import main

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--business_id")
    parser.add_argument("--date_range")
    parser.add_argument("--out_dir")
    args = parser.parse_args()
    args = vars(args)
    # equivalent to main(business_id=..., date_range=..., out_dir=...)
    main(**args)

# you can call the job like
# python create_job.py --business_id=12345 --date_range="2019-01-01,2020-01-01" --out_dir XXXXX

When the job you submit is a notebook, it is a huge inconvenience if you cannot easily assign values to its variables. For example, if you have 1,000 businesses to run, you do not want to create 1,000 copies of the code above and manually change the business_id in each one.

To make the notebook accept runtime arguments, I used the following trick to mimic the argument parser. First, I have a notebook job template that specifies placeholders for the runtime variables:

run_template_data_gen.py

from spark_mypythonML.spark_data_gen import main

business_id = BUSINESS_ID
date_range = "DATE_RANGE"
out_dir = "OUT_DIR"

main(business_id=int(business_id),
     date_range=date_range,
     out_dir=out_dir)

Then, when you create the job, a companion script replaces the placeholders in the template notebook with your desired values at runtime:

create_job_from_template.py

import os
import argparse
MOUNT_NAME = "XXXXXXX"

parser = argparse.ArgumentParser()
parser.add_argument("--output_filename")  # name of the generated notebook, passed in by create_notebooks.sh
parser.add_argument("--business_id")
parser.add_argument("--date_range")
parser.add_argument("--out_dir", default="/mnt/%s/spark_features" % MOUNT_NAME)
args = parser.parse_args()

# resolve the template and output (run) directories relative to this file
template_dir = "%s/run_templates" % os.path.dirname(os.path.realpath(__file__))
run_dir = "%s/run" % os.path.abspath(os.path.join(__file__, "../.."))
inputfile = "%s/run_template_data_gen.py" % template_dir
outputfile = "%s/%s" % (run_dir, args.output_filename)

# fill the placeholders with the runtime values and write the job notebook
with open(inputfile) as f:
    file_str = f.read()
file_str_replaced = file_str.replace("BUSINESS_ID", args.business_id).\
    replace("DATE_RANGE", args.date_range).\
    replace("OUT_DIR", args.out_dir)
with open(outputfile, "w") as f:
    f.write(file_str_replaced)

On the surface, creating a job script now looks exactly the same as with the old create_job.py. In the end, the following script, create_notebooks.sh, is the only portion an end user needs to actively take care of; for example, they can add as many jobs as they like.

create_notebooks.sh

python spark_runner/create/create_job_from_template.py --output_filename spark_data_gen_12345.py --business_id=12345 --date_range="2019-01-01,2020-01-01"
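Because create_job_from_template.py behaves like an ordinary command-line tool, registering many jobs is just a matter of adding more lines or looping over your business IDs. A minimal sketch (the extra IDs below are made up for illustration):

# generate one notebook per business; the IDs here are illustrative only
for business_id in 12345 23456 34567; do
    python spark_runner/create/create_job_from_template.py \
        --output_filename spark_data_gen_${business_id}.py \
        --business_id=${business_id} \
        --date_range="2019-01-01,2020-01-01"
done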

Synchronize Spark Notebook with the Remote Cluster

A Databricks Workspace is an environment for accessing all of your Databricks assets. The Workspace organizes objects (notebooks, libraries, and experiments) into folders and provides access to data and computational resources. In the following script, the Databricks Workspace CLI is used to push all your local notebooks to the cluster. Notice that I also pay attention to local logging, because it is good practice to always keep a record of the job-creation pipeline.

workspace.sh

WORKSPACEPATH=/Users/${account}/workspace/PROJECT_NAME
local_timestamp=$(date "+DATE: %Y-%m-%d%nTIME: %H:%M:%S")
echo "----------------------------------------------------------------------------------" | tee -a ./spark_runner/run/run.log
echo ${local_timestamp} | tee -a ./spark_runner/run/run.log
echo "Push local run files to remote workspace notebooks" 2>&1 | tee -a ./spark_runner/run/run.log
databricks workspace import_dir -o ./spark_runner/run ${WORKSPACEPATH} 2>&1 | tee -a ./spark_runner/run/run.log
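This assumes the Databricks CLI is already configured against your workspace with a personal access token (if you followed the cluster management setup in Part II, it should be); otherwise, the one-time interactive setup is:

# prompts for the workspace URL and a personal access token
databricks configure --token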

If you execute the above two scripts,

create_notebooks.sh
workspace.sh

you will record the following log:

bash create_notebooks.sh
bash workspace.sh
----------------------------------------------------------------------------------
DATE: 2020-04-09 TIME: 15:51:04
Push local run files to remote workspace notebooks
./spark_runner/run/run.log does not have a valid extension of .scala, .py, .sql, .SQL, .r, .R, .ipynb, .html. Skip this file and continue.
./spark_runner/run/spark_data_gen_12345.py -> YOUR_DBRICKS_ACCOUNT/workspace/PROJECT_NAME/spark_data_gen_12345

and if you look at the Databricks UI, you can confirm that the notebooks have indeed been created inside the Workspace.
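If you prefer to verify from the command line instead of the UI, the Workspace CLI can list what was imported; the path below is the same WORKSPACEPATH used in workspace.sh:

# list the imported notebooks in the remote Workspace
databricks workspace ls /Users/${account}/workspace/PROJECT_NAME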

Submit and Log Jobs via the Jobs API

At this point, you could of course run the job as a notebook directly from the Databricks UI, but that is not our goal. We want to run the job from the local machine, and this is where the Jobs API comes into play. The following script handles this task. Essentially, there are three steps: (1) walk through the local job folder to find all the notebook jobs created in the previous step and swap the local directory for the remote Workspace directory as the target job path; (2) submit the job request via the API; (3) poll the job's running status every few seconds via the API. By the way, to keep your job secure, it is good practice not to expose your credentials in the program.

executenotebook_allpurpose_cluster.py

#!/usr/bin/python3
import json
import requests
import os
import time
from spark_common.utils import get_security_value, get_cluster_id


def main(user="",
         token_file="",
         cluster_name="",
         workspacepath="",
         outfilepath=""):

    # Generate array from walking local path
    localpath = "%s/run" % os.path.dirname(os.path.realpath(__file__))

    token = get_security_value(token_file, user, "token")
    server = get_security_value(token_file, user, "server")
    clusterid = get_cluster_id(token_file, user, cluster_name)
    notebooks = []
    for path, subdirs, files in os.walk(localpath):
        for name in files:
            fullpath = path + '/' + name
            # removes localpath to repo but keeps workspace path
            fullworkspacepath = workspacepath + path.replace(localpath, '')

            name, file_extension = os.path.splitext(fullpath)
            if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
                row = [fullpath, fullworkspacepath, 1]
                notebooks.append(row)

    # run each element in list
    for notebook in notebooks:
        nameonly = os.path.basename(notebook[0])
        workspacepath = notebook[1]

        name, file_extension = os.path.splitext(nameonly)

        # workpath removes extension
        fullworkspacepath = workspacepath + '/' + name

        print('Running job for:' + fullworkspacepath)
        timeout = 3600
        values = {'run_name': name, 'existing_cluster_id': clusterid,
                  'timeout_seconds': timeout,
                  'notebook_task': {'notebook_path': fullworkspacepath}}

        # submit the notebook as a one-time run on the existing all-purpose cluster
        resp = requests.post(server + '/api/2.0/jobs/runs/submit',
                             data=json.dumps(values),
                             auth=("token", token)
                             )
        print(server + '/api/2.0/jobs/runs/submit')
        print(values)
        runjson = resp.text
        print("runjson:" + runjson)
        d = json.loads(runjson)
        runid = d['run_id']

        # poll the run status every 10 seconds until it terminates or times out
        i = 0
        waiting = True
        while waiting:
            time.sleep(10)
            jobresp = requests.get(server + '/api/2.0/jobs/runs/get?run_id=' + str(runid),
                                   data=json.dumps(values),
                                   auth=("token", token)
                                   )
            jobjson = jobresp.text
            print("jobjson:" + jobjson)
            j = json.loads(jobjson)
            current_state = j['state']['life_cycle_state']
            runid = j['run_id']
            if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= timeout / 10:
                break
            i = i + 1

        # persist the final run response for bookkeeping
        if outfilepath != '':
            file = open(outfilepath + '/' + str(runid) + '.json', 'w')
            file.write(json.dumps(j))
            file.close()


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--token_file")
    parser.add_argument("--user")
    parser.add_argument("--cluster_name", help="cluster alias name")
    parser.add_argument("--workspacepath", help="workspacepath")
    parser.add_argument("--outfilepath",
                        default="%s/joblogs" % os.path.dirname(os.path.realpath(__file__)),
                        help="outfilepath for writing response")
    args = parser.parse_args()
    args = vars(args)

    main(**args)
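The helpers get_security_value and get_cluster_id are imported from spark_common.utils and are not listed here; keeping the token and server out of the job script is exactly the point. As an illustration only (not the actual spark_common implementation), they could read an INI-style credentials file and resolve the cluster name through the Clusters API 2.0 list endpoint:

# spark_common/utils.py -- illustrative sketch, assuming an INI credentials file keyed by user
import configparser
import requests


def get_security_value(token_file, user, key):
    # token_file has one section per user, e.g.
    # [some_user]
    # token = dapiXXXXXXXX
    # server = https://your-workspace.cloud.databricks.com
    config = configparser.ConfigParser()
    config.read(token_file)
    return config[user][key]


def get_cluster_id(token_file, user, cluster_name):
    # resolve a friendly cluster name to its cluster_id via the Clusters API
    token = get_security_value(token_file, user, "token")
    server = get_security_value(token_file, user, "server")
    resp = requests.get(server + '/api/2.0/clusters/list', auth=("token", token))
    for cluster in resp.json().get('clusters', []):
        if cluster['cluster_name'] == cluster_name:
            return cluster['cluster_id']
    raise ValueError("no cluster named %s" % cluster_name)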

If you run it, you should see a response like the following in your log every 10 seconds (I have masked some sensitive information). You can also monitor the job status and some interesting Spark metrics in the Databricks UI using the job ID returned by the API. At the end of the log you may notice clear_localnotebooks.sh, a simple script that removes all local notebooks and logs the deletion so that no conflicting or redundant jobs linger the next time.

runjson:{"run_id":11625}
jobjson:{"job_id":4317,"run_id":11625,"number_in_job":1,"state":{"life_cycle_state":"RUNNING","state_message":"In run"},"task":{"notebook_task":{"notebook_path":"XXXXX"}},"cluster_spec":{"existing_cluster_id":"XXXXX"},"cluster_instance":{"cluster_id":"XXXXX","spark_context_id":"XXXXX"},"start_time":1586474824371,"setup_duration":2000,"execution_duration":0,....."run_type":"SUBMIT_RUN"}
...
jobjson:{"job_id":4317,"run_id":11625,"number_in_job":1,"state":{"life_cycle_state":"TERMINATED","result_state":"SUCCESS","state_message":""},......,"run_type":"SUBMIT_RUN"}
bash clear_localnotebooks.sh
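clear_localnotebooks.sh is not listed in this post; a minimal sketch, assuming the generated notebooks live under ./spark_runner/run and the run.log bookkeeping file should be kept, could look like:

# remove the generated local notebooks and record the deletion in the same run log
local_timestamp=$(date "+DATE: %Y-%m-%d%nTIME: %H:%M:%S")
echo ${local_timestamp} | tee -a ./spark_runner/run/run.log
echo "Clear local notebooks" | tee -a ./spark_runner/run/run.log
ls ./spark_runner/run/*.py 2>/dev/null | tee -a ./spark_runner/run/run.log
rm -f ./spark_runner/run/*.py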

Now the machine learning jobs from your old codebase can be scripted, submitted, run, and logged through an interface very similar to the one the original codebase offered: you create the job with a near-identical script, and a simple bash script manages it on the remote clusters.

Summary

The following workflow chart summarizes everything: essentially, we have a UDF layer that transforms the codebase to fit the PySpark environment, a job manager that emulates the job creation of the old codebase, and finally the CLI and API to manage the remote jobs and clusters.

All of these tasks can be achieved with the following six lines of bash scripts. More importantly, a user only needs to interact with create_notebooks.sh to create and register jobs, just as they always have. With all other components entirely hidden, the user simply calls the ML codebase locally and gets the job done on the remote cluster.

configure-allpurpose-databricks:
	bash sh/uninstall.sh
	bash sh/install.sh
create-notebooks:
	bash sh/create_notebooks.sh
	bash sh/workspace.sh
execute-allpurpose-jobs:
	bash sh/execute_notebooks.sh
	bash sh/clear_localnotebooks.sh
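If these targets live in a Makefile, as the target: syntax suggests, the whole pipeline can be driven with three commands:

make configure-allpurpose-databricks
make create-notebooks
make execute-allpurpose-jobs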

In the next blog, I am going to talk about how to manage a job cluster and submit jobs to it. A job cluster is a much cheaper setup than an all-purpose cluster and is therefore considered the best candidate for production jobs. If you understand the infrastructure discussed in this and the previous sections, it should be pretty straightforward to combine the pieces and come up with a good strategy.

About Punchh

Headquartered in San Mateo, CA, Punchh is the world leader in innovative digital marketing products for brick and mortar retailers, combining AI and machine learning technologies, mobile-first expertise, and Omni-Channel communications designed to dramatically increase lifetime customer value. Leading global chains in the restaurant, health and beauty sectors rely on Punchh to grow revenue by building customer relationships at every stage, from anonymous, to known, to brand loyalists, including more than 100 different chains representing more than $12 billion in annual spend.

About the Author

Dr. Tian Lan is Tech Lead of A.I. at Punchh, where he leads the development of large-scale and distributed machine learning for recommender systems and personalized marketing.

