# Example Azure Function for ingesting survey data from the Qualtrics API.
# Demonstrates API polling, JSON flattening, and automated loading into SQL Server.
# (Connection credentials and tokens are securely stored and omitted here.)
# This function is not intended to be invoked directly. Instead it will be
# triggered by an orchestrator function.

import io
import json
import logging
import urllib.parse  # builds the sqlalchemy connection string
import zipfile

import pandas as pd
import requests
from sqlalchemy import create_engine  # Azure SQL engine used by pandas

from . import config


def main(content: dict):
    logging.info('Python activity function processed a request to retrieve survey data.')

    try:
        surveyId = content['surveyId']
    except KeyError:
        logging.info('Exception: surveyId not provided to qualdlActivity')
        raise Exception('surveyId not provided to qualdlActivity')

    if surveyId:
        # dataCenter is assumed to live alongside apiToken in the shared config module
        return exportSurvey(config.apiToken, surveyId, config.dataCenter)
    return 'Please pass a surveyId as {"surveyId": ""} in the request body'


def sanitize(s):
    # Cleans incoming free-text responses by removing punctuation (such as
    # commas) that would break the CSV output.
    # The base string.punctuation set is '''!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~''',
    # modified slightly below to preserve apostrophes and periods.
    PUNCTUATION = r'''!"#$%&()*+,-/:;<=>?@[\]^_`{|}~'''
    return str(s).translate(str.maketrans('', '', PUNCTUATION)).strip()


def exportSurvey(apiToken: str, surveyId: str, dataCenter: str) -> bool:
    # Calls the Qualtrics REST API to export survey responses as zipped JSON,
    # flattens each response into question/answer records, and loads the
    # result into SQL Server.

    # Static params
    fileFormat = "json"
    progressStatus = "inProgress"
    baseUrl = "https://{0}.qualtrics.com/API/v3/surveys/{1}/export-responses/".format(dataCenter, surveyId)
    headers = {
        "content-type": "application/json",
        "x-api-token": apiToken,
    }

    # For a connection to the Sentiment DB. config01 is assumed to be the
    # connection-settings dict kept in the shared config module.
    config01 = config.config01
    server = config01['server']
    database = config01['database']
    username = config01['username']
    password = config01['password']

    # Step 1: create the data export
    downloadRequestPayload = '{"format":"' + fileFormat + '"}'
    downloadRequestResponse = requests.request("POST", baseUrl, data=downloadRequestPayload, headers=headers)
    progressId = downloadRequestResponse.json()["result"]["progressId"]

    # Step 2: poll the export progress and wait until the export is ready
    while progressStatus not in ("complete", "failed"):
        logging.info("progressStatus=%s", progressStatus)
        requestCheckUrl = baseUrl + progressId
        requestCheckResponse = requests.request("GET", requestCheckUrl, headers=headers)
        requestCheckProgress = requestCheckResponse.json()["result"]["percentComplete"]
        logging.info("Download is %s complete", requestCheckProgress)
        progressStatus = requestCheckResponse.json()["result"]["status"]

    # Step 2.1: check for error
    if progressStatus == "failed":
        raise Exception("Qualtrics export failed.")

    fileId = requestCheckResponse.json()["result"]["fileId"]

    # Step 3: download the zipped export
    requestDownloadUrl = baseUrl + fileId + '/file'
    requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)

    # Step 4: unzip the file, load the JSON, and denormalize
    data = []  # will contain the flattened table
    with zipfile.ZipFile(io.BytesIO(requestDownload.content)) as zf:
        for filename in zf.namelist():
            try:
                raw = zf.read(filename).decode('utf-8')
            except KeyError:
                raise Exception('ERROR: Did not find %s in zip file' % filename)
            else:
                json_as_dict = json.loads(raw)

            # The following fields determine most of the output schema and are
            # copied onto every flattened record produced below.
            meta_fields = ['recipientFirstName', 'recipientLastName', 'recipientEmail',
                           'startDate', 'endDate', 'status', 'ipAddress', 'progress',
                           'duration', 'finished', 'recordedDate', '_recordId',
                           'locationLatitude', 'locationLongitude', 'distributionChannel',
                           'userLanguage', 'externalDataReference',
                           # The following are created as Embedded Data and
                           # are not part of the standard Qualtrics schema
                           'imsid', 'ultimateId', 'ultimateName', 'totalArr',
                           'eventCount', 'source', 'eventId', 'description',
                           'runId', 'runTimestamp', 'runDateRangeMin', 'runDateRangeMax',
                           'salesChannel', 'renewalDate', 'siteId', 'category1']

            # PRIMARY routine to flatten each response into N individual records
            # representing each question/answer pair defined in the survey.
            # A key is concocted from the survey, response, and question ids, then
            # the values, metadata (meta_fields above), and labels are added.
            # e.g. one record might look like:
            # {'surveyId': 'SV_...', 'responseId': 'R_...', 'questionId': 'QID1',
            #  'answerKey': 'SV_...-R_...-QID1', 'questionValue': '...', ...}
            for response in json_as_dict['responses']:  # for each survey response in the download
                for displayedField in response['displayedFields']:  # denormalize each to KVP records
                    d = {}  # will contain each record
                    d['surveyId'] = surveyId
                    d['responseId'] = response['responseId']
                    d['questionId'] = displayedField
                    d['answerKey'] = '-'.join([d['surveyId'], d['responseId'], d['questionId']])
                    try:
                        # sanitize() maps punctuation to None, then strips whitespace
                        d['questionValue'] = sanitize(response['values'][displayedField])
                    except KeyError:
                        d['questionValue'] = ''
                    try:
                        d['valueLabel'] = response['labels'][displayedField]
                    except KeyError:
                        d['valueLabel'] = ''
                    try:
                        d['statusLabel'] = response['labels']['status']
                    except KeyError:
                        d['statusLabel'] = ''
                    try:
                        d['finishedLabel'] = response['labels']['finished']
                    except KeyError:
                        d['finishedLabel'] = ''
                    for field in meta_fields:
                        try:
                            d[field] = response['values'][field]
                        except KeyError:
                            d[field] = ''
                    # fix commas in user-entered descriptions that break CSV files
                    d['description'] = sanitize(d['description'])
                    data.append(d)  # write record to the table

    # Create a DataFrame from the flattened records and load it to SQL Server
    df = pd.DataFrame(data=data)
    params = urllib.parse.quote_plus('DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server +
                                     ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
    engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
    df.to_sql('QualDL', engine, schema='SRC', if_exists='replace', index=False)

    return True
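
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal Durable Functions orchestrator
# that could drive this activity. The activity name 'qualdlActivity' and the
# list-of-survey-ids input shape are assumptions inferred from the error
# messages above, not part of this module; the orchestrator would live in its
# own function folder.
#
# import azure.durable_functions as df
#
# def orchestrator_function(context: df.DurableOrchestrationContext):
#     survey_ids = context.get_input() or []
#     results = []
#     for survey_id in survey_ids:
#         # each call routes {"surveyId": ...} into main(content) above
#         result = yield context.call_activity('qualdlActivity', {'surveyId': survey_id})
#         results.append(result)
#     return results
#
# main = df.Orchestrator.create(orchestrator_function)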