Publishing assets to Twitter with Dagster and Hex
How can you automatically share charts with stakeholders to give them an overview of your platform’s data?
I’m excited to be closing the loop on something I’ve been meaning to do for a while: automatically sharing data assets with stakeholders and the public. The discursus project is a data platform to track protest movement activity. And beyond the data it produces, I’ve been meaning to automatically produce and share charts of what we are observing.
Such as this daily summary shared on Twitter.
https://twitter.com/discursus_io/status/1598399160792875011
I say “closing the loop”, because it’s been a work in progress for quite a while now. That simple map automatically shared on Twitter is actually the output of the following architecture.
But today I want to focus on the final two components responsible for producing and delivering the assets: Dagster and Hex. And without naming them, I want to acknowledge the incredible community of developers who are always so willing to brainstorm ideas and just be so damn helpful 🙏 Hopefully, by making the discursus data platform open source, I can contribute back a little.
Producing Hex-driven assets in Dagster
Let’s start by zooming out a little to get some perspective on how the assets are produced and eventually shared.
I already took a deep dive into producing those assets in a previous post, but what I want to do now is zoom in on the data apps assets that are being produced.
I currently have a job that runs a refresh of my main discursus dashboard. This runs twice a day, right after the data warehouse has been refreshed. The integration between Dagster and Hex is smooth and effortless with the dagster_hex library provided by the Hex team. But I did do a little bit of tweaking to my implementation.
Dagster recently reorganized its core concepts around assets instead of jobs and ops (though those still exist). And I wanted my Hex app refreshes to be treated as assets, since they do produce… assets. With that in mind, I refactored the API calls to be assets instead of ops.
# Imports (elided here) come from dagster (asset, Output, MetadataValue),
# dagster_hex (HexOutput, DEFAULT_POLL_INTERVAL), and the project's resources module.

@asset(
    non_argument_deps = {"dw_data_tests"},
    description = "Hex main dashboard refresh",
    group_name = "data_apps",
    resource_defs = {
        'hex_resource': resources.my_resources.my_hex_resource
    },
)
def hex_main_dashboard_refresh(context):
    hex_output: HexOutput = context.resources.hex_resource.run_and_poll(
        project_id = "d6824152-38b4-4f39-8f5e-c3a963cc48c8",
        inputs = None,
        update_cache = True,
        kill_on_timeout = True,
        poll_interval = DEFAULT_POLL_INTERVAL,
        poll_timeout = None,
    )

    asset_name = ["hex", hex_output.run_response["projectId"]]

    return Output(
        value = asset_name,
        metadata = {
            "run_url": MetadataValue.url(hex_output.run_response["runUrl"]),
            "run_status_url": MetadataValue.url(
                hex_output.run_response["runStatusUrl"]
            ),
            "trace_id": MetadataValue.text(hex_output.run_response["traceId"]),
            "run_id": MetadataValue.text(hex_output.run_response["runId"]),
            "elapsed_time": MetadataValue.int(
                hex_output.status_response["elapsedTime"]
            ),
        },
    )
As I mentioned, the data platform refreshes those assets twice a day, right after the data warehouse has been rebuilt. So I can now keep track of the production of those assets directly in Dagster.
Producing summary assets in Hex
I know there could have been a few ways for me to run notebooks directly from Dagster, but I’m already using Hex for my dashboarding needs, and I just honestly love working with that tool. So the question now was: how can I produce graphs and summary statistics in Hex that I could then manipulate afterwards in Dagster?
Thanks to some community members' help, I was finally able to wrap my head around a solution and piece it together. I’m essentially producing graphs that I can then manipulate as objects in Python and save to S3. I can then read them in Dagster and start packaging them up for sharing.
I created a new Hex app that initially pulls 24 hours’ worth of protest data from my Cube API. I then summarize that data by location and map it using the plotly library.
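The summarization step itself isn’t shown in this post; here’s a minimal sketch of what it could look like, assuming raw event rows with the column names the plotting code below expects (action_geo_latitude, action_geo_longitude, movement_name) and counting events per location into an events column.

```python
import pandas as pd

# Hypothetical sketch: group raw event rows by movement and location,
# counting events per point so the map can size its markers.
raw_events = pd.DataFrame({
    "movement_name": ["Climate", "Climate", "Labor"],
    "action_geo_latitude": [45.5, 45.5, 43.7],
    "action_geo_longitude": [-73.6, -73.6, -79.4],
})

movement_events_per_location = (
    raw_events
    .groupby(["movement_name", "action_geo_latitude", "action_geo_longitude"])
    .size()
    .reset_index(name="events")
)
print(movement_events_per_location)
```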
import plotly.express as px

px.set_mapbox_access_token('xyz')

fig = px.scatter_mapbox(
    movement_events_per_location,
    title = 'Top protest movements in North America in the past 24 hours',
    mapbox_style = 'light',
    lat = "action_geo_latitude",
    lon = "action_geo_longitude",
    color = "movement_name",
    opacity = 0.7,
    size = "events",
    size_max = 15,
    zoom = 2,
    template = 'seaborn')

fig.show()
That produces the following map.
The second asset I want to produce is a simple summary table of the most active protest movements, saved to a pandas DataFrame.
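That notebook cell isn’t reproduced here either; a hypothetical sketch, assuming the movement_name and events_count columns that the tweet later reads with .iloc[0], could look like this — ranking movements so the most active one sits in the first row.

```python
import pandas as pd

# Hypothetical sketch of the summary table: count events per movement
# and sort descending so the most active movement is the first row.
raw_events = pd.DataFrame({
    "movement_name": ["Climate", "Labor", "Climate", "Housing"],
})

movement_events = (
    raw_events
    .groupby("movement_name")
    .size()
    .reset_index(name="events_count")
    .sort_values("events_count", ascending=False)
    .reset_index(drop=True)
)
print(movement_events.iloc[0]["movement_name"])  # most active movement
```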
Once those two assets have been produced, I upload them to S3 for future use in Dagster, and also to keep a record of all the assets the data platform produces.
Here’s how I upload those assets to S3.
import boto3
import kaleido  # required by fig.write_image for static image export
from datetime import date

# Today's date
today = date.today().strftime("%Y%m%d")

# Create a connection to S3 (credentials come from the notebook's secrets)
s3 = boto3.resource(
    's3',
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
)
bucket = "discursus-io"

# Save the plot as a static image
protest_movements_map_file_name = today + '_' + "top_protests_map.png"
fig.write_image(protest_movements_map_file_name)

# Save map data as a CSV
protest_movements_csv_file_name = today + '_' + "top_protests.csv"
movement_events.to_csv(protest_movements_csv_file_name)

# Upload the image to S3
with open(protest_movements_map_file_name, "rb") as img_data:
    s3.Object(bucket, 'assets/daily_summary/' + protest_movements_map_file_name).put(
        Body = img_data, ContentType = "image/png"
    )

# Upload the CSV to S3
with open(protest_movements_csv_file_name, "rb") as csv_data:
    s3.Object(bucket, 'assets/daily_summary/' + protest_movements_csv_file_name).put(
        Body = csv_data, ContentType = "text/csv"
    )
Sharing summary assets to Twitter
Great! Now we have fresh summary assets ready to share wherever we wish. For this example, I want to publish to Twitter, once a day, a summary of what we’ve been observing on the data platform.
Referencing back again to my assets lineage graph, I now want to run twitter_share_daily_assets.
Again, you can browse the data platform’s code that shares those assets for yourself; at this point it’s really nothing more than reading from S3, producing some dynamic text, and sending it all to Twitter.
import io
from datetime import date

from PIL import Image

@asset(
    non_argument_deps = {"hex_daily_assets_refresh"},
    description = "Twitter share of daily summary assets",
    group_name = "data_apps",
    resource_defs = {
        'aws_resource': resources.my_resources.my_aws_resource,
        'twitter_resource': resources.my_resources.my_twitter_resource
    },
)
def twitter_share_daily_assets(context):
    # Retrieve daily summary assets
    today = date.today().strftime("%Y%m%d")
    protest_movements_map_file_name = "top_protests_map.png"
    protest_movements_map_file_path = "assets/daily_summary/" + today + "_" + protest_movements_map_file_name
    protest_movements_csv_file_path = "assets/daily_summary/" + today + "_top_protests.csv"

    protest_movements_map = context.resources.aws_resource.s3_get(
        bucket_name = 'discursus-io',
        file_path = protest_movements_map_file_path,
        object_type = 'png')
    image = Image.open(io.BytesIO(protest_movements_map))
    image.save(protest_movements_map_file_name)

    df_protest_movements = context.resources.aws_resource.s3_get(
        bucket_name = 'discursus-io',
        file_path = protest_movements_csv_file_path,
        object_type = 'csv',
        dataframe_conversion = True)

    # Upload map to Twitter
    twitter_media = context.resources.twitter_resource.upload_media(protest_movements_map_file_name)
    context.log.info(twitter_media)

    # Create text for tweet
    tweet = f"""Here are the top protest movements for {today}.

The {df_protest_movements['movement_name'].iloc[0]} protest movement has been the most active with {df_protest_movements['events_count'].iloc[0]} events captured.

Visit the dashboard for further insights: https://app.hex.tech/bca77dcf-0dcc-4d33-8a23-c4c73f6b11c3/app/d6824152-38b4-4f39-8f5e-c3a963cc48c8/latest"""

    # Post tweet
    twitter_status = context.resources.twitter_resource.post(tweet, [twitter_media.media_id_string])
    context.log.info(twitter_status)

    # Return asset
    return Output(
        value = twitter_status,
        metadata = {
            "Id": twitter_status.id_str
        }
    )
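One small thing worth noting: the tweet interpolates the raw YYYYMMDD stamp directly. A hypothetical formatting step, not in the current implementation, could render the date more readably before posting.

```python
from datetime import datetime

def readable_date(stamp: str) -> str:
    """Turn a 'YYYYMMDD' stamp into a human-readable date for the tweet."""
    return datetime.strptime(stamp, "%Y%m%d").strftime("%B %d, %Y")

print(readable_date("20221201"))  # → December 01, 2022
```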
As those get published, I can keep track of the tweet’s metadata in my assets catalog.
I could eventually mine more metadata such as the number of favourites, retweets, replies, etc. But for now, that will do.
And that ends the tour of how I ended up publishing assets to Twitter with Dagster and Hex. I hope it’s been as instructive as it’s been fun to put in place. Please do reach out if you have further questions on the implementation.