Downstream Data // Extract Transform Load

Handling third-party data via Workers and API calls

It's a common problem for all API engineers: inevitably you will face a time when you depend on someone else's API, data format, or a legacy team that isn't friendly to your requests. This happens in startup land and it happens in enterprise land. An excellent strategy is to wrap their API with a client that procures the data you need on a regular basis and stores it in your own format.

With App DNA, we accommodate this by building a Worker that runs on a schedule or queues work to be processed by other workers monitoring a queue. In this example, we will demonstrate a simple crypto data use case where we fetch data from Dune, a popular community-focused startup that makes on-chain crypto data queryable through a SQL-like interface. They have recently launched an API that lets users integrate with these queries by ID.

In this article, you will:

  1. See a simple, useful approach to integrating with your queries by their ID using a well-written Dune wrapper.

  2. Learn how to wrap that unit of work in a Worker.

  3. Deploy that Worker to Kubernetes and have it regularly process your data.

Integrate with the Dune API

I usually start with proof that the API I want to invoke is going to work, and a great way to do that is with a Unit Test in Python.

import logging
import sys
import unittest

from dna.config import DUNE_API_KEY

from dune_client.client import DuneClient
from dune_client.query import Query

logger = logging.getLogger()
logger.level = logging.DEBUG


class TestDuneAPI(unittest.TestCase):

    def test_wrapper_query(self):
        # Pipe log output to stdout so we can watch the query progress
        stream_handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(stream_handler)
        query = Query(
            name="Aggregate Query",
            query_id=1463956
        )
        dune = DuneClient(DUNE_API_KEY)
        # refresh() executes the query and polls Dune until results are ready
        results = dune.refresh(query)
        logger.info(results)


if __name__ == '__main__':
    unittest.main()

As I'm sure you have noticed, this unit test isn't asserting anything. However, it uses an awesome library called Dune Client, written by the folks over at Cow Protocol. Thanks for this; it made the job way easier.

Dune is willing to run a SQL query for you on their data warehouse, but sometimes these queries can take minutes or tens of minutes to run. This means you have to poll for your result through their async API. Dune Client wraps this for you and handles the polling. You can see this with query ID 1463956 above if you give it a shot; it will take a few minutes to complete. Bear in mind you'll need a DUNE_API_KEY for this to work, and you can store that in dna.config or hard code it in your test locally to try it out.
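
For context, here is a minimal sketch of what that dna.config entry could look like, assuming the key is read from an environment variable; the real App DNA config module may differ:

import os

# Keep the Dune key out of source control by reading it from the environment
DUNE_API_KEY = os.environ.get("DUNE_API_KEY", "")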

Model the Results

Well, this is great, we can get data out of Dune, now what!? In my use case, I wanted to build aggregated on-chain and off-chain metric charts, so I created a simple RawMetric data type. This allows me to store time series data from any source in one place that I can then re-materialize into other views later. If you took a look at my last article, you know I use SQLAlchemy in App DNA. So here is the simple data model I'll use to store recorded raw metrics.

import uuid
import logging

from sqlalchemy import Column, ForeignKey, Float, DateTime
from sqlalchemy.dialects.postgresql import UUID

from dna.common.model import Base, HasCreateTime
from dna.common.model.db.dna_reference_master_model import DigitalAssetMetric

logger = logging.getLogger('dna.model.db.onchain_model')
logger.setLevel(level=logging.INFO)
logging.basicConfig()


class RecordedRawMetric(Base, HasCreateTime):
    __tablename__ = "onchain_recorded_raw_metric"
    metric_uuid = Column(UUID(as_uuid=True), ForeignKey(DigitalAssetMetric.uuid), nullable=False)
    record_date = Column(DateTime(timezone=True), nullable=False)
    value = Column(Float, index=False, nullable=False)

    def __init__(self, metric_uuid, record_date, value):
        super().__init__()
        self.metric_uuid = metric_uuid
        self.record_date = record_date
        self.value = value

    # No surrogate id column here; the (metric_uuid, record_date) pair acts as the
    # mapper-level composite primary key for this time series table
    __mapper_args__ = {
        "primary_key": [metric_uuid, record_date]
    }
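
RecordedRawMetric points at a DigitalAssetMetric reference row that lives elsewhere in App DNA. To make the foreign key concrete, here is a hypothetical sketch inferred only from the fields this article touches (uuid, name, api_query_url); the real dna_reference_master_model is certainly richer than this:

import uuid

from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import UUID

from dna.common.model import Base


class DigitalAssetMetric(Base):
    # Hypothetical table name; the real model ships with App DNA's reference master
    __tablename__ = "dna_digital_asset_metric"
    uuid = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False)           # human-readable metric name
    api_query_url = Column(String, nullable=False)  # Dune query ID used by the worker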

Now, if I want to process a bunch of Dune queries into raw metrics, I have to execute each query and then save the results. This is how I did that:

@managed_session
def do_handle_dune_query_for_metric(metric, start_date, end_date, session=None):
    # metric.api_query_url holds the Dune query ID for this metric
    query = Query(
        name="Aggregate Query",
        query_id=metric.api_query_url,
        params=[
            QueryParameter.date_type(
                name='1. start_date',
                value=query_friendly_date_str(start_date)),
            QueryParameter.date_type(
                name='2. end_date',
                value=query_friendly_date_str(end_date))
        ]
    )
    dune = DuneClient(DUNE_API_KEY)
    # refresh() blocks and polls until the query execution finishes
    results = dune.refresh(query)
    if results.state in ExecutionState.terminal_states():
        print(f"Saving results for query {metric.name}")
        exec_result = results.result
        result_rows = exec_result.rows

        insert_or_update_values = []
        for row in result_rows:
            db_metric = RecordedRawMetric(
                metric_uuid=metric.uuid,
                # row['day'] is an ISO timestamp with a trailing 'Z'; strip it before parsing
                record_date=datetime.fromisoformat(row['day'][:-1]).astimezone(timezone.utc),
                value=float(row['value'])
            )
            insert_or_update_values.append(db_metric)
        upsert_recorded_raw_metrics(metrics=insert_or_update_values, session=session)

        session.commit()
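
upsert_recorded_raw_metrics isn't shown in the article, so here is a minimal sketch of what it could look like, assuming PostgreSQL and a unique index on (metric_uuid, record_date); this is an illustration, not the actual App DNA helper:

from sqlalchemy.dialects.postgresql import insert


def upsert_recorded_raw_metrics(metrics, session=None):
    for metric in metrics:
        stmt = insert(RecordedRawMetric.__table__).values(
            metric_uuid=metric.metric_uuid,
            record_date=metric.record_date,
            value=metric.value,
        )
        # If a row for (metric_uuid, record_date) already exists, just refresh its value
        stmt = stmt.on_conflict_do_update(
            index_elements=['metric_uuid', 'record_date'],
            set_={'value': stmt.excluded.value},
        )
        session.execute(stmt)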

It's kind of weird, but I numbered the string parameter names ('1. start_date', '2. end_date') so that they also display in a pleasing order in the Dune interface.

You can fork or take a look at the query here.
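
query_friendly_date_str is another small helper that isn't shown above; here is a plausible sketch, assuming Dune's date-type parameters accept a 'YYYY-MM-DD HH:MM:SS' string:

def query_friendly_date_str(value):
    # Render a datetime the way Dune's date-type parameters expect it
    return value.strftime('%Y-%m-%d %H:%M:%S')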

Build a Worker

Now let's come at this from the angle of work-to-be-done. We schedule this, or control the invocation, via an administrative function that "schedules" or queues work. This is done with Celery in Python.

The individual unit of work is handled by this Celery task:

@celery.task(
    bind=True,
    base=TaskBase,
    serializer='pickle',
    name='dna.onchain.periodic.task.handle_dune_query_for_metric')
def handle_dune_query_for_metric(self, metric, start_date, end_date):
    do_handle_dune_query_for_metric(metric, start_date, end_date)

The orchestration of all the work-to-be-done is handled by this task:

@celery.task(
    bind=True,
    base=TaskBase,
    serializer='pickle',
    name='dna.onchain.periodic.admin.schedule_backfill_all_digital_asset_metrics')
def schedule_backfill_all_digital_asset_metrics(self):
    do_handle_schedule_backfill_digital_asset(
        async_mode=True,
        async_dune_callback=handle_dune_query_for_metric,
        async_l2beat_callback=handle_l2beat_query_for_metric,
        session=self.session
    )
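
To make this fully hands-off, the orchestration task can sit on a Celery beat schedule. Here is a rough sketch of such an entry; the cadence, entry name, and options are my assumptions, not App DNA's actual configuration:

from celery.schedules import crontab

beat_schedule = {
    'backfill-all-digital-asset-metrics': {
        'task': 'dna.onchain.periodic.admin.schedule_backfill_all_digital_asset_metrics',
        'schedule': crontab(hour=2, minute=0),  # once a day at 02:00
        'options': {'queue': 'onchain'},        # land on the same queue the workers watch
    },
}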

So the entry point is schedule_backfill_all_digital_asset_metrics, which invokes this function:

@managed_session
def do_handle_schedule_backfill_digital_asset(
        async_mode=False,
        async_dune_callback=None,
        async_l2beat_callback=None,
        session=None):
    """
    Schedules backfill for all digital assets and metrics

    1. Find a list of all digital assets
    2. Find all metrics for each digital asset
        1. Determine last end time (query raw table)
        2. If no end time, query between DEFAULT_START and NOW()
    3. Request raw metrics for each metric and time frame
    """
    projects = session.query(DigitalAssetProject).all()
    dune_source = find_source_by_name(source_name="Dune", session=session)
    for project in projects:
        print(f"Project uuid={project.uuid} => {project.project_name}")
        dune_project_metrics = find_metrics_for_project_by_source(project, dune_source, session)
        _handle_dune_project_metrics(async_dune_callback, async_mode, dune_project_metrics, session)

As you can see, that delegates to _handle_dune_project_metrics:

def _handle_dune_project_metrics(async_callback, async_mode, dune_project_metrics, session):
    if dune_project_metrics:
        for metric in dune_project_metrics:
            start_date, end_date = find_bounding_dates_for_metric(metric, session)
            print(f"Project metric {metric.name} and {metric.api_query_url}")
            print(f"Start date: {start_date}    End Date: {end_date}")
            if async_mode:
                async_callback.delay(metric, start_date, end_date)
            else:
                do_handle_dune_query_for_metric(metric, start_date, end_date, session)

This in turn schedules a unit of work on our worker nodes via handle_dune_query_for_metric, the first task declared in this section. That lets us run multiple queries at once, on as many Kubernetes Pods as we deem appropriate to schedule on our cluster.
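
You can also kick off a full backfill by hand, say from a shell or an admin endpoint, by enqueueing the orchestration task; this sketch assumes the task lives in the module its registered name suggests:

from dna.onchain.periodic.admin import schedule_backfill_all_digital_asset_metrics

# Enqueue the orchestration task; the onchain workers pick up the fanned-out queries
schedule_backfill_all_digital_asset_metrics.delay()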

Set Up Kubernetes

Now for the bring-it-home moment: how does it all get scheduled?

apiVersion: apps/v1
kind: Deployment
metadata:
  name: foliofficient-onchain-worker
spec:
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: foliofficient
      tier: backend
      role: backend-worker
  replicas: 4
  template:
    metadata:
      labels:
        app: foliofficient
        tier: backend
        role: backend-worker
    spec:
      containers:
        - name: onchain-worker
          image: <yoink_pointer_to_your_app_dna_image>
          imagePullPolicy: Always
          command: [
            "/usr/local/bin/celery",
            "--workdir", "/usr/src/app",
            "-A", "dna",
            "--config", "dna.periodic.celery_config",
            "worker", "-E",
            "-Q", "onchain",
            "-O", "fair"
          ]
          workingDir: /usr/src/app
          envFrom:
            - configMapRef:
                name: foliofficient-app-config
            - configMapRef:
                name: foliofficient-worker
          resources:
            requests:
              cpu: 200m
              memory: 200Mi

The meat and potatoes of this worker is the command being run:

"/usr/local/bin/celery",
            "--workdir", "/usr/src/app",
            "-A", "dna",
            "--config", "dna.periodic.celery_config",
            "worker", "-E",
            "-Q", "onchain",
            "-O", "fair"

This tells our DNA app to start a worker, use dna.periodic.celery_config, and process the onchain queue. If you need help configuring Celery, hit me back in the comments and maybe I'll do a post on that in the future.
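
In the meantime, here is a bare-bones sketch of what a dna.periodic.celery_config module could contain; the broker URL and routing values are illustrative assumptions, not App DNA's actual settings:

# Assumed broker/backend; point these at whatever Redis or RabbitMQ you run
broker_url = 'redis://redis:6379/0'
result_backend = 'redis://redis:6379/1'

# The tasks above declare serializer='pickle', so the worker must accept it
task_serializer = 'pickle'
accept_content = ['pickle', 'json']

# Route the on-chain tasks to the queue the Kubernetes worker listens on (-Q onchain)
task_routes = {
    'dna.onchain.periodic.*': {'queue': 'onchain'},
}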

Conclusion

I know this isn't enough to get you up and running with your own working App DNA process, but my hope is that it'll inspire you to work on your own stack for transforming and loading data into your application.