import sys
sys.path.insert(0,'../lib')
import os
from generallib import *
from pygraphviz import *
# Directory holding the Graphviz executables; it has to be on PATH so that
# pygraphviz can launch `dot` for the a.draw(..., prog='dot') call below.
graph_path = '/home/anaconda/anaconda3/bin'
# Compare whole PATH entries instead of doing a substring test (which would
# wrongly match e.g. '/home/anaconda/anaconda3/bin2'), and tolerate an unset
# PATH instead of raising KeyError.
_currentPath = os.environ.get("PATH", "")
if graph_path not in _currentPath.split(os.pathsep):
    os.environ["PATH"] = (_currentPath + os.pathsep + graph_path) if _currentPath else graph_path
connection = getConnection()
display(md('# Pipelines Status Dashboard'))
# The banner is labelled "UTC", so take the timestamp in UTC explicitly;
# a naive datetime.now() would report the machine's local time.
display(md("Compiled " + datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + " UTC."))
def getDagStatus(id):
    """Placeholder for a per-DAG status lookup; not implemented yet.

    Always returns None. The parameter name shadows the builtin ``id`` but is
    kept unchanged for caller compatibility.
    """
    return None
def computeStyler(node):
    """Give a node of type 'compute' a solid light-green fill."""
    node.attr.update(style='filled', fillcolor='#A0FFA0')
def storageStyler(node):
    """Draw a node of type 'storage' as a light-grey filled circle."""
    node.attr.update(shape='circle', style='filled', fillcolor='#E0E0E0')
def reportStyler(node):
    """Draw a node of type 'report' as a square."""
    node.attr.update(shape='square')
def dagStyler(node):
    """Styler registered for 'dag' nodes; intentionally leaves *node* unchanged."""
    return None
def spStyler(node):
    """Draw a node of type 'sp' as a circle."""
    node.attr.update(shape='circle')
def rdbmsStyler(node):
    """Styler registered for 'rdbms' nodes; intentionally leaves *node* unchanged."""
    return None
def gdriveStyler(node):
    """Set a light-blue fill colour on a 'gdrive' node.

    Only the colour is set; in Graphviz a fill is drawn only when the node's
    style is 'filled', which another styler (e.g. storageStyler) must provide.
    """
    node.attr.update(fillcolor='#A0A0FF')
def jlStyler(node):
    """Styler registered for 'jl' nodes; intentionally leaves *node* unchanged."""
    return None
def gdsStyler(node):
    """Styler registered for 'gds' nodes; intentionally leaves *node* unchanged."""
    return None
def externalStyler(node):
    """Set a light-blue fill colour on an 'external' node.

    Only the colour is set; in Graphviz a fill is drawn only when the node's
    style is 'filled', which another styler must provide.
    """
    node.attr.update(fillcolor='#A0A0FF')
def downstreamFailureStyler(node):
    """Outline *node* with a thick orange border: an ancestor failed, so this node is presumed affected."""
    node.attr.update(color='#FFA040', penwidth=4.0)
def failureStyler(node):
    """Outline *node* with a thick red border to flag its own failed health check."""
    node.attr.update(color='#FF4040', penwidth=4.0)
def successStyler(node):
    """Styler for healthy nodes; intentionally leaves *node* unchanged.

    An image badge ('pipelinesOk.png', positioned top-centre) was once sketched
    here but is disabled.
    """
    return None
# Dispatch table from node type to styler function. buildGraph looks up both
# the primary type (row[4]) and the secondary type (row[5]) of every pipeline
# row here; types without an entry keep the default node look.
stylerMap = dict(
    compute=computeStyler,
    storage=storageStyler,
    report=reportStyler,
    dag=dagStyler,
    sp=spStyler,
    rdbms=rdbmsStyler,
    gdrive=gdriveStyler,
    jl=jlStyler,
    gds=gdsStyler,
    external=externalStyler,
)
def getChildrenQuery(parentId):
    """Return SQL selecting every pipeline row that is a direct child of *parentId*.

    The id is interpolated verbatim, so it must be a trusted value; buildGraph
    only passes ids it read back from the ``pipelines`` table.
    """
    sqlLines = [
        "",
        "select",
        "p.*",
        "from",
        "pipelines p",
        "inner join pipelines_map pm on p.id=pm.child_id",
        "where",
        f"pm.parent_id={parentId}",
        "",
    ]
    return "\n".join(sqlLines)
def getStatusStyler(meta,success,downstreamSuccess):
    """Choose the outline styler that reflects a node's health.

    :param meta: pipeline row; accepted for a uniform signature but unused.
    :param success: result of the node's own health check.
    :param downstreamSuccess: False when some ancestor has already failed.
    :returns: failureStyler if the node itself failed; otherwise
        downstreamFailureStyler if an ancestor failed; otherwise successStyler.
    """
    if success and downstreamSuccess:
        return successStyler
    # From here at least one flag is falsy: own failure takes precedence.
    return downstreamFailureStyler if success else failureStyler
def getSuccess(meta, checkMap=None):
    """Run the health check registered for a pipeline row.

    Checks are looked up first by the row's type (``meta[4]``) and then by its
    name (``meta[1]``). A row with no registered check is assumed healthy.
    This includes a row whose type has no entry in the registry at all; the
    original direct indexing raised KeyError for such rows.

    :param meta: pipeline row as fetched by buildGraph.
    :param checkMap: optional registry overriding the module-level
        ``successMap`` (same two-level shape); mainly useful for testing.
    :returns: the registered check's result, or True when none is registered.
    """
    if checkMap is None:
        checkMap = successMap
    check = checkMap.get(meta[4], {}).get(meta[1])
    return True if check is None else check()
def _fetchRows(query):
    """Execute *query* on the shared ``connection`` and return all rows, always closing the cursor."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()
    finally:
        # Without this, the recursive walk leaves one open cursor per level.
        cursor.close()


def _decorateNode(n, child):
    """Set label, geometry, tooltip and type-based styling of node *n* from pipeline row *child*.

    Column meanings are inferred from how this file uses them
    (NOTE(review): confirm against the ``pipelines`` schema):
    child[1] name, child[2] free-text comment, child[3] deterministic flag
    (1 == yes), child[4] primary type, child[5] secondary type.
    """
    n.attr['label'] = child[4] + ': ' + child[5] + "\n" + child[1]
    n.attr['height'] = 0.5
    n.attr['width'] = 0.8
    n.attr['shape'] = 'rectangle'
    n.attr['fontsize'] = 12
    # Reset the outline width; the status styler applied afterwards may thicken it.
    n.attr['penwidth'] = 1.0
    # Build the tooltip from scratch: a node with several parents is decorated
    # once per visit, and reusing the node's previous tooltip would append the
    # comment again. Skipping a falsy comment also avoids str + None on NULL.
    tooltip = ''
    if child[4] == 'compute':
        tooltip = 'Deterministic: yes\n' if child[3] == 1 else 'Deterministic: NO\n'
    if child[2]:
        tooltip += child[2]
    if tooltip:
        n.attr['tooltip'] = tooltip
    # Primary type first, then secondary type, so the secondary can override.
    for nodeType in (child[4], child[5]):
        if nodeType in stylerMap:
            stylerMap[nodeType](n)


def buildGraph(graph,parentId,downstreamSuccess):
    """Recursively add the pipelines below *parentId* to *graph* and report their health.

    :param graph: pygraphviz AGraph being populated.
    :param parentId: pipeline id whose children to add, or None to start from
        the top-level pipelines (those never listed as a child in
        ``pipelines_map``). The top-level pipelines are attached to a
        synthetic node ``0``.
    :param downstreamSuccess: False when some ancestor has already failed, in
        which case otherwise-healthy nodes get the downstream-failure outline.
    :returns: True only if every pipeline in the visited subtree passed its check.

    NOTE(review): a node with several parents is decorated once per visit, and
    only ``penwidth`` is reset, so an orange outline colour set on an earlier
    visit can survive a later successful visit.
    """
    if parentId is None:
        query = (
            "\n"
            "select\n"
            "p.*\n"
            "from\n"
            "pipelines p\n"
            "left join pipelines_map pm on p.id=pm.child_id\n"
            "where\n"
            "pm.child_id is null\n"
        )
        parentId = 0
    else:
        query = getChildrenQuery(parentId)
    finalSuccess = True
    for child in _fetchRows(query):
        graph.add_edge(parentId,child[0])
        n = graph.get_node(child[0])
        _decorateNode(n, child)
        childSuccess = getSuccess(child)
        getStatusStyler(child,childSuccess,downstreamSuccess)(n)
        # Always recurse (never short-circuit) so the whole subtree is drawn
        # even after a failure has been found.
        subtreeSuccess = buildGraph(graph,child[0],downstreamSuccess and childSuccess)
        finalSuccess = subtreeSuccess and finalSuccess and childSuccess
    return finalSuccess
def successRdbmsTableRows(sqlPortion):
    """Return True when ``select count(*) from <sqlPortion>`` counts at least one row.

    :param sqlPortion: table name plus optional ``where`` clause. It is spliced
        into the SQL verbatim, so only pass trusted, hard-coded fragments (as
        successMap does).
    :returns: whether the row count is positive.
    """
    query = f"\nselect count(*) from {sqlPortion}\n"
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        result = cursor.fetchone()[0]
    finally:
        # Release the cursor even if the query fails; the dashboard may run
        # many of these checks per build.
        cursor.close()
    return result > 0
def successDag(dagName):
    """Return whether the most recent recorded run of the DAG *dagName* succeeded.

    Reads the latest row, ordered by ``runTime``, from ``airflow.pipelines_airflow``.
    A DAG with no recorded runs is treated as successful.

    NOTE(review): *dagName* is interpolated inside quotes rather than bound as
    a parameter; this is safe only for the hard-coded names in successMap.
    """
    query = (
        "\nselect status from airflow.pipelines_airflow"
        f" where dag='{dagName}' order by runTime desc limit 1\n"
    )
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        result = cursor.fetchone()
    finally:
        # Always release the cursor, even when the query fails.
        cursor.close()
    return result is None or result[0] == 'SUCCESS'
def getSuccessRdbmsTableRowsCallback(sqlPortion):
    """Return a zero-argument check that runs successRdbmsTableRows(sqlPortion) when invoked.

    Binding *sqlPortion* now defers the database query until the check is called.
    """
    return lambda: successRdbmsTableRows(sqlPortion)
def getSuccessDagCallback(dagName):
    """Return a zero-argument check that runs successDag(dagName) when invoked.

    Binding *dagName* now defers the database lookup until the check is called.
    """
    return lambda: successDag(dagName)
# Registry of health checks used by getSuccess, keyed first by pipeline type
# (row[4]) and then by pipeline name (row[1]). Each value is a zero-argument
# callable returning True when the pipeline looks healthy.
successMap = {
    # Compute pipelines: healthy when the latest recorded run of the
    # identically-named DAG has status 'SUCCESS'.
    'compute': {
        dagName: getSuccessDagCallback(dagName)
        for dagName in (
            'import_covid_data',
            'import_covid_news',
            'gapi_upload_covidus',
            'verticals_daily',
        )
    },
    # Storage pipelines: healthy when the table has at least one row for the
    # target date (for analyses: on or after it). The database computes that
    # date as yesterday once its clock has reached the given hour, and as the
    # day before yesterday otherwise.
    'storage': {
        storageName: getSuccessRdbmsTableRowsCallback(sqlPortion)
        for storageName, sqlPortion in (
            ('covidMetrics', 'airflow.covidMetrics where dataDate = current_date - interval if(hour(current_timestamp) >= 10,1,2) day'),
            ('covidMetricsUs', 'airflow.covidMetricsUs where dataDate = current_date - interval if(hour(current_timestamp) >= 10,1,2) day'),
            ('Verticals Metrics', 'verticals.metric_data where interval_id = verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 12,1,2) day)'),
            ('Verticals Journal', 'verticals.journal where interval_id = verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 9,1,2) day)'),
            ('Verticals Analyses', 'verticals.metric_analysis where interval_id >= verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 12,1,2) day)'),
        )
    },
    # No report checks yet.
    'report': {},
}
a = AGraph(strict=True,directed=True)
# Default node attributes. buildGraph overrides label, width and shape on
# every pipeline node, so these defaults end up dressing the synthetic root
# node 0 to which the top-level pipelines are attached.
# NOTE(review): in Graphviz 'imagepath' is a graph-level attribute; set as a
# node default it is likely ignored. This only matters if image decorations
# are re-enabled.
for _attrName, _attrValue in (
    ('label', 'Data Processing Stack'),
    ('width', 10),
    ('shape', 'rectangle'),
    ('imagepath', '.:/home/anaconda/jupyter/general'),
):
    a.node_attr[_attrName] = _attrValue
success = buildGraph(a,None,True)
display(md('## Status looks GOOD.' if success else '## There are some issues with the pipelines.'))
a.draw('pip.svg',prog='dot')
Nodes outlined in red failed their own health check. Nodes outlined in orange look healthy themselves but are presumed affected, because a pipeline upstream of them failed.