Jailbreak Difficult Data Sources: A Guide on PySpark DataSource API

Apache Spark has long been known for its rich set of data source connectors, ranging from columnar files and open table formats to databases via JDBC connections and message brokers. Nevertheless, it has been somewhat difficult to extract data from systems that expose interfaces like GraphQL or REST APIs, or from mainframes. Spark has always allowed custom data connectors, but they had to be written in Java or Scala, which was not an easy endeavour.

Other traditional approaches to the same problem involve adding moving parts to the solution, such as Azure Data Factory or Apache NiFi (self-hosted or via some cloud offering), writing custom code, or going for a third-party offering like Fivetran to ingest the data.

With the introduction of the Python Data Source API in Spark 4.0, the story has changed. There is now an easy interface to connect to other systems and pull data in batch or streaming mode. This opens the door to many system design simplifications and the removal of clunky moving parts.

In this walkthrough, we will cover:

  • Build basic Python code to connect to BlueSky REST API

  • Implement a PySpark streaming data source

  • Build a near real-time Databricks workflow to leverage the new data source

The objective is to introduce data engineers to the PySpark DataSource API and build a deep understanding of its mechanics, especially in streaming mode. You will also get to know a few other nice Databricks options like the Free Edition. The PySpark DataSource API is not bound to Databricks; it is a plain feature of OSS Spark, so you can use it in other Spark environments too. Databricks is chosen here for learning purposes as an easy entry point to the Spark ecosystem.


Requirements

The only requirement is a Databricks Free Edition account. Please note that this is not the old trial (i.e. Community) edition offering. This one is different, as you don’t need to provide a credit card and it covers a broader range of Databricks features than the Community Edition. Head over to this page to sign up. A paid Databricks account will also work fine, but the Free Edition is most useful for newcomers.

Part 1: Exploring the Bluesky Jetstream

Use Case Introduction

Bluesky is a social network that resembles networks like Mastodon in its focus on decentralisation, openness and user control. Bluesky has its own communication protocol called the AT Protocol, but it also surfaces the firehose over an easily accessible interface called Jetstream, which offers a WebSocket endpoint for consuming the network’s firehose. The goal is to connect to that firehose via WebSockets, grab data for collections like posts, likes and reposts, and make it available as a Spark streaming data source. The Spark streaming dataframe will then be used to build medallion architecture layers: a bronze layer for raw data and a silver layer for cleansed data.

Alright, let’s get our hands dirty with some code.

Experimenting with WebSockets

In the data science/engineering space, experimentation is a fundamental part of the day to day work. Databricks Free/Paid editions have super-convenient tools to get started quickly with no fuss. Head over to your Databricks free edition home page and create a new notebook and give it a name like “Jetstream Experiment”.

Databricks Free Edition offers only serverless compute so there is no need to go create interactive clusters. For consistent repeatable behaviour, let’s update the notebook environment version to be on version 3.

The environment version for serverless compute is equivalent to the idea of Databricks runtime version for classic clusters.


From the right sidebar, click on the environments button and make sure version 3 is selected or select it otherwise then click Apply.

Back to the notebook and in the first cell, install the WebSockets Python library because Jetstream surfaces the data over WebSockets.

%pip install websockets
%restart_python

The documentation about the Jetstream servers can be found here. We will use one of the US-West servers and it is totally public so there is no need for API keys.

Create a new cell with the following code snippet.

import time
from typing import List
from websockets.sync.client import connect

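def consume_bluesky_messages(collection: str, start_time: int, end_time: int) -> List[str]:
    """Collect raw Jetstream messages for one collection until end_time (epoch seconds)."""
    messages = []
    base_uri = "wss://jetstream1.us-west.bsky.network/subscribe?"
    # Jetstream expects the cursor in microseconds, hence the multiplication.
    uri = f"{base_uri}wantedCollections={collection}&cursor={start_time * 1_000_000}"
    with connect(uri) as websocket:
        try:
            # Keep reading until the wall clock passes end_time.
            while int(time.time()) < end_time:
                # recv raises if no message arrives within 1 second,
                # which ends the loop through the except branch below.
                message = websocket.recv(timeout=1)
                messages.append(message)
        except Exception as e:
            print("Error:", e)

    return messages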

By the way, if you type the code manually inside the notebook, you will notice AI-assisted code suggestions out of the box for free. AI is everywhere; you cannot escape it. Anyway, here is a quick explanation of the function:

  1. We start with a few imports but the most important one is the “connect” function of the WebSockets library. WebSockets library provides asynchronous and synchronous options and we will stick with the synchronous mode for simplicity.

  2. Next we define the function to take a Bluesky collection name (e.g. posts) and a range of time to grab the messages for.

  3. Then an empty messages list is defined along with the base URI of one of the Jetstream servers.

  4. The final URI of the WebSocket connection is an f-string with the collection name and the start time. Jetstream expects the epoch value in microseconds, hence the passed value is multiplied by 1 million.

  5. Then a WebSocket connection is opened and messages are streamed via that connection until the current time is greater than the passed end_time parameter.

Jetstream abstracts the AT protocol and provides an easy WebSocket connection that returns JSON messages.
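The URI construction from step 4 can be isolated into a small helper, which makes the seconds-to-microseconds conversion explicit and easy to check on its own. This is only a minimal sketch using the standard library: build_jetstream_uri is a hypothetical name (it is not part of Jetstream or the websockets library), and the host and query parameters are simply the ones used in the function above.

```python
from urllib.parse import urlencode

# Host taken from the snippet above.
JETSTREAM_BASE_URI = "wss://jetstream1.us-west.bsky.network/subscribe"


def build_jetstream_uri(collection: str, start_time: int) -> str:
    """Build the subscription URI for one collection (hypothetical helper).

    start_time is an epoch value in seconds. Jetstream expects the cursor
    in microseconds, so it is multiplied by 1,000,000.
    """
    query = urlencode({
        "wantedCollections": collection,
        "cursor": start_time * 1_000_000,
    })
    return f"{JETSTREAM_BASE_URI}?{query}"


# The epoch value here is an arbitrary illustration.
uri = build_jetstream_uri("app.bsky.feed.post", 1_700_000_000)
print(uri)
```

Keeping this piece separate from the WebSocket loop means it can be verified without opening a network connection.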

Once the function above is defined, it can be called to get messages for new posts published from the current time until 10 seconds into the future.

import json
from pprint import pprint

now = int(time.time())
ten_seconds_ahead = int(time.time()) + 10

messages = consume_bluesky_messages("app.bsky.feed.post", now, ten_seconds_ahead)

print(f"{len(messages)} messages received")
if len(messages) > 0:
    first_message = json.loads(messages[0])
    pprint(first_message)


It will be apparent later why the function start and end time parameters are designed like that but for now you should get output like the following.

Within 10 seconds, 364 messages were received from the posts collection.

The notebook experience on Databricks makes it easy to try new ideas quickly and get familiar with new integrations. Now that we have a way to grab data from Bluesky, let’s see how to develop a Spark streaming data source on top of it.

Part 2: Implementing the streaming Python Data Source

The Basics of the Python Data Source API

As mentioned before, the only way to connect to a non-standard Spark source used to be implementing a connector in Java or Scala, which takes time and requires a less common skill set. The Python Data Source API is a new feature introduced in Spark 4.0 that enables developers to read from custom data sources and write to custom data sinks in Python. By the way, this API is usually named “PySpark custom data sources” on the Databricks documentation website, but both names refer to the same thing.


It offers a batch option and a streaming option. It also comes with the plumbing to efficiently load the data in parallel. For example, when reading from a Kafka topic, data reading is done against all the partitions of a certain topic. The same idea applies to things like sharded database tables. Many systems distribute the data across multiple locations for various reasons. We will emulate the parallelised data load via loading data from multiple Bluesky collections for the same reader. Databricks introduced more sophisticated features like predicate push down to the batch reader recently.

Here is a high-level overview of the API but it will be crystal clear how things work once we start the real implementation.

  • Develop a class inheriting from the DataSource class and defining the methods:

    • @classmethod name() → custom source name used in spark.read or spark.readStream.

    • schema() → returned schema of the source.

    • reader(...) → for batch reads.

    • writer(...) → for batch writing.

    • streamReader(...) or simpleStreamReader(...)  → for streaming reads.

    • streamWriter(...) → for streaming writes.

  • If batch read is needed, implement a corresponding DataSourceReader subclass with a read(...) method that yields row tuples.

  • For streaming reads, use either:

    • The partition-aware API (DataSourceStreamReader) with full offset management.

    • Or the SimpleDataSourceStreamReader, suitable for low-throughput REST-style sources without partition parallelism.

  • Similarly, for streaming or batch writes, implement DataSourceWriter or DataSourceStreamWriter.

All these classes must be serializable with pickle (for example, by keeping their state in dictionaries of primitive types) so Spark can distribute them across workers reliably.

I normally understand better by doing, so let’s build a partition-aware streaming data reader for Bluesky.

Bluesky Streaming Reader

Create a new notebook, give it a proper name like “bluesky-stream-reader” and switch it to environment version 3. Installing dependencies inline with the pip magic (%pip), as we did before, may not work reliably here. Dependencies should be registered at the environment level because they have to be available on every worker that participates in the Spark job. The earlier experiment ran on the driver node (more or less), but now the same code that needs WebSockets may run on multiple machines.

Open the same “Environment” configuration from the sidebar, register WebSockets in the dependencies section and click Apply.

Add the following code as the notebook’s first cell.

from dataclasses import dataclass
from pyspark.sql.datasource import InputPartition

@dataclass
class BlueskyCollectionPartition(InputPartition):
     collection: str
     start_time: int
     end_time: int

It will become clearer later why we need this class, but in a nutshell it enables parallelism while reading data from the remote source. In most cases, it should carry some sort of partition key, which is the collection name in our case, plus some pointers that deterministically specify which records to pull from the source. Instances of this class are serialised and shipped to the workers, while the offsets they are built from are saved in the checkpoint files, as I will show later. Now let’s have a look at the stream reader implementation, which is the biggest code snippet in this article, but I will explain it in detail so don’t panic.

import sys
import time
from typing import Dict, Iterator, Tuple, List
from pyspark.sql.datasource import DataSourceStreamReader

class BlueskyStreamDataSourceReader(DataSourceStreamReader):
    BATCH_DURATION = 20

    def __init__(self, schema, options):
        self.schema = schema
        self.options = options
        self.collections = [collection.strip() for collection in self.options["collections"].split(",")]

    def initialOffset(self) -> dict:
        now = int(time.time())
        return { collection: now for collection in self.collections}

    def latestOffset(self) -> dict:
        look_ahead_limit = (int(time.time()) + BlueskyStreamDataSourceReader.BATCH_DURATION)
        latest_offsets = { collection: look_ahead_limit for collection in self.collections}
        return latest_offsets

    def partitions(self, start: dict, end: dict):
        partitions = [
            BlueskyCollectionPartition(collection, start[collection], end[collection]) 
            for collection in self.collections
        ]
        return partitions

    def commit(self, end: dict):
        pass

    def consume_bluesky_messages(self, collection: str, start_time: int, end_time: int) -> List[str]:
        # Imported inside the method so the name is resolved where the method runs.
        from websockets.sync.client import connect
        messages = []
        base_uri = "wss://jetstream1.us-east.bsky.network/subscribe?"
        # Jetstream expects the cursor in microseconds.
        uri = f"{base_uri}wantedCollections={collection}&cursor={start_time * 1_000_000}"
        with connect(uri) as websocket:
            try:
                # Read until the wall clock passes the end offset of this micro-batch.
                while int(time.time()) < end_time:
                    message = websocket.recv(timeout=1)
                    messages.append(message)
            except Exception as e:
                print("Error:", e)
        return messages

    def read(self, partition: BlueskyCollectionPartition) -> Iterator[Tuple]:
        collection = partition.collection
        start_time = partition.start_time
        end_time = partition.end_time
        messages = self.consume_bluesky_messages(collection, start_time, end_time)
        
        for message in messages:
            yield (message, collection)

  1. The code starts with a group of imports, but the core one is the class DataSourceStreamReader. Next, the class BlueskyStreamDataSourceReader is defined, inheriting from the base class DataSourceStreamReader.

  2. Then a BATCH_DURATION class level variable is defined which controls how long we keep streaming data from Bluesky within each micro-batch.

  3. The class init function is pretty standard, but the core thing is splitting a comma-separated value of the collections to load. This value will come from the DataSource class, which will be covered shortly.

  4. The initialOffset function defines the starting point of the stream execution. In our case, that is the current time, but the function returns it as a key-value pair per collection. This data source is partition-aware, so offsets have to be tracked per partition.

  5. The latestOffset function is the most important thing to take away from this article. In Spark Structured Streaming, the engine needs to do one critical thing before reading the data for each micro-batch: figure out the maximum/latest offset to read from the source (globally or per partition). Taking Kafka as an example, Spark needs to query the topic and find the latest offset of each topic-partition. Reading is then scoped to the offsets after those of the previous micro-batch, up to the offsets identified for the current micro-batch. Note that while Spark reads the messages, a few new messages might be appended to the topic, but that does not affect the overall behaviour because the current micro-batch has a fixed range of offsets to scan. The same concept applies to our Bluesky case. The function needs to specify a fixed future time at which the consumption from WebSockets will stop, and that future time is calculated from the batch duration. Don’t go for much smaller durations: in the initial run of the streaming job, Spark may take some time to initialise things, and you might end up with consume_bluesky_messages being called with an end time earlier than the current time, in which case nothing will be loaded.

  6. The partitions function takes a couple of dictionaries as parameters. The first one, called “start”, comes from the checkpoint of the previous micro-batch, or from the result of initialOffset if this is the first batch. The second parameter, called “end”, comes from the output of latestOffset. The function simply creates a list of BlueskyCollectionPartition instances. For example, one entry of that list could be {collection: “posts”, start_time: 10, end_time: 15}.

  7. The commit function performs any needed clean-up actions, especially if we have things like database or network connections to close. In our case it is a pass.

  8. The function consume_bluesky_messages has the same logic as the one from the experimentation part; the only differences are that the import lives inside the method and the URI points at a US-East Jetstream server.

  9. The read function is the one that does the exciting work. It is fed a single partition instance of type BlueskyCollectionPartition and should produce tuples whose elements map to the required schema, which has two columns here. The first column is the retrieved message and the second column is the collection name. This function simply calls consume_bluesky_messages, takes the retrieved list of messages and yields one tuple per message, which makes it a generator.
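To see how initialOffset, latestOffset and partitions fit together across micro-batches, the data flow can be simulated in plain Python without a Spark session. The sketch below is an illustration under stated assumptions, not how Spark invokes the reader internally: CollectionWindow is a stand-in for BlueskyCollectionPartition without the PySpark base class, the function names are hypothetical, and the clock values are passed in explicitly so the result is deterministic.

```python
from dataclasses import dataclass
from typing import Dict, List

BATCH_DURATION = 20  # seconds, same value as in the reader above


@dataclass
class CollectionWindow:
    """Stand-in for BlueskyCollectionPartition without the PySpark base class."""
    collection: str
    start_time: int
    end_time: int


def initial_offset(collections: List[str], now: int) -> Dict[str, int]:
    # Mirrors initialOffset: every collection starts at the current time.
    return {c: now for c in collections}


def latest_offset(collections: List[str], now: int) -> Dict[str, int]:
    # Mirrors latestOffset: a fixed point in the future bounds the micro-batch.
    return {c: now + BATCH_DURATION for c in collections}


def plan_partitions(start: Dict[str, int], end: Dict[str, int]) -> List[CollectionWindow]:
    # Mirrors partitions: one window per collection, from start to end offset.
    return [CollectionWindow(c, start[c], end[c]) for c in start]


collections = ["app.bsky.feed.post", "app.bsky.feed.like"]

# First micro-batch: start comes from initial_offset.
start_0 = initial_offset(collections, now=1_000)
end_0 = latest_offset(collections, now=1_000)
batch_0 = plan_partitions(start_0, end_0)

# Second micro-batch: start is the end offset of the previous batch.
# The clock value 1_025 is an arbitrary illustration of time having passed.
end_1 = latest_offset(collections, now=1_025)
batch_1 = plan_partitions(end_0, end_1)

for window in batch_0 + batch_1:
    print(window)
```

Each window of the second batch begins exactly where the first one ended, which is the property that lets the stream resume from a checkpoint without gaps in the requested time ranges.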

Phew! That was quite lengthy but if you can absorb it till here the rest is easy. The next class is kind of a plumbing class.

from pyspark.sql.datasource import DataSource

class BlueskyStreamDataSource(DataSource):
    def __init__(self, options: Dict[str, str]):
        if "collections" not in options:
            raise ValueError("option collections is required")
        if "schema" in options:
            # Trailing spaces keep the concatenated sentences apart.
            raise ValueError(
                "The schema option is not supported. "
                "This data source has a fixed schema. "
                "Please remove the schema option from your configuration."
            )
        options["schema"] = "contents string, collection string"
        self.options = options

    @classmethod
    def name(cls):
        return "bluesky"

    def schema(self):
        return self.options["schema"]

    def streamReader(self, schema):
        return BlueskyStreamDataSourceReader(schema, self.options)

spark.dataSource.register(BlueskyStreamDataSource)

  1. The class BlueskyStreamDataSource has to inherit from DataSource to be treated as a Python data source.

  2. The init function does a bit of validation, defines the fixed schema used for this data source and sets the instance options based on the passed parameter.

  3. The name class method defines the data source name, which will be used in spark.readStream calls.

  4. The schema function returns the schema from the options instance field. This function can be used to infer the schema for more dynamic use cases, but that’s outside the scope of this article, although it is not really hard.

  5. The streamReader function creates an instance of the class BlueskyStreamDataSourceReader, which is the class that does the real work.

  6. Finally, there is a registration call to wire up the new data source with the Spark session.
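The option handling described above is easy to pull out into a plain function so it can be exercised without Spark. The following is a small sketch rather than the article's implementation: parse_collections is a hypothetical helper, and dropping empty entries is an extra safeguard that the classes above do not perform.

```python
from typing import Dict, List


def parse_collections(options: Dict[str, str]) -> List[str]:
    """Validate the options and return the list of collection names."""
    if "collections" not in options:
        raise ValueError("option collections is required")
    collections = [c.strip() for c in options["collections"].split(",")]
    # Extra safeguard (an assumption, not in the classes above):
    # drop empty entries produced by stray commas such as "a,,b" or "a,".
    collections = [c for c in collections if c]
    if not collections:
        raise ValueError("option collections must name at least one collection")
    return collections


print(parse_collections({"collections": "app.bsky.feed.post, app.bsky.feed.like"}))
```

A helper like this could be called from both the DataSource and the stream reader, so that validation and splitting live in one place.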

Testing the New Streaming Reader

Congratulations! You have done the hard part and now it is time to harvest the fruits of your hard work.


First, a streaming checkpoint location should be created. We will use a Unity Catalog volume to host the checkpoints. Create a new cell with the below code.

%sql
CREATE VOLUME IF NOT EXISTS workspace.default.checkpoints;

Add a new cell with the below snippet.

collections = "app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow"
reader = spark.readStream.option("collections", collections).format("bluesky")

df = reader.load()
df.display(checkpointLocation = "/Volumes/workspace/default/checkpoints/display")

Notice the “bluesky” data source name used in the readStream call and how the new data source accepts a collections option which is a comma-separated list of the Bluesky collections to extract.

Regarding the idea of checkpointing and how offsets are managed for the current source, we can have a look at the first offset file in the checkpoint folder.

%sh
cat /Volumes/workspace/default/checkpoints/display/offsets/0

The last line (highlighted in the screenshot) shows a dictionary persisted to disk with one key per collection, where the value of each key is the maximum epoch time of the records pulled in that micro-batch. The epoch times are identical across partitions here, but that does not have to be the case. For example, when loading data from Kafka, each topic partition most likely has a different offset value.
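If you want to inspect those offsets programmatically, the last line of the file can be parsed as JSON and the epoch values rendered as readable timestamps. The sketch below works on a hand-written, simplified sample rather than real output; the lines before the offset dictionary are placeholders, and read_source_offsets and to_utc are hypothetical helper names.

```python
import json
from datetime import datetime, timezone
from typing import Dict

# Hand-written, simplified sample for illustration only; not real output.
# Only the last line (the per-collection offsets) matters for this sketch.
SAMPLE_OFFSET_FILE = """v1
{}
{"app.bsky.feed.post": 1754380820, "app.bsky.feed.like": 1754380820}
"""


def read_source_offsets(offset_file_text: str) -> Dict[str, int]:
    """Parse the last non-empty line of an offsets file as a JSON dictionary."""
    lines = [line for line in offset_file_text.splitlines() if line.strip()]
    return json.loads(lines[-1])


def to_utc(epoch_seconds: int) -> str:
    """Render an epoch value in seconds as an ISO 8601 UTC timestamp."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


for collection, offset in read_source_offsets(SAMPLE_OFFSET_FILE).items():
    print(collection, offset, to_utc(offset))
```

In a notebook, the text passed to read_source_offsets would come from reading the offsets file in the checkpoint volume instead of the sample string.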

So far, we have a working custom streaming data source implemented in Python. It’s partition-aware and uses coding constructs that can be tried in isolation and unit tested as well.

It’s time now to put this ingestion code into an end to end pipeline to load the Bluesky data into the lakehouse architecture.

Part 3: The End to End Data Pipeline

While production-grade pipelines are usually built using tools like Databricks Asset Bundles, we will use the click-ops approach here for brevity.

The pipeline to be built will be composed of two components:

  • A bronze ingestion job that runs the same streaming notebook code above with a slight tweak. Instead of displaying the data at the end of the notebook, it will be written to a bronze table.

  • A job to build a silver table incrementally, filter records for posts only and extract the post timestamp and text and then translate the text to English if needed


Prepare the Bronze Ingestion Notebook

Take the same notebook we used in the previous part and remove the last cell which is the shell command displaying the checkpoints offset file.

Then replace the contents of the last cell with the following snippet.

collections = "app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow"
reader = spark.readStream.option("collections", collections).format("bluesky")

df = reader.load()
checkpointLocation = "/Volumes/workspace/default/checkpoints/bronze"

(
    df
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpointLocation)
        .trigger(availableNow=True)
        .toTable("workspace.default.bronze")
)

This is the same old code but instead of displaying the streaming data, it will write it to a table called “bronze” in the default schema of the workspace catalog.

Create the Databricks Job and Bronze Ingestion Task

  • From the left side menu, click “Jobs & Pipelines”

  • Click the “Create” button and select the option “job”

  • Change the job name to something like “bluesky-pipeline”

  • Change the task name to “bronze-ingestion”

  • Keep the task type as notebook and in the “Path” field, click the selector and choose the notebook “bluesky-stream-reader”

  • Keep the compute as serverless and click “Create task”

  • Click the “Run now” button in the top right corner of the screen

  • You can track the run that has just started from the tooltip that appeared or from the job page itself

  • Serverless job compute takes a bit more time to spin up on the Free Edition compared to paid accounts, so give it a few minutes to start

If everything runs fine, you will get the green column indicator, and you can click it to jump into the run history of the notebook if you want.

To verify that data has been loaded correctly, click the “SQL Editor” link in the left side menu and run a query to show the table data. You will need to select/start the “Serverless SQL warehouse”.

It seems the bronze table does have the right data. It could be enriched by having columns like ingestion time and so on but let’s keep it simple for now.

Create the Silver Table

The bronze table has data for different collections like posts, likes and follows. Let’s say we want a silver table for posts only, with any non-English post text translated to English. The silver table should also be built incrementally, as it would be super-inefficient to run the computation on the whole bronze table every time we want to refresh the silver table.

With Databricks Streaming Tables, this task is actually a piece of cake. We have invested good effort building the streaming data source in Python and now it is time to gain the benefits of our streaming ingestion design. The silver table can be built easily using pure SQL and nothing more and that would be also incremental.

Create a new folder in your personal workspace home folder and call it “silver-pipeline” for example. Go inside that folder and create a new notebook and call it something like “build-bluesky-silver-table”. Just below the notebook name/title, change the notebook language to SQL.

Add this code to the first cell.

CREATE OR REFRESH STREAMING TABLE silver AS
WITH raw AS (
  SELECT
    parse_json(contents) AS post
  FROM
    STREAM bronze
  WHERE
    collection = 'app.bsky.feed.post'
),
flattened AS (
  SELECT
    CAST(post:commit:record:langs[0] AS string) as lang,
    CAST(post:commit:record:text AS string) AS text,
    CAST(CAST(post:time_us AS LONG) / 1000000 AS TIMESTAMP) as timestamp
  FROM
    raw
)
SELECT
  timestamp,
  CASE
    WHEN lang = 'en' THEN text
    ELSE ai_translate(text, 'en')
  END AS post_contents
FROM
  flattened 
WHERE
  length(trim(text)) > 0

  • The first part is registering a streaming table to represent the silver table

  • The table definition starts with a CTE that filters the bronze records for posts only and converts the JSON payload to the VARIANT data type. Note the STREAM keyword used beside the bronze table name. This is critical to run this pipeline in a streaming fashion and not as a batch job.

  • The next part of the CTE extracts the language, text and timestamp of the post.

  • The final step is to pick non-empty posts and use a CASE/WHEN expression to come up with post content and translate it on the fly if the content is not in English. Notice how it is so easy to run AI functions in a classic SQL statement.
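The flattening and filtering rules of this query can also be prototyped in plain Python, which is handy for checking the logic against a few messages before running the pipeline. This is only a sketch: the sample message is hand-written and contains just the fields the SQL above reads, flatten_post and needs_translation are hypothetical names, and the actual translation (ai_translate) is left out because it only exists on the SQL side.

```python
import json
from datetime import datetime, timezone
from typing import Optional, Tuple

# Hand-written sample shaped after the fields the SQL above reads;
# it is not a real Jetstream message.
SAMPLE_MESSAGE = json.dumps({
    "time_us": 1754380800000000,
    "commit": {"record": {"langs": ["de"], "text": "Guten Morgen"}},
})


def flatten_post(contents: str) -> Optional[Tuple[datetime, str, Optional[str]]]:
    """Return (timestamp, text, lang), or None when the post text is empty."""
    post = json.loads(contents)
    record = post.get("commit", {}).get("record", {})
    text = record.get("text") or ""
    if not text.strip():
        return None  # same effect as the length(trim(text)) > 0 filter
    langs = record.get("langs") or []
    lang = langs[0] if langs else None
    # time_us is in microseconds, as in the CAST(... / 1000000) expression.
    timestamp = datetime.fromtimestamp(post["time_us"] / 1_000_000, tz=timezone.utc)
    return timestamp, text, lang


def needs_translation(lang: Optional[str]) -> bool:
    # Mirrors the CASE expression: anything other than 'en' is translated,
    # including posts with no language tag.
    return lang != "en"


row = flatten_post(SAMPLE_MESSAGE)
print(row, needs_translation(row[2]))
```

Note that, just like the CASE expression, this treats a post without any language tag as needing translation.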


Next, go to the “Jobs & Pipelines” page from the side menu. Click Create → ETL Pipeline. Give the pipeline a name like “silver-pipeline”. It is the same name as the folder holding the SQL notebook; you know, naming is hard. From the creation options, choose “Add existing assets”. Select the folder created before in the two text boxes on the dialog, then click Add.

While we can run the pipeline from this interface, let’s do a dry run only and keep the execution to the workflow job that loads the bronze table so that we can wire things together. If everything works as expected, the dry run will be successful as below and it will show a silver table box in the pipeline graph section.

Now back to the “Jobs & Pipelines” page, click on the workflow named “bluesky-pipeline” which is the one we created before to load the bronze table. Click on the tasks pane to edit the workflow. Click the “Add task” button and select “Pipeline” from the dropdown menu.

Enter a name for the task, such as “build-silver-table”, and in the “Pipeline” selector, pick the “silver-pipeline” one we created above.

There are two options to run the silver pipeline:

  • Run the whole workflow which will run both the bronze and silver stages

  • Or click on the silver pipeline task box and individually run it which is quite a good feature to selectively run certain parts of a big workflow.

Let’s go with the second option and navigate to the new run details. Please note that it can take a few seconds to start the new run.

Click into the “build-silver-table” task to go into the pipeline run details page. After a few minutes, you should see a successful run indicator plus an indication of the number of records written to the silver table.

You can inspect the data in the silver table from the catalog link in the side menu then navigate to the workspace catalog then the default database. All the records will be in English as evidence that translation has been done for non-English content.

Final Notes on the Pipeline

In the previous section, the steps to build the bronze and silver tables were executed individually. That’s not how things usually run in normal operation. You can simply run the main workflow “bluesky-pipeline” that contains the two tasks. The workflow can also be scheduled to run every X minutes, but that may be hard on the Free Edition, especially with the volume of data being extracted from Jetstream plus all the AI function calls for the translation. Please remember that calling AI functions can get quite expensive, especially if they process a large number of records.

Basically, on the Free Edition an error like the following screenshot can happen.

If it happens, just wait for some time and click the “Repair run” button, which makes it easy to rerun only the failed tasks within a pipeline.

I waited about 10 or 15 minutes, tried the repair and it worked just fine. If you go to the silver table page in the catalog and click the “History” tab, you should see streaming update transactions indicating that the silver table is being built incrementally. Similar streaming update transactions appear in the bronze table transaction history but with a larger number of records. Remember that the silver table applies a filter on the bronze table to pick only posts, and it excludes posts with empty content as well.

Summary

In this tutorial, we have successfully constructed a complete, end-to-end data pipeline using PySpark DataSource API. Starting from scratch, we performed experimentation via notebooks, built a new streaming data source and made it properly operational in a pipeline that leverages AI functions and streaming tables.

The core of the tutorial was building the new data source that pulls data from the Bluesky social network incrementally. The pipeline can be configured to be almost real time if needed by running it on a dedicated 24x7 cluster. There could be other aspects to consider, like Bluesky Jetstream quotas or throttling rules, but those are things to investigate on a case-by-case basis.

Everything was done using the new Databricks Free Edition so it is totally free (within certain limits for sure). Also all the moving parts are serverless and there were no VMs or complicated infrastructure to set up. The development process can be done using your preferred IDE and also incorporate unit testing for a solid implementation. This is something hard to do with ClickOps tools like Azure Data Factory.

Next Steps and Further Exploration

We could have stored the whole variant payload in the silver layer and done the translation on demand, or something like that. The tutorial did not cover any error handling, so that’s something to consider, especially with the new Spark structured logging feature. Databricks has solid alerting options, so you can build such a capability easily. Another important thing to consider is the security aspect of opening an outbound connection from the Databricks VPC/VNET to the outside world, as in this specific case. If your source data lives inside your cloud infrastructure, you are in luck. But if you connect outside your company/customer network perimeter, further planning and/or coordination will be required.

You have seen how easy it is to write a new custom data source from scratch, as long as the source system provides some API to surface its data. This paradigm shift results in obvious system design simplifications and lets you ditch tools whose sole purpose is data extraction and ingestion.

We encourage you to continue experimenting with this powerful toolset and discover what else you can build.

08/05/2025