In [1]:
import sys
sys.path.insert(0,'../lib')
import os
from generallib import *
from pygraphviz import *

# Directory appended to PATH so the Graphviz layout program used by
# `a.draw(..., prog='dot')` can be found - presumably where `dot` is installed; confirm.
graph_path='/home/anaconda/anaconda3/bin'
# Compare whole PATH entries: a plain substring test would also "find"
# e.g. '/home/anaconda/anaconda3/bin2' and skip the append.
if graph_path not in os.environ.get("PATH", "").split(os.pathsep):
    # Join with the existing PATH (if any) so no empty entry is introduced.
    os.environ["PATH"] = os.pathsep.join(p for p in (os.environ.get("PATH", ""), graph_path) if p)

connection = getConnection()
In [2]:
# Page title, rendered as Markdown at the top of the dashboard output.
display(md("# Pipelines Status Dashboard"))

Pipelines Status Dashboard


In [3]:
# Build timestamp. The text says "UTC", so ask for UTC explicitly instead of
# relying on the kernel's local time zone (datetime.now() without tz is local time).
display(md("Compiled "+datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")+" UTC."))

Compiled 2020-10-31 02:20:04 UTC.

In [4]:
def getDagStatus(id):
    """Placeholder for a per-DAG status lookup; not implemented, always returns None.

    NOTE(review): nothing in the visible cells calls this (successDag performs the
    DAG check) - candidate for removal.
    """
    return None

def computeStyler(node):
    """Render compute nodes filled with light green (#A0FFA0)."""
    attrs = node.attr
    attrs['style'] = 'filled'
    attrs['fillcolor'] = '#A0FFA0'

def storageStyler(node):
    """Render storage nodes as light-grey (#E0E0E0) filled circles."""
    attrs = node.attr
    attrs['shape'] = 'circle'
    attrs['style'] = 'filled'
    attrs['fillcolor'] = '#E0E0E0'

def reportStyler(node):
    """Render report nodes as squares."""
    attrs = node.attr
    attrs['shape'] = 'square'

def dagStyler(node):
    """Styler for 'dag' nodes: keeps the default look (intentionally a no-op)."""

def spStyler(node):
    """Render 'sp' nodes as circles."""
    attrs = node.attr
    attrs['shape'] = 'circle'

def rdbmsStyler(node):
    """Styler for 'rdbms' nodes: keeps the default look (intentionally a no-op)."""

def gdriveStyler(node):
    """Give 'gdrive' nodes a light-blue (#A0A0FF) fill colour.

    Only the colour is set here. NOTE(review): Graphviz paints fillcolor only for
    filled nodes, so this presumably relies on another styler (e.g. storageStyler)
    having set style='filled' - confirm.
    """
    attrs = node.attr
    attrs['fillcolor'] = '#A0A0FF'

def jlStyler(node):
    """Styler for 'jl' nodes: keeps the default look (intentionally a no-op)."""

def gdsStyler(node):
    """Styler for 'gds' nodes: keeps the default look (intentionally a no-op)."""

def externalStyler(node):
    """Give 'external' nodes a light-blue (#A0A0FF) fill colour.

    Only the colour is set here. NOTE(review): it shows only on nodes some other
    styler has made style='filled' - confirm that is the intent.
    """
    attrs = node.attr
    attrs['fillcolor'] = '#A0A0FF'

def downstreamFailureStyler(node):
    """Outline a node with a thick orange border: an ancestor failed, so this node is presumed affected."""
    attrs = node.attr
    attrs['color'] = '#FFA040'
    attrs['penwidth'] = 4.0

def failureStyler(node):
    """Outline a node whose own success check failed with a thick red border."""
    attrs = node.attr
    attrs['color'] = '#FF4040'
    attrs['penwidth'] = 4.0

def successStyler(node):
    """Style a node whose check passed and whose ancestors are healthy: leaves it unchanged."""
    # Disabled decoration, formerly parked in a no-op module-level string literal
    # right after this function. Uncomment to mark healthy nodes with an image.
    # node.attr['image'] = 'pipelinesOk.png'
    # node.attr['imagepos'] = 'tc'
    # node.attr['imagescale'] = True

# Node styler per classifier. buildGraph looks up both row[4] and row[5] of a
# pipelines row here and applies every styler it finds, in that order.
stylerMap = dict(
    compute=computeStyler,
    storage=storageStyler,
    report=reportStyler,
    dag=dagStyler,
    sp=spStyler,
    rdbms=rdbmsStyler,
    gdrive=gdriveStyler,
    jl=jlStyler,
    gds=gdsStyler,
    external=externalStyler,
)
In [5]:
def getChildrenQuery(parentId):
    """Return SQL selecting every pipelines row mapped as a child of ``parentId``.

    NOTE: ``parentId`` is interpolated straight into the SQL text; buildGraph only
    passes ids read back from the pipelines table.
    """
    template = (
        "\n"
        "    select\n"
        "        p.*\n"
        "    from\n"
        "        pipelines p\n"
        "        inner join pipelines_map pm on p.id=pm.child_id\n"
        "    where\n"
        "        pm.parent_id={}\n"
        "    "
    )
    return template.format(parentId)

def getStatusStyler(meta,success,downstreamSuccess):
    """Choose the health styler for a node.

    meta: the pipelines row (not used; kept for the call signature).
    success: result of this pipeline's own check.
    downstreamSuccess: False when a failure was already seen higher up the graph.

    Returns failureStyler when the node itself failed, downstreamFailureStyler when
    only something above it failed, otherwise successStyler.
    """
    if success:
        return successStyler if downstreamSuccess else downstreamFailureStyler
    return failureStyler

def getSuccess(meta):
    """Run the health check registered in ``successMap`` for one pipelines row.

    The check is looked up as ``successMap[meta[4]][meta[1]]`` (type, then name) and
    called with no arguments. A pipeline with no registered check is assumed healthy
    and yields True. This includes a type with no entry in ``successMap`` at all.
    """
    # .get() on both levels: previously a type missing from successMap raised
    # KeyError and aborted the whole dashboard instead of defaulting to healthy.
    check = successMap.get(meta[4], {}).get(meta[1])
    if check is None:
        return True
    return check()

def _describeNode(node, row):
    """Set label, geometry and tooltip of ``node`` from a pipelines ``row``, then apply its type stylers."""
    # node.attr['xlabel'] = row[1]
    node.attr['label'] = row[4] + ': ' + row[5] + "\n" + row[1]
    node.attr['height'] = 0.5
    node.attr['width'] = 0.8
    node.attr['shape'] = 'rectangle'
    node.attr['fontsize'] = 12
    node.attr['penwidth'] = 1.0
    # node.attr['comment'] = row[2]
    # Build the tooltip locally instead of appending to node.attr['tooltip']: that
    # attribute may not exist yet (str(None) -> "None..."), and a node reached again
    # through another parent would get its comment appended twice.
    tooltip = ''
    if row[4] == 'compute':
        tooltip = 'Deterministic: yes\n' if row[3] == 1 else 'Deterministic: NO\n'
    if row[2]:  # also skips a NULL comment, which used to raise TypeError
        tooltip += row[2]
    if tooltip:
        node.attr['tooltip'] = tooltip
    # Type styler first, then the second classifier's styler (it may refine the first).
    for key in (row[4], row[5]):
        if key in stylerMap:
            stylerMap[key](node)


def buildGraph(graph,parentId,downstreamSuccess):
    """Recursively add the pipelines below ``parentId`` to ``graph`` and style them by health.

    Rows come from the ``pipelines`` / ``pipelines_map`` tables via the module-level
    ``connection``. Row columns as used here (inferred from usage - confirm against
    the schema): 0 id, 1 name, 2 comment, 3 deterministic flag (1 = yes), and
    4/5 two classifiers. Columns 4 and 5 are shown as "x: y" in the label and
    both looked up in stylerMap.

    graph: the graph to add edges/nodes to (a pygraphviz AGraph in this notebook).
    parentId: pipeline id whose children to add; None starts from the pipelines that
        are nobody's child and hangs them under a synthetic node 0.
    downstreamSuccess: False once a failure has been seen further up; healthy nodes
        below it are then styled as presumed downstream failures.

    Returns True only if every pipeline in the visited subtree passed getSuccess.

    NOTE(review): there is no visited set, so a pipeline with several parents is
    queried and restyled once per path, and a cycle in pipelines_map would recurse
    forever.
    """
    if parentId is None:
        query = """
        select
            p.*
        from
            pipelines p
            left join pipelines_map pm on p.id=pm.child_id
        where
            pm.child_id is null
        """
        parentId = 0
    else:
        query = getChildrenQuery(parentId)
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        children = cursor.fetchall()
    finally:
        # Close before recursing so nested calls don't accumulate open cursors.
        cursor.close()
    finalSuccess = True
    for child in children:
        graph.add_edge(parentId,child[0])
        node = graph.get_node(child[0])
        _describeNode(node, child)
        childSuccess = getSuccess(child)
        getStatusStyler(child,childSuccess,downstreamSuccess)(node)
        # Recurse unconditionally so the full subtree is drawn even below a failure.
        subtreeSuccess = buildGraph(graph,child[0],downstreamSuccess and childSuccess)
        finalSuccess = subtreeSuccess and finalSuccess and childSuccess
    return finalSuccess
In [6]:
def successRdbmsTableRows(sqlPortion):
    """Return True when ``select count(*) from <sqlPortion>`` counts at least one row.

    ``sqlPortion`` is spliced verbatim into the SQL (a table name plus an optional
    ``where`` clause). It must therefore only come from trusted, hard-coded strings
    such as those registered in ``successMap``.
    """
    query = f"""
        select count(*) from {sqlPortion}
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        rowCount = cursor.fetchone()[0]
    finally:
        # The cursor was previously never closed; one leaked per check.
        cursor.close()
    return rowCount > 0

def successDag(dagName):
    """Return True unless the latest recorded run of Airflow DAG ``dagName`` has a status other than 'SUCCESS'.

    "Latest" means the row with the greatest ``runTime`` in
    ``airflow.pipelines_airflow``. A DAG with no recorded runs counts as successful.
    ``dagName`` is interpolated into the SQL text, so pass only trusted, hard-coded names.
    """
    query = f"""
    select status from airflow.pipelines_airflow where dag='{dagName}' order by runTime desc limit 1
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        latestRun = cursor.fetchone()
    finally:
        # The cursor was previously never closed; one leaked per check.
        cursor.close()
    return latestRun is None or latestRun[0] == 'SUCCESS'

def getSuccessRdbmsTableRowsCallback(sqlPortion):
    """Wrap ``successRdbmsTableRows(sqlPortion)`` in a zero-argument callable.

    The query runs only when the callable is invoked (getSuccess does this). So
    building ``successMap`` does not touch the database.
    """
    return lambda: successRdbmsTableRows(sqlPortion)

def getSuccessDagCallback(dagName):
    """Wrap ``successDag(dagName)`` in a zero-argument callable.

    The lookup is deferred until the callable is invoked (getSuccess does this).
    """
    return lambda: successDag(dagName)

# Health-check registry used by getSuccess: successMap[type][name] -> zero-argument
# callable returning True when that pipeline looks healthy (type = row[4], name = row[1]).
successMap = {
    # Compute pipelines backed by Airflow DAGs of the same name: healthy when the
    # most recent recorded run succeeded.
    'compute': {
        dagName: getSuccessDagCallback(dagName)
        for dagName in (
            'import_covid_data',
            'import_covid_news',
            'gapi_upload_covidus',
            'verticals_daily',
        )
    },
    # Storage pipelines: healthy when the expected rows exist. The SQL looks back
    # 1 day once the current hour reaches the cutoff (10, 12 or 9), otherwise 2 days.
    'storage': {
        label: getSuccessRdbmsTableRowsCallback(sqlPortion)
        for label, sqlPortion in (
            ('covidMetrics', 'airflow.covidMetrics where dataDate = current_date - interval if(hour(current_timestamp) >= 10,1,2) day'),
            ('covidMetricsUs', 'airflow.covidMetricsUs where dataDate = current_date - interval if(hour(current_timestamp) >= 10,1,2) day'),
            ('Verticals Metrics', 'verticals.metric_data where interval_id = verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 12,1,2) day)'),
            ('Verticals Journal', 'verticals.journal where interval_id = verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 9,1,2) day)'),
            ('Verticals Analyses', 'verticals.metric_analysis where interval_id >= verticals.getIntervalIdFromDatetime(current_date - interval if(hour(current_timestamp) >= 12,1,2) day)'),
        )
    },
    # No report checks yet; every report pipeline counts as healthy.
    'report': {},
}


# Directed pipeline graph; edges point from parent to child.
a = AGraph(strict=True,directed=True)
# Default node attributes. buildGraph overrides label/width/shape on every pipeline
# node, so in practice these describe the synthetic root node 0.
a.node_attr['label'] = 'Data Processing Stack'
a.node_attr['width'] = 10
a.node_attr['shape'] = 'rectangle'
# imagepath is a graph-level Graphviz attribute (directories searched for node
# images); it was previously set as a node attribute, where Graphviz ignores it.
a.graph_attr['imagepath'] = '.:/home/anaconda/jupyter/general'

success = buildGraph(a,None,True)

if success:
    display(md('## Status looks GOOD.'))
else:
    display(md('## There are some issues with the pipelines.'))

# Lay out with `dot` and write the SVG next to the notebook.
a.draw('pip.svg',prog='dot')

Status looks GOOD.

Elements outlined in red indicate a failure. Elements outlined in orange indicate a presumed downstream failure, meaning a pipeline above them in the graph failed.