Seamless migration of machine learning codebase from the local or EC2 to Databricks PySpark Clusters (Part IV: Job clusters API)

Punchh Technology Blog
Apr 10, 2020

Author: Tian Lan

The job cluster is created ad-hoc when you run a job and the cluster is terminated when the job is complete. You cannot restart a job cluster. While the all-purpose cluster is great for development, testing, and coding collaboration, the one-time job cluster is significantly cheaper to run than an all-purpose cluster. Therefore the job cluster is especially suitable for production jobs.

To review the previous sections of this topic, here is the Table of Contents for the related blogs:

  1. Write the Spark User-Defined Function and codebase adaptor (Part I link)
  2. Manage Databricks all-purpose clusters via CLI and API (Part II link)
  3. Build, synchronize and submit job scripts at local (Part III link)
  4. Manage Databricks job clusters exclusively via CLI and API (Part IV)

Compared with the all-purpose cluster, managing a job cluster and submitting jobs is actually a bit simpler. You do not need to worry about the library conflict issue from the previously installed packages because the cluster is new, so you do not need to spend time detecting or uninstalling any package. You can embed your library installation request inside the job submission. As for job creation, there is no difference at all.
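
To make the difference concrete, here is a minimal sketch of how the request body for a run changes between the two cluster types. The helper name build_submit_payload and its arguments are hypothetical and not part of our codebase; the field names (new_cluster, existing_cluster_id, libraries, notebook_task) are the ones used by the Databricks Jobs API 2.0 runs/submit endpoint.

```python
import json


def build_submit_payload(run_name, notebook_path, libraries,
                         new_cluster=None, existing_cluster_id=None):
    """Build a request body for POST /api/2.0/jobs/runs/submit.

    Pass new_cluster (a cluster spec dict) for a job cluster, or
    existing_cluster_id for an all-purpose cluster, but not both.
    """
    if (new_cluster is None) == (existing_cluster_id is None):
        raise ValueError("pass exactly one of new_cluster or existing_cluster_id")
    payload = {
        "run_name": run_name,
        "notebook_task": {"notebook_path": notebook_path},
        # libraries travel with the submission itself
        "libraries": libraries,
    }
    if new_cluster is not None:
        payload["new_cluster"] = new_cluster
    else:
        payload["existing_cluster_id"] = existing_cluster_id
    return payload


if __name__ == "__main__":
    # hypothetical notebook path and run name, for illustration only
    demo = build_submit_payload(
        run_name="demo",
        notebook_path="/Users/someone/demo",
        libraries=[{"pypi": {"package": "pandas"}}],
        new_cluster={"spark_version": "6.4.x-scala2.11",
                     "node_type_id": "i3.xlarge",
                     "num_workers": 2},
    )
    print(json.dumps(demo, indent=2))
```

The only structural difference is which cluster key is present; the notebook task is described the same way in both cases.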

Package the Codebase and Copy to DBFS

The first step is to package the machine learning codebase as a wheel, as we did before, and then push it to DBFS.

update_wheel.sh

# modulename, version and dbfs_dir are assumed to be set earlier in the script
whl_file="${modulename}-${version}-py3-none-any.whl"
rm -rf dist
python setup.py sdist bdist_wheel
dbfs cp --overwrite "./dist/${whl_file}" "${dbfs_dir}"
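
If you prefer to drive this step from Python, the same file name and copy command can be sketched as below. The function names and the example module name are hypothetical. The sketch assumes a pure-Python wheel (py3-none-any) and only replaces hyphens in the module name with underscores; the exact name normalization depends on your setuptools version, so check the file that actually lands in dist/.

```python
def wheel_filename(modulename, version):
    """Expected file name of a pure-Python wheel built by setup.py."""
    # hyphens separate the fields of a wheel file name, so a hyphen in the
    # distribution name is written as an underscore
    safe_name = modulename.replace("-", "_")
    return "%s-%s-py3-none-any.whl" % (safe_name, version)


def dbfs_copy_command(modulename, version, dbfs_dir, dist_dir="./dist"):
    """Argument list for the `dbfs cp --overwrite` call used in update_wheel.sh."""
    local_path = "%s/%s" % (dist_dir, wheel_filename(modulename, version))
    return ["dbfs", "cp", "--overwrite", local_path, dbfs_dir]


if __name__ == "__main__":
    # hypothetical module name and DBFS directory
    print(dbfs_copy_command("my-ml-lib", "0.1.0", "dbfs:/wheels"))
```

The returned list can be handed to subprocess.run once the Databricks CLI is configured.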

New Cluster Setup

We need a configuration that tells Databricks what kind of job cluster to create, along with a few essential settings such as the AWS attributes. You also need to specify which packages to install (PyPI packages and the wheel file you just uploaded). I wrote a dblibrary module to manage this; essentially it just returns all required libraries in JSON format, following the Databricks API convention. That's it: the rest of the code is the same as executenotebook_allpurpose_cluster.sh
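
The dblibrary module itself is not shown in this post. As a rough sketch of what such a module could look like: the PyPI package list is taken from the response log at the end of this post, while the wheel path and the helper name build_libraries are made up for illustration. The pypi and whl entry shapes follow the library specification of the Databricks API.

```python
# PyPI packages as they appear in the response log of this post
PYPI_PACKAGES = ["scikit-learn==0.22.1", "s3fs", "pymysql", "scipy", "pandas"]

# hypothetical DBFS location of the wheel pushed by update_wheel.sh
WHEEL_PATHS = ["dbfs:/wheels/mymodule-0.1.0-py3-none-any.whl"]


def build_libraries(pypi_packages, wheel_paths):
    """Return library specs in the shape the Databricks API expects."""
    libraries = [{"pypi": {"package": package}} for package in pypi_packages]
    libraries += [{"whl": path} for path in wheel_paths]
    return libraries


# the job script reads dblibrary.CFG["libraries"]
CFG = {"libraries": build_libraries(PYPI_PACKAGES, WHEEL_PATHS)}
```

Keeping the list in one module means the all-purpose and job cluster scripts can share the same library definition.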

executenotebook_job_cluster.py

#!/usr/bin/python3
import json
import os
import time

import requests

from spark_runner import dblibrary
from spark_common.utils import get_security_value

# JSON configuration for the job cluster
CFG = {
    "new_cluster": {
        "spark_version": "6.4.x-scala2.11",
        "node_type_id": "i3.xlarge",
        "aws_attributes": {
            "availability": "SPOT_WITH_FALLBACK",
            "instance_profile_arn": ""  # filled in by main()
        },
        "num_workers": 2,
        # the Jobs API reads spark_conf from inside new_cluster
        "spark_conf": {
            "spark.sql.crossJoin.enabled": "true",
            "spark.databricks.service.server.enabled": "true"
        }
    },
    "libraries": dblibrary.CFG["libraries"]
}



def main(user="",
         token_file="",
         node_type=None,
         num_workers=None,
         workspacepath="",
         outfilepath=""):

    # command-line overrides belong inside "new_cluster"
    if node_type is not None:
        CFG["new_cluster"]["node_type_id"] = node_type
    if num_workers is not None:
        CFG["new_cluster"]["num_workers"] = num_workers
    CFG["new_cluster"]["aws_attributes"]["instance_profile_arn"] = get_security_value(token_file, user, "iam_role")
    server = get_security_value(token_file, user, "server")

    # Generate array from walking local path
    localpath = "%s/run" % os.path.dirname(os.path.realpath(__file__))

    with open(token_file, "r") as f:
        token = json.load(f)[user]["token"]
    notebooks = []
    for path, subdirs, files in os.walk(localpath):
        for name in files:
            fullpath = path + '/' + name
            # removes localpath to repo but keeps workspace path
            fullworkspacepath = workspacepath + path.replace(localpath, '')

            _, file_extension = os.path.splitext(fullpath)
            if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
                row = [fullpath, fullworkspacepath, 1]
                notebooks.append(row)

    # run each element in list
    for notebook in notebooks:
        nameonly = os.path.basename(notebook[0])
        workspacepath = notebook[1]

        name, file_extension = os.path.splitext(nameonly)

        # workpath removes extension
        fullworkspacepath = workspacepath + '/' + name

        print('Running job for:' + fullworkspacepath)
        timeout = 3600
        values = {'run_name': name, 'timeout_seconds': timeout, 'notebook_task': {'notebook_path': fullworkspacepath}}
        CFG.update(values)
        resp = requests.post(server + '/api/2.0/jobs/runs/submit',
                             data=json.dumps(CFG),
                             auth=("token", token)
                             )
        print(server + '/api/2.0/jobs/runs/submit')
        print(CFG)
        runjson = resp.text
        print("runjson:" + runjson)
        d = json.loads(runjson)
        runid = d['run_id']

        # poll every 10 seconds until the run reaches a terminal state or times out
        i = 0
        while True:
            time.sleep(10)
            # run_id goes in the query string; a GET request needs no body
            jobresp = requests.get(server + '/api/2.0/jobs/runs/get',
                                   params={'run_id': runid},
                                   auth=("token", token)
                                   )
            jobjson = jobresp.text
            print("jobjson:" + jobjson)
            j = json.loads(jobjson)
            current_state = j['state']['life_cycle_state']
            runid = j['run_id']
            if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= timeout/10:
                break
            i = i + 1

        # save the last response for this run
        if outfilepath != '':
            with open(outfilepath + '/' + str(runid) + '.json', 'w') as file:
                file.write(json.dumps(j))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--token_file")
    parser.add_argument("--user")
    parser.add_argument("--node_type", help="node type, such as i3.xlarge")
    parser.add_argument("--num_workers", help="num of workers", type=int)
    parser.add_argument("--workspacepath", help="workspacepath")
    parser.add_argument("--outfilepath", default="%s/joblogs" % os.path.dirname(os.path.realpath(__file__)), help="outfilepath for writing response")
    args = parser.parse_args()
    args = vars(args)  # convert to dict

    main(**args)
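
The polling part of the script can also be pulled out into a small function, which makes it easy to test without a live workspace. This is only a sketch: wait_for_run and its parameters are hypothetical names, and fetch_run stands for whatever function calls /api/2.0/jobs/runs/get and returns the parsed JSON. The terminal states are the same three the script checks.

```python
import time

# life cycle states after which the script stops polling
TERMINAL_STATES = {"TERMINATED", "INTERNAL_ERROR", "SKIPPED"}


def wait_for_run(fetch_run, run_id, timeout_seconds=3600, poll_seconds=10,
                 sleep=time.sleep):
    """Poll fetch_run(run_id) until the run is terminal or the timeout is hit.

    Returns the last response, which may still be non-terminal on timeout.
    """
    max_polls = max(1, timeout_seconds // poll_seconds)
    run = None
    for _ in range(max_polls):
        sleep(poll_seconds)
        run = fetch_run(run_id)
        if run["state"]["life_cycle_state"] in TERMINAL_STATES:
            break
    return run


if __name__ == "__main__":
    # fake responses standing in for the REST call
    states = iter(["PENDING", "RUNNING", "TERMINATED"])

    def fake_fetch(run_id):
        return {"run_id": run_id, "state": {"life_cycle_state": next(states)}}

    last = wait_for_run(fake_fetch, 11641, sleep=lambda seconds: None)
    print(last["state"]["life_cycle_state"])
```

Passing sleep as a parameter is a design choice for testing: the demo replaces it with a no-op, so the loop finishes instantly.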

The following commands run the entire workflow: job creation, cluster management, and job submission. Here run_job_cluster.sh calls executenotebook_job_cluster.py; the other scripts were discussed in the previous blogs. Again, the user only needs to interact with create_notebooks.sh to create and register jobs, as usual. With all other components hidden, the user calls the ML codebase locally and the job gets done on the remote cluster.

execute-adhoc-jobs:
	bash sh/create_notebooks.sh
	bash sh/workspace.sh
	bash sh/update_wheel.sh
	bash sh/run_job_cluster.sh
	bash sh/clear_localnotebooks.sh
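
For readers who do not use make, the same sequence can be sketched in Python. The function run_steps is a hypothetical helper; it mimics make's default behavior of stopping at the first command that exits with a non-zero code.

```python
import subprocess

# scripts of the execute-adhoc-jobs target, in order
STEPS = [
    "sh/create_notebooks.sh",
    "sh/workspace.sh",
    "sh/update_wheel.sh",
    "sh/run_job_cluster.sh",
    "sh/clear_localnotebooks.sh",
]


def run_steps(steps, run=subprocess.run):
    """Run each script with bash and stop at the first failure."""
    completed = []
    for script in steps:
        result = run(["bash", script])
        if result.returncode != 0:
            raise RuntimeError("step failed: %s (exit code %d)"
                               % (script, result.returncode))
        completed.append(script)
    return completed
```

Calling run_steps(STEPS) from the repository root runs the five scripts in order. Note that, as with make, a failure in an earlier step means the cleanup script is not reached.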

You should get the following response log (I have masked sensitive information). As soon as the job finishes, the cluster is terminated.

jobjson:{"job_id":4318,"run_id":11641,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"XXXXXX"}},"cluster_spec":{"new_cluster":{"spark_version":"6.4.x-scala2.11","aws_attributes":{"zone_id":"us-east-1e","availability":"SPOT_WITH_FALLBACK","instance_profile_arn":"XXXXXXXXXX"},"node_type_id":"i3.xlarge","enable_elastic_disk":false,"num_workers":2},"libraries":[{"pypi":{"package":"scikit-learn==0.22.1"}},{"pypi":{"package":"s3fs"}},{"pypi":{"package":"pymysql"}},{"pypi":{"package":"scipy"}},{"pypi":{"package":"pandas"}},.......]},"cluster_instance":{"cluster_id":"XXXXX"},"start_time":1586500427510,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"XXXXXX","run_name":"XXXXXXXXXX","run_page_url":"XXXXXX","run_type":"SUBMIT_RUN"}......
......
jobjson:{"job_id":4318,"run_id":11641,"number_in_job":1,"state":{"life_cycle_state":"RUNNING","state_message":"In run"}......
......
jobjson:{"job_id":4318,"run_id":11641,"number_in_job":1,"state":{"life_cycle_state":"TERMINATED","result_state":"SUCCESS","state_message":""}
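
Note that TERMINATED only means the run has ended; whether it succeeded is reported separately in result_state, as the last log line shows. A production wrapper would typically turn that into an exit code. Here is a minimal sketch; run_succeeded and exit_code are hypothetical helpers that the script above does not include.

```python
import json


def run_succeeded(run):
    """True only if the run has terminated and its result_state is SUCCESS."""
    state = run.get("state", {})
    return (state.get("life_cycle_state") == "TERMINATED"
            and state.get("result_state") == "SUCCESS")


def exit_code(run):
    """Map a runs/get response to a process exit code (0 means success)."""
    return 0 if run_succeeded(run) else 1


if __name__ == "__main__":
    # state object copied from the last log line above
    final = json.loads('{"state": {"life_cycle_state": "TERMINATED", '
                       '"result_state": "SUCCESS", "state_message": ""}}')
    print(exit_code(final))
```

A scheduler or CI system can then treat any non-zero exit code as a failed job.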

About Punchh

Headquartered in San Mateo, CA, Punchh is the world leader in innovative digital marketing products for brick and mortar retailers, combining AI and machine learning technologies, mobile-first expertise, and Omni-Channel communications designed to dramatically increase lifetime customer value. Leading global chains in the restaurant, health and beauty sectors rely on Punchh to grow revenue by building customer relationships at every stage, from anonymous, to known, to brand loyalists, including more than 100 different chains representing more than $12 billion in annual spend.

About the Author

Dr. Tian Lan is Tech Lead of A.I. at Punchh, where he leads the development of large-scale and distributed machine learning for recommender systems and personalized marketing.
