Bash script to upload from Google Cloud Storage to Google BigQuery

The following Bash script uses Google's bq.py tool to upload log files (statistics) from Google Cloud Storage to Google BigQuery. The script uploads several log files into a single table, and it can also manipulate the data and insert the computed results into a new table in BigQuery.
For uploading the logs to Google Cloud Storage in the first place, see Bash script to upload logs to Google Cloud Storage.

Files are stored in directories following the pattern bucket/environment/log_type/Ymd_H/.
The script uploads all files in a Cloud Storage directory to a table in Google BigQuery, based on the environment name and the date in the path.
The script picks the directory that contains the oldest file among all files in Cloud Storage for this log type. That directory must contain only files whose last-updated time exceeds a minimum age (to avoid uploading a directory that some loggers are still writing into).
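For example, a candidate log file waiting for upload looks like the sample path that appears in the script's comments:

gs://dont_touch_it/statsqa/fictivious/2015_02_15-23/nginx_fictivious.23.log.6099.2015_02_15.logger-i-cb452c35.csv.candidate

Here dont_touch_it is the bucket, statsqa the environment, fictivious the log type, 2015_02_15-23 the Ymd_H directory, and 6099 is the row count that the file carries in its name (see step 4 below).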

  1. Rename the files so that a newly started script run will not touch files that are already being processed.
  2. Get the renamed files list and validate it. Call Google Cloud Storage and list the files in that directory in a loop until the number of elements matches the expected count (sometimes the Cloud fails to return the correct number of elements immediately after they have changed).
  3. Create a table in Google BigQuery. The table name consists of the log type, the date with hour, the current timestamp and a random number (e.g. 2015_02_15-15_1424349352_1989).
  4. Insert all the log files into the newly created table in Google BigQuery and validate that the number of rows in the new table equals the sum of the files' row counts (each file carries its row count in its name).
  5. Select into a permanent hourly table and validate the row count. This step queries the table that contains the uploaded logs and inserts the computed results into a new table in a different dataset (the same table name, but a different dataset).
  6. Remove the old table and move the uploaded log files in Cloud Storage to a different bucket that has a Time To Live (TTL).
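The script expects three globals, DB, Subject and SqlToRun, to be defined before it runs (it exits if any of them is missing; see the checks just before the main loop). Below is a minimal wrapper sketch showing one way to provide them; the file name upload_to_bigquery.sh is a placeholder and the sample values are taken from the commented examples inside the script.

#!/bin/bash
# Hypothetical wrapper: the script file name and the values below are placeholders.
export DB="statsqa"                                # environment name, also used as the dataset prefix
export Subject="fictivious"                        # log type
export SqlToRun="select * from FromTableHoldPlace" # FromTableHoldPlace is replaced by the source table name
# The globals are exported so the uploader script sees them when it runs:
./upload_to_bigquery.sh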
#!/bin/bash
 
###### ABOUT ######
# Author: Ilan Hazan
# This Bash script uploads logs from Google Cloud to Google BigQuery. 
# Files are stored in directories in the following pattern: <bucket>/<environment>/<log_type>/<Ymd_H>/
# The script uploads all files in a Cloud Storage directory to a table in Google BigQuery, based on the environment name and the date in the path.
# The script picks the directory that contains the oldest file among all files in Cloud Storage for this log type. That directory must contain only files whose last-updated time exceeds a minimum age (to avoid uploading a directory that some loggers are still writing into).
# 1. Rename the files so that a newly started script run will not touch files that are already being processed.
# 2. Get the renamed files list and validate it. Call Google Cloud Storage and list the files in that directory in a loop until the number of elements matches the expected count (sometimes the Cloud fails to return the correct number of elements immediately after they have changed).
# 3. Create a table in Google BigQuery. The table name consists of the log type, the date with hour, the current timestamp and a random number (e.g. <log_type>2015_02_15-15_1424349352_1989).
# 4. Insert all the log files into the newly created table in Google BigQuery and validate that the number of rows in the new table equals the sum of the files' row counts (each file carries its row count in its name).
# 5. Select into a permanent hourly table and validate the row count. This step queries the table that contains the uploaded logs and inserts the computed results into a new table in a different dataset (the same table name, but a different dataset).
# 6. Remove the old table and move the uploaded log files in Cloud Storage to a different bucket that has a Time To Live (TTL).
# 
 
##### MUST DEFINE GLOBALS BEFORE CALLING THIS SCRIPT #####
#DB="statsqa"
#Subject="fictivious"
#SqlToRun="select * from FromTableHoldPlace"
###############################
 
# gs://dont_touch_it/statsqa/fictivious/2015_02_15-23/nginx_fictivious.23.log.6099.2015_02_15.logger-i-cb452c35.csv.candidate
 
##### FUNCTIONS GLOBALS #####
GSUTILPath="/opt/anyclip/gsutil/gsutil"
BQPath="/opt/anyclip/<%=node["anyclip"]["bigquery"]["filebase"]%>/bq.py"
SchemaPath="/opt/anyclip/bin/"
 
###### CONFIGURATIONS GLOBALS ######
MinTimeUpdatedDiff=$((60*20))
CloudCandidateFilesExtension="candidate"
CloudReadyFilesExtension="gz"
TempBacket="dont_touch_it"
# Please note that the ArchiveBacket bucket has a TTL of 14 days
ArchiveBacket="archive_3rd_party"
TempDataset="${DB}uploads"
FinalDataset="${DB}hourly"
SleepTime="2m"
LoopTries=3
NumIterationsOnCloudDirectories=10
Schema="$SchemaPath$Subject.schema"
CloudPath="gs://$TempBacket/$DB/$Subject"
# In the SQL, put this string in place of the source table name
FromTableHoldPlace="FromTableHoldPlace"
 
 
######## FUNCTIONS ###########
 
 
# These are essential for the fail function.
# $(funcname) creates a subshell; "exit 1" would abort that subshell and not the main script.
trap "exit 1" TERM
export TOP_PID=$$
 
function fail() {
    exitcode=$1
    shift
    echo "exited with level $exitcode, error was '$@'" >&2
    kill -s TERM $TOP_PID
}
 
 
 
# Output logs to Stderr with date 
# $1 output string
# Return: None
function myLogger(){
	echo "$(date) $1 " >&2
}
 
# Convert date as string to seconds from Epoch
# $1 date as String
# Return: date as num seconds from Epoch 
function date2stamp () {
    date --utc --date "$1" +%s
}
 
 
 
# Calculate the difference between 2 dates (in days by default)
# Usage:
# -s in sec. | -m in min. | -h in hours  | -d in days (default)
#    dateDiff -s "2006-10-01-01" "2006-10-03-01"
#    dateDiff -m "2006-10-01-01" "2006-10-03-01"
#    dateDiff -h "2006-10-01-01" "2006-10-03-01"
#    dateDiff -d "2006-10-01-01" "2006-10-03-01"
#    dateDiff  "2006-10-01-01" "2006-10-03-01"
dateDiff (){
    case $1 in
        -s)   sec=1;      shift;;
        -m)   sec=60;     shift;;
        -h)   sec=3600;   shift;;
        -d)   sec=86400;  shift;;
        *)    sec=86400;;
    esac
    dte1=$(date2stamp $1)
    dte2=$(date2stamp $2)
    diffSec=$((dte2-dte1))
    if ((diffSec < 0)); then abs=-1; else abs=1; fi
    echo $((diffSec/sec*abs))
}
 
# extract Date part from Cloud File's details
# $1 the Cloud File's details
# Return: date part
function getDateFromRow(){
	local line=$1
	local ret=$(echo "$line" | awk -F '[\t ]+' '{ print $2 }')
	echo $ret
}
 
# extract file path part from Cloud File's details
# $1 the Cloud File's details
# Return: file path part
function getFilePathFromRow(){
	local line=$1
	local ret=$(echo "$line" | awk -F '[\t ]+' '{ print $3 }')
	echo $ret
}
 
# extract num rows from file name (it is written in the middle of the file name)
# $1 the file name
# Return: num rows
function getRowCountFromFilePath(){
	local line=$1
	local ret=$(echo "$line" | awk -F '.' '{ print $4 }')
	echo $ret
}
 
 
# get the file details line which describes the LEAST updated file 
# $1 cloud files list
# Return: the file details line which describes the least updated file 
function getMinDateLineFromCloudList(){
	local cloudFilesList=$1
	# get current epoch date by seconds
	local minDate=$(date --utc   +%s)
	local tmpLineDateTxt=""
	local tmpLineDate=0
	local retLine=""
 
	# need to get the least updated file
	while read -r line; do
		# get time from file
		#myLogger "Line: $line"
		tmpLineDateTxt=$(getDateFromRow "$line")
		#myLogger "date is $tmpLineDateTxt Line: $line "
		tmpLineDate=$(date2stamp $tmpLineDateTxt)
		if (( tmpLineDate < minDate )); then
			minDate=$tmpLineDate
			retLine=$line
		fi
		done <<< "$cloudFilesList"
	echo "$retLine"
}
 
# get the file details line which describes the MOST updated file 
# $1 cloud files list
# Return: the file details line which describes the most updated file 
function getMaxDateLineFromCloudList(){
	local cloudFilesList=$1
	# get current epoch date by seconds
	local maxDate=0
	local tmpLineDateTxt=""
	local tmpLineDate=0
	local retLine=""
 
	# need to get the most updated file
	while read -r line; do
		# get time from file
		#myLogger "Line: $line"
		tmpLineDateTxt=$(getDateFromRow "$line")
		#myLogger "date is $tmpLineDateTxt Line: $line "
		tmpLineDate=$(date2stamp $tmpLineDateTxt)
		if (( tmpLineDate > maxDate )); then
			maxDate=$tmpLineDate
			retLine=$line
		fi
		done <<< "$cloudFilesList"
	echo "$retLine"
}
 
# extract file directory from Cloud File's details
# $1 the Cloud File's details
# Return: file directory
function getCloudDirectoryFromRow(){
	local row=$1
	local filePath=$(getFilePathFromRow "$row")
	myLogger "filePath $filePath"
	echo $(dirname "$filePath")
}
 
# remove last extension of a file name
# $1 the file name
# Return: the file name without last extension
function removeLastExtension(){
	local fileName=$1
	echo ${fileName%.*}
}
 
# Go over Cloud files list and replace the files extension
# $1 the list of files
# $2 the new extension
# Return: None 	
function replaceFilesExtension(){
	local cloudFilesList=$1
	local newExtension=$2
 
	local filePath=""
	local filePathWithoutExtension=""
	local newFilePath=""
 
	while read -r line; do
		filePath=$(getFilePathFromRow "$line")
		filePathWithoutExtension=$(removeLastExtension "$filePath")
		newFilePath="$filePathWithoutExtension.$newExtension"
		$GSUTILPath mv "$filePath" "$newFilePath"
		done <<< "$cloudFilesList"
}
 
# Sum the num rows from all the Cloud file list 
# $1 the list of files
# Return: the summary
function getRowCountFromCloudList(){
	local cloudFilesList=$1
	local retSum=0
	local tmpRowNum=0
	while read -r line; do
		FilePath=$(getFilePathFromRow "$line")
		tmpRowNum=$(getRowCountFromFilePath "$FilePath")
		((retSum=retSum+tmpRowNum))
		done <<< "$cloudFilesList"
	echo "$retSum"
}
 
# Get number of elements in Cloud file list 
# $1 the list of files
# Return: count
function getNumElementsFromCloudList(){
	local cloudFilesList=$1
	local retCount=0
	while read -r line; do
		((retCount=retCount+1))
		done <<< "$cloudFilesList"
	echo "$retCount"
}
 
# Validate that two cloud lists have an equal number of elements
# $1 the first list of files
# $2 the second list of files
# Return: None
# Fails: in case the counts are not equal
function validateElementsNumberInLists(){
	local cloudFilesList1=$1
	local cloudFilesList2=$2
	local count1=0
	local count2=0
 
	count1=$(getNumElementsFromCloudList "$cloudFilesList1")
	count2=$(getNumElementsFromCloudList "$cloudFilesList2")
 
	if [ "$count1" -ne "$count2" ] ; then
		myLogger "Error: the two cloud list num elements are NOT equal."
		myLogger "First list ($count1): $cloudFilesList1"
		myLogger "Second list ($count2): $cloudFilesList2"
		fail 1 "Error: the two cloud list num elements are NOT equal: $count1 $count2"
	else
		myLogger "The first cloud list num elements is $count1 while the second list num elements is: $count2."
	fi
}
 
 
 
# extract files names (with paths) from Cloud file list and return as one line comma separated list
# $1 the list of files
# Return: comma separated files list 
function getCommaSeperatedFilesFromCloudList(){
	local cloudFilesList=$1
	local retList=""
	local seperator=""
	local filePath=""
 
	myLogger "The list is: $cloudFilesList"
	while read -r line; do
		filePath=$(getFilePathFromRow "$line")
		retList+="$seperator$filePath"
		seperator=","
		done <<< "$cloudFilesList"
	myLogger "The comma separated list is: $retList"
	echo "$retList"
}
 
 
# get BigQuery table details
# $1 table name (including dataset)
# Return: table details
function getShowTableAsJson(){
	local tableName=$1
	local bqText="";
	local responseCode=0
 
	bqText=$($BQPath --format=json show "$tableName")
	responseCode=$?
	#myLogger "getShowTableAsJson: BQ returned Code: $responseCode  text: $bqText"
	echo "$bqText"
	return $responseCode
}
 
# Get field value from JSON string
# $1 Json string
# $2 field name
# Return: field value
function extractValueFromJson(){
	local jsonStr=$1
	local fieldName=$2
	# GNU grep has the -P option for perl-style regexes, and the -o option to print only what matches the pattern. 
	# The \K is the short-form (and more efficient form) of (?<=pattern) which you use as a zero-width look-behind assertion before the text you want to output. (?=pattern) can be used as a zero-width look-ahead assertion after the text you want to output.
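	# Example (illustrative values): extractValueFromJson '{"numRows":"1234","id":"x"}' "numRows" prints 1234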
	echo $(echo $jsonStr | grep -Po "\"$fieldName\":\"\K[0-9]+(?=\")")
}
 
# Get a BigQuery table's row count
# $1 table name (including dataset)
# $2 loop tries - number of tries before declaring failure
# $3 sleep time - sleep between tries
# Return: table row count
# Fails: in case of error
function getBQTableNumRows(){
	local tableName=$1
	local loopTries=$2
	local sleepTime=$3
 
	local ret=0
	local bqText=""
	local returnedResponse=0
 
	bqText=$(getShowTableAsJson "$tableName")
	returnedResponse=$?
	local var=0
	while [ $returnedResponse -ne 0 -a $var -lt $loopTries ]; do
			((var=var+1))
			myLogger "Error: table $tableName fail to get its num rows. response code: $returnedResponse with message: $bqText for $var time"
			sleep $sleepTime
			bqText=$(getShowTableAsJson "$tableName")
			returnedResponse=$?
	done
 
	if [[ $returnedResponse -ne 0 ]] ; then
		myLogger "Error: table $tableName fail to get its num rows. response code: $returnedResponse with message: $bqText"
		fail $returnedResponse "Error: table $tableName fail to get its num rows."
	fi
	ret=$(extractValueFromJson "$bqText" "numRows")
	echo $ret
}
 
 
 
# Get BigQuery table's num rows
# $1 table name (including dataset)
# Return: table rows number. In case of error return 0
function getSafeBQTableNumRows(){
	local tableName=$1
	local ret=0
	local bqText="";
	local responseCode=0
 
	bqText=$(getShowTableAsJson "$tableName")
	responseCode=$?
	if [[ $responseCode -ne 0 ]] ; then
		ret=0
	else
		ret=$(extractValueFromJson "$bqText" "numRows")
	fi
	echo $ret
}
 
# Validate that a BigQuery table does NOT exist
# $1 table name (including dataset)
# Return: None
# Fails: in case the table exists
function validateBQTableNotExist(){
	local tableName=$1
	local bqText="";
	local responseCode=0
 
	bqText=$(getShowTableAsJson $tableName)
	responseCode=$?
	myLogger "validateBQTableNotExist: getShowTableAsJson returned Code for table $tableName: $responseCode  text: $bqText"
	if [[ $responseCode -eq 0 ]]; then
		myLogger "Error: table $tableName exist but it shouldnt be"
		fail 1 "Error: table $tableName exist but it shouldnt be"
	fi
}
 
# Create a new BigQuery table
# $1 table name (including dataset)
# $2 schema file path
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries
# Return: None
# Fails: in case table creation fails (e.g. the table already exists)
function createNewTable(){
	local tableName=$1
	local schema=$2
	local loopTries=$3
	local sleepTime=$4
 
	local bqText=""
	local returnedResponse=0
 
	myLogger "Going to run: $BQPath mk $tableName $schema"
	bqText=$($BQPath mk $tableName $schema)
	returnedResponse=$?
	local var=0
	while [ $returnedResponse -ne 0 -a $var -lt $loopTries ]; do
			((var=var+1))
			myLogger "Error: create new table $tableName failed with response code $returnedResponse and with message: $bqText for $var time"
			sleep $sleepTime
			bqText=$($BQPath mk $tableName $schema)
			returnedResponse=$?
	done
 
	myLogger "createNewTable: BQ returened Code: $returnedResponse  text: $bqText"
	if [[ $returnedResponse -ne 0 ]] ; then
		myLogger "Error: create new table $tableName failed with response code $returnedResponse and with message: $bqText"
		fail $returnedResponse "Error: create new table $tableName failed: $bqText"
	fi
}
 
# Validate a BigQuery table's row count
# $1 table name (including dataset)
# $2 desired row count
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries
# Return: None
# Fails: in case the row count does not match
function validateRowCount(){
	local tableName=$1
	local cloudRowCount=$2
	local loopTries=$3
	local sleepTime=$4
 
	local tableRowCount=0
	tableRowCount=$(getBQTableNumRows "$tableName" $loopTries $sleepTime)
	if [[ $tableRowCount -ne $cloudRowCount ]]; then
		myLogger "Error: Row count is not equal after insertion. Found: $tableRowCount Should be: $cloudRowCount"
		fail 1 "Error: Row count is not equal after insertion. Found: $tableRowCount Should be: $cloudRowCount"
	fi
	myLogger "validateRowCount: the BQ table $tableName has $tableRowCount rows as expected"
}
 
# Insert list of files into BiGQuery table. Also validates row count
# $1 table name (including dataset)
# $2 Cloud Files list
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries
# Globals:
#     BQPath - 
# Return: None. 
# Fail: upon failure or if row count not match
# Note: table name must be unique. It is used as job_id
function insertListToBigQueryTable (){
	local tableName=$1
	local cloudFilesList=$2
	local loopTries=$3
	local sleepTime=$4
 
	myLogger "before making comma separated list is: $cloudFilesList"
	# Must do comma separated list of files instead of using wild chars because it can be a bug that if two processes rename the files together they may upload the same files. 
	# In comma separated list, each process has its own list (*.candidate ) that it works on.
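	# Illustrative example of the resulting argument (hypothetical paths):
	#   gs://dont_touch_it/statsqa/fictivious/2015_02_15-15/a.15.log.100.2015_02_15.logger-x.csv.1989.gz,gs://dont_touch_it/statsqa/fictivious/2015_02_15-15/b.15.log.200.2015_02_15.logger-y.csv.1989.gz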
	local commaSeperatedFiles=$(getCommaSeperatedFilesFromCloudList "$cloudFilesList")
	local totalRowCount=$(getRowCountFromCloudList "$cloudFilesList")
 
	myLogger "Comma separated list: $commaSeperatedFiles"
	myLogger "Total row count: $totalRowCount"
	# prepare the job_id. The table name together with its dataset is unique
	local jobId=${tableName//./_}
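	# e.g. a table name like statsqauploads.fictivious20150219_14_1424955307_22410 becomes the job id statsqauploads_fictivious20150219_14_1424955307_22410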
	local tableRows=0
	# Notes:
	# code 1: BigQuery error in load operation: Internal Error
	# code 2: 
	#	- BigQuery error in load operation: Already Exists: Job
	# 	- BigQuery error in load operation: Error processing job ...: Not found: URI
	# Waiting on a real job returns 0, even if the load before it failed because the files were not found.
	# Waiting on a non-existing job returns 2.
	local returnedResponse=0
	local var=0
 
	myLogger "Going to run: $BQPath --job_id=${jobId}_${var} -q load --source_format=CSV $tableName $commaSeperatedFiles "
	$BQPath --job_id="${jobId}_${var}" -q load --source_format=CSV "$tableName" "$commaSeperatedFiles"
	returnedResponse=$?
	myLogger "response code from BQ upload is: $returnedResponse"
	while [ $returnedResponse -ne 0 -a $returnedResponse -ne 2 -a $var -lt $loopTries ]; do
		((var=var+1))
		myLogger "Got $var time error from BQ load with response code $returnedResponse. Going to sleep for $sleepTime"
		sleep $sleepTime
		myLogger "Going to run: $BQPath --job_id=${jobId}_${var} -q load --replace --source_format=CSV $tableName $commaSeperatedFiles "
		$BQPath --job_id="${jobId}_${var}" -q load --replace --source_format=CSV "$tableName" "$commaSeperatedFiles"
		returnedResponse=$?
	done
	if [[ $returnedResponse -eq 2 ]] ; then
		myLogger "Waiting for job ${jobId}_${var} to finish. Going to run: $BQPath wait ${jobId}_${var}"
		$BQPath wait ${jobId}_${var}
		returnedResponse=$?
		if [[ $returnedResponse -ne 0 ]] ; then
			myLogger "The command $BQPath wait ${jobId}_${var} was failed!"
			fail $returnedResponse "The command $BQPath wait ${jobId}_${var} was failed!"
		fi
	elif [ $returnedResponse -ne 0 ] ; then
		myLogger "uploading to BQ table $tableName was failed"
		fail $returnedResponse "uploading to BQ table $tableName was failed"
	else
		# it was a success
		myLogger "uploading to BQ table $tableName succeeded"
	fi
	# need to validate!
	myLogger "going to validate $tableName num rows is equal to $totalRowCount"
	validateRowCount "$tableName" $totalRowCount $loopTries $sleepTime
	myLogger "table $tableName validated!"
}
 
# Query the BigQuery table and insert results to new table. Also validates row count
# $1 source table name (including dataset)
# $2 destination table name (including dataset)
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries
# Globals:
#     BQPath - 
#     FromTableHoldPlace - the text that is an hold place for table name in sql.
# Return: None. 
# Fail: upon failure or if row count not match
# Note: destination table name must be unique. It is used as job_id
function selectIntoPermanentTable(){
	local sourceTable=$1
	local destinationTable=$2
	local loopTries=$3
	local sleepTime=$4
	# The rawSql does not contain the name of the table we select from; insert it now.
	local rawSql=$5
	local sqlToRun="${rawSql/$FromTableHoldPlace/$sourceTable}"
	myLogger "Sql to run: $sqlToRun"
 
	local totalDestinationRows=0
	# get lines number from source
	local sourceTableNumRows=$(getBQTableNumRows "$sourceTable" $loopTries $sleepTime)
	# get lines number from destination (0 if not exist)
	local destinationTableNumRowsBeforeInsert=$(getSafeBQTableNumRows "$destinationTable")
	# do select insert	
 
	# prepare the job_id. The table name together with its dataset is unique
	local jobId=${destinationTable//./_}
	local returnedResponse=0
	local var=0
	local garbage=""
	# Example: /opt/anyclip/bigquery-2.0.11/bq.py -q query --batch=true --max_rows=1185 --destination_table=statsqauploads.ilan123 'select * from statsqauploads.fictivious20150219_14_1424955307_22410'
 
	myLogger "Going to run: $BQPath --job_id=${jobId}_${var} -q query --batch=true --allow_large_results --destination_table=$destinationTable '$sqlToRun' "
	garbage=$($BQPath --job_id="${jobId}_${var}" -q query --batch=true --allow_large_results --destination_table="$destinationTable" "$sqlToRun" )
	returnedResponse=$?
	myLogger "response code from BQ upload is: $returnedResponse"
	while [ $returnedResponse -ne 0 -a $returnedResponse -ne 2 -a $var -lt $loopTries ]; do
		((var=var+1))
		myLogger "Got $var time error from BQ query with response code $returnedResponse. Going to sleep for $sleepTime"
		sleep $sleepTime
		myLogger "Going to run: $BQPath --job_id=${jobId}_${var} -q query --batch=true --allow_large_results --destination_table=$destinationTable '$sqlToRun' "
		garbage=$($BQPath --job_id="${jobId}_${var}" -q query --batch=true --allow_large_results --destination_table="$destinationTable" "$sqlToRun" )
		returnedResponse=$?
	done
	if [[ $returnedResponse -eq 2 ]] ; then
		myLogger "Waiting for job ${jobId}_${var} to finish. Going to run: $BQPath wait ${jobId}_${var}"
		$BQPath wait ${jobId}_${var}
		returnedResponse=$?
		if [[ $returnedResponse -ne 0 ]] ; then
			myLogger "The command $BQPath wait ${jobId}_${var} was failed!"
			fail $returnedResponse "The command $BQPath wait ${jobId}_${var} was failed!"
		fi
	elif [ $returnedResponse -ne 0 ] ; then
		myLogger "Select insert from $sourceTable to $destinationTable was failed"
		fail $returnedResponse "Select insert from $sourceTable to $destinationTable was failed"
	else
		# it was a success
		myLogger "Select insert from $sourceTable to $destinationTable succeeded"
	fi
 
	# validate
	((totalDestinationRows=sourceTableNumRows+destinationTableNumRowsBeforeInsert))
	local actualDestinationTableNumRows=$(getBQTableNumRows "$destinationTable" $loopTries $sleepTime)
	myLogger "$sourceTable num rows is: $sourceTableNumRows. $destinationTable num rows before insert was $destinationTableNumRowsBeforeInsert and now it is $actualDestinationTableNumRows"
	if [[ "$actualDestinationTableNumRows" -eq "$destinationTableNumRowsBeforeInsert" ]]; then
		# Sometimes the select-insert produces fewer result rows than input rows, so we only check that something new was inserted, rather than checking for equality.
		myLogger "Error: Row count did not increase after select-insert. Found: $actualDestinationTableNumRows (before insert: $destinationTableNumRowsBeforeInsert, at most $totalDestinationRows expected)"
		fail 1 "Error: Row count did not increase after select-insert"
	fi
}
 
 
# Delete the BigQuery table. Also validates
# $1 table name (including dataset)
# $2 loop tries - number of tries before declaring failure
# $3 sleep time - sleep between tries
# Globals:
#     BQPath - 
# Return: None. 
# Fail: upon failure
# Note: table name must be unique. It is used as job_id
function removeBigQueryTable(){
	local tableName=$1
	local loopTries=$2
	local sleepTime=$3
 
	# prepare the job_id. The table name together with its dataset is unique
	local jobId="delete"${tableName//./_}
	local returnedResponse=0
	local var=0
 
	myLogger "Going to run: $BQPath --job_id=${jobId}_${var} rm -t -f $tableName"
	$BQPath --job_id="${jobId}_${var}" rm -t -f "$tableName"
	returnedResponse=$?
	myLogger "response code from BQ upload is: $returnedResponse"
	while [ $returnedResponse -ne 0 -a $returnedResponse -ne 2 -a $var -lt $loopTries ]; do
		((var=var+1))
		myLogger "Got $var time error from BQ rm with response code $returnedResponse. Going to sleep for $sleepTime"
		sleep $sleepTime
		myLogger "Going to run: $BQPath --job_id=${jobId}_${var} rm -t -f $tableName"
		$BQPath --job_id="${jobId}_${var}" rm -t -f "$tableName"
		returnedResponse=$?
	done
	if [[ $returnedResponse -eq 2 ]] ; then
		myLogger "Waiting for job ${jobId}_${var} to finish. Going to run: $BQPath wait ${jobId}_${var}"
		$BQPath wait ${jobId}_${var}
		returnedResponse=$?
		if [[ $returnedResponse -ne 0 ]] ; then
			myLogger "The command $BQPath wait ${jobId}_${var} was failed!"
			fail $returnedResponse "The command $BQPath wait ${jobId}_${var} was failed!"
		fi
	elif [ $returnedResponse -ne 0 ] ; then
		myLogger "Deleting $tableName was failed"
		fail $returnedResponse "Deleting $tableName was failed"
	else
		# it was a success
		myLogger "Deleting $tableName succeeded"
	fi
 
	# validate: fail if the table still exists
	myLogger "validating that $tableName no longer exists"
	validateBQTableNotExist "$tableName"
}
 
 
# Move a Cloud file to another destination. Used to archive files
# $1 source file path (including bucket)
# $2 destination folder (including bucket). MUST END WITH /
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries
# Globals:
#     GSUTILPath - 
# Return: None.
# Fails: upon failure 
function moveACloudFile(){
	local sourcePath=$1
	local destinationPath=$2
	local loopTries=$3
	local sleepTime=$4
 
	local returnedResponse=0
 
	myLogger "going to run: $GSUTILPath mv $sourcePath $destinationPath "
	$GSUTILPath mv "$sourcePath" "$destinationPath"
	returnedResponse=$?
	local var=0
	while [ $returnedResponse -ne 0 -a $var -lt $loopTries ]; do
			((var=var+1))
			myLogger "Error: $fileName Fail uploading to Google Cloud $filePath with response code $returnedResponse for $var time" ; date
			sleep $sleepTime
			$GSUTILPath mv "$sourcePath" "$destinationPath"
			returnedResponse=$?
	done
 
	if [[ $returnedResponse -ne 0 ]] ; then
		myLogger "Error:  move file $sourcePath to $destinationPath failed with response code $returnedResponse after $loopTries times"
		fail 1 "could not move file $sourcePath to $destinationPath"
	fi
	myLogger "File $sourcePath successfully moved to $destinationPath"
}
 
 
# Move a list of Cloud files to the archive bucket. Used to archive files
# $1 archive bucket
# $2 Cloud Files list
# $3 loop tries - number of tries before declaring failure
# $4 sleep time - sleep between tries 
# Return: None.
# Fails: upon failure 
function archiveCloudFiles(){
	local archiveBacket=$1
	local cloudFilesList=$2
	local loopTries=$3
	local sleepTime=$4
 
	local filePath=""
	local newDirectoryPath=""
	# go over the cloud files list and move each file to the archive
	while read -r line; do
		filePath=$(getFilePathFromRow "$line")
		# Take only the file's directory path, strip the bucket name and prepend the new archive bucket
		newDirectoryPath=$(dirname "$filePath")
		newDirectoryPath=${newDirectoryPath#gs://*/}
		newDirectoryPath="gs://$archiveBacket/$newDirectoryPath/"
		moveACloudFile "$filePath" "$newDirectoryPath" $loopTries $sleepTime
		done <<< "$cloudFilesList"
 
}
 
# Get the file list from the Cloud, looping until the required number of elements is returned.
# The loop handles the case where the Cloud does not return up-to-date data immediately after the data has changed.
# $1 path to directory at Cloud
# $2 extension of files to search for
# $3 expected elements
# $4 loop tries - number of tries before declaring failure
# $5 sleep time - sleep between tries 
# Return: List of elements.
# Fails: upon failure 
function getFileListWithValidation(){
	local cloudDirPath=$1
	local cloudReadyFilesExtension=$2
	local requiredNumElements=$3
	local loopTries=$4
	local sleepTime=$5
 
	local returnedResponse=0
	local retList=""
	local var=0
 
	myLogger "going to run: $GSUTILPath list -lr  $cloudDirPath/*.$cloudReadyFilesExtension with grep $cloudReadyFilesExtension"
	retList=$($GSUTILPath list -lr  "$cloudDirPath/*.$cloudReadyFilesExtension" | grep "\.$cloudReadyFilesExtension$")
	returnedResponse=$?
	count1=$(getNumElementsFromCloudList "$retList")
	while [[ ($returnedResponse -ne 0 || $count1 -ne $requiredNumElements) && $var -lt $loopTries ]]; do
			((var=var+1))
			myLogger "Error: $fileName Fail getting file list from $cloudDirPath with response code $returnedResponse for $var time" ; date
			sleep $sleepTime
			retList=$($GSUTILPath list -lr  "$cloudDirPath/*.$cloudReadyFilesExtension" | grep "\.$cloudReadyFilesExtension$")
			returnedResponse=$?
			count1=$(getNumElementsFromCloudList "$retList")
	done
 
	if [[ $returnedResponse -ne 0 ]] ; then
		myLogger "Error:  getting list $cloudDirPath/*.$cloudReadyFilesExtension failed with response code $returnedResponse after $loopTries times"
		fail 1 "could not get list $cloudDirPath/*.$cloudReadyFilesExtension"
	fi
	if [[ $count1 -ne $requiredNumElements ]] ; then
		myLogger "Error:  getting $count1 elements from list $cloudDirPath/*.$cloudReadyFilesExtension while required $requiredNumElements"
		fail 1 "could not get $requiredNumElements elements (getting $count1) from list $cloudDirPath/*.$cloudReadyFilesExtension"
	fi
 
	myLogger "validated getting $count1 elements from list $cloudDirPath/*.$cloudReadyFilesExtension while required $requiredNumElements"
	myLogger "File list in function: $retList"
	echo "$retList"
}
 
#########################
# testing crucial globals
if [[ -z "$DB" ]]; then
	myLogger "Error: Should define Global variable: DB"
	exit 1
fi
if [[ -z "$Subject" ]]; then
	myLogger "Error: Should define Global variable: Subject"
	exit 1
fi
if [[ -z "$SqlToRun" ]]; then
	myLogger "Error: Should define Global variable: SqlToRun"
	exit 1
fi
 
 
# Need to test assumptions here!
 
 
 
 
 
 
 
myLogger "Starting Uploading to BigQuery from Cloud script"
 
IterationLoop=0
while [ $IterationLoop -ne $NumIterationsOnCloudDirectories ]; do
	 ((IterationLoop=IterationLoop+1))
	 myLogger "Working on iteration $IterationLoop"
 
	 # Get a flattened list of all candidate files from the Cloud. Keep only file lines (drop the totals line etc.)
	 myLogger "Going to run: $GSUTILPath list -lr  \"$CloudPath/**.$CloudCandidateFilesExtension\" | grep \"\.$CloudCandidateFilesExtension$\""
	 CloudFilesList=$($GSUTILPath list -lr  "$CloudPath/**.$CloudCandidateFilesExtension" | grep "\.$CloudCandidateFilesExtension$")
	 #myLogger "$CloudFilesList"
 
	 # need to test if empty. A string that has only spaces in it will be expanded to an empty string.
	 if [ -z "${CloudFilesList// }" ]; then
		myLogger "CloudFilesList is empty. Exit script."
		exit
	fi
 
	#myLogger "$CloudFilesList"
	LeastUpdatedFileRow=$(getMinDateLineFromCloudList "$CloudFilesList")
	myLogger "The least updated file is: $LeastUpdatedFileRow"
 
	CloudDirPath=$(getCloudDirectoryFromRow "$LeastUpdatedFileRow")
	myLogger "The least updated file directory (CloudDirPath) is $CloudDirPath"
	# The date-hour of the folder is the last component of the directory path
	# e.g. CloudDirPath = gs://dont_touch_it/statsqa/fictivious/2015_02_15-15 
	CloudDirDateWithHour=$(basename $CloudDirPath)
 
	# call Google cloud and get all files in that directory
	DirectoryCandidateFilesList=$($GSUTILPath list -lr  "$CloudDirPath/*.$CloudCandidateFilesExtension" | grep "\.$CloudCandidateFilesExtension$")
	myLogger "The least updated file directory contains the following files (DirectoryCandidateFilesList): $DirectoryCandidateFilesList"
 
	MostUpdatedFileRow=$(getMaxDateLineFromCloudList "$DirectoryCandidateFilesList")
	myLogger "The most updated file in the least updated file directory (MostUpdatedFileRow) is: $MostUpdatedFileRow"
 
	# now get the date and compare to the current date. Use a global of MinTimeUpdatedDiff
	MostUpdatedDateFromRow=$(getDateFromRow "$MostUpdatedFileRow")
	myLogger "MostUpdatedDateFromRow $MostUpdatedDateFromRow"
 
	# BE CAREFUL: CurrentDate is used in the temp table name. A new one must be created for each directory upload!!!
	CurrentDate=$(date --utc   +%s)
	MostUpdatedDateInSeconds=$(date2stamp $MostUpdatedDateFromRow)
	SecondsDiff=$((CurrentDate-MostUpdatedDateInSeconds))
	myLogger "CurrentDate $CurrentDate MostUpdatedDateInSeconds $MostUpdatedDateInSeconds SecondsDiff $SecondsDiff"
 
	if ((SecondsDiff-MinTimeUpdatedDiff < 0)); then
		myLogger "There are no more files older enough to upload. Exit script."
		exit
	fi
 
	# work on that directory
	# Table salt: will be used in the candidate file names and in the table names. This handles the situation where several scripts work on the same directory.
	TableSalt=$((  RANDOM % 100000 ))
	UploadFilesExtension="$TableSalt.$CloudReadyFilesExtension"
 
	myLogger "Working on directory: $CloudDirPath with file list: $DirectoryCandidateFilesList"
	# 1. rename files in that directory
	# Go over that directory and rename files extension from $CloudCandidateFilesExtension to $UploadFilesExtension
	myLogger "Going to replace files extensions to *.$UploadFilesExtension"
	replaceFilesExtension "$DirectoryCandidateFilesList" "$UploadFilesExtension"
	myLogger "replaced files extension to *.$UploadFilesExtension"
 
	# 2. get the renamed files list and validate
	# call Google Cloud and get all files in that directory in a loop until the number of elements is as required (sometimes the Cloud fails to return the correct number of elements immediately after they changed)
	NumElementsBeforeRenaming=$(getNumElementsFromCloudList "$DirectoryCandidateFilesList")
	DirectoryReadyFilesList=$(getFileListWithValidation "$CloudDirPath" "$UploadFilesExtension" $NumElementsBeforeRenaming $LoopTries $SleepTime)
	myLogger "File list after function getFileListWithValidation is: $DirectoryReadyFilesList"
 
#	DirectoryReadyFilesList=$($GSUTILPath list -lr  "$CloudDirPath/*.$UploadFilesExtension" | grep "\.$UploadFilesExtension$")
#	myLogger "After renaming , the directory contains the following files (DirectoryReadyFilesList): $DirectoryReadyFilesList"
#	# 2.1 validating number of elements in list
#	validateElementsNumberInLists "$DirectoryCandidateFilesList" "$DirectoryReadyFilesList"
#	myLogger "Validated getting the same num elements after renaming extensions"
 
	# 3. create temp unique table name for bigQuery
	# The tables will have the same names but under different dataset
	FinalTableName="${Subject}${CloudDirDateWithHour}_${CurrentDate}_${TableSalt}"
	TempTableName="${Subject}${CloudDirDateWithHour}_${CurrentDate}_${TableSalt}"
	# e.g. TempTableName = FinalTableName = fictivious2015_02_15-15_1424349352_1989
	# (the two tables share the same name and differ only by dataset)
 
	# x. calculate the total row count for this directory
	# THIS IS REDUNDANT HERE (insertListToBigQueryTable computes it again); it is kept only for testing this script
	TotalRowCount=$(getRowCountFromCloudList "$DirectoryReadyFilesList")
	myLogger "total row count is: $TotalRowCount"
	myLogger "temp: $TempDataset.$TempTableName   final: $FinalDataset.$FinalTableName"
 
	# 4. insert to table with loops and sleep
	# 4.1 check that a table with this name does not already exist. Fail if it exists!
	validateBQTableNotExist "$TempDataset.$TempTableName"
	myLogger "validated that $TempDataset.$TempTableName is not exist"
 
	# 4.2 create the new table. Fail if it already exists!
	createNewTable "$TempDataset.$TempTableName" "$Schema" $LoopTries $SleepTime
	myLogger "table created $TempDataset.$TempTableName with schema $Schema"
 
	# 4.3 insert with loop and sleep. Do it with validation
	myLogger "Going to insert the files into the table $TempDataset.$TempTableName "
	myLogger "File list before calling insertListToBigQueryTable function is: $DirectoryReadyFilesList"
	insertListToBigQueryTable "$TempDataset.$TempTableName" "$DirectoryReadyFilesList" $LoopTries $SleepTime
	myLogger "Done inserting files into the BQ table $TempDataset.$TempTableName "
 
	# 5. select into permanent hourly table and validate rows count
	# Example log line: Going to select from BQ table statsqauploads.search20150419_01_1429458098_8961 into another table statsqahourly.search20150419_01_1429458098_8961
	myLogger "Going to select from BQ table $TempDataset.$TempTableName into another table $FinalDataset.$FinalTableName"
	selectIntoPermanentTable "$TempDataset.$TempTableName" "$FinalDataset.$FinalTableName" $LoopTries $SleepTime "$SqlToRun"
	myLogger "Done inserting data from $TempDataset.$TempTableName into $FinalDataset.$FinalTableName"
 
	# 6. remove old table
	#myLogger "Going to remove table $TempDataset.$TempTableName"
	#removeBigQueryTable "$TempDataset.$TempTableName" $LoopTries $SleepTime
	#myLogger "Done removing table $TempDataset.$TempTableName"
 
	# 7. cleanup
	# Move the files to a different bucket. The archive bucket is configured to delete old files after 14 days.
	myLogger "Going to archive the files"
	archiveCloudFiles "$ArchiveBacket" "$DirectoryReadyFilesList" $LoopTries $SleepTime
	myLogger "Done archiving"
	# As tested, moving all objects out of a directory deletes that directory too, so there is no need to worry about empty directories.
	# (I uploaded one or two files to create a new directory, deleted them, and the directory was removed along with its whole path.)
 
done
 
myLogger "Stoped Uploading to BigQuery from Cloud script after $IterationLoop loops"