Currency arbitrage on Spark and Scala -> part 1

With all the interest in cryptocurrencies over the last few years, paired with their naturally open nature, I thought this might be a nice topic for a series on currency arbitrage. Arbitrage, the practice of exploiting differences in the price of the same asset across multiple markets, is an area of finance particularly well suited to a computational approach. The theory is simple enough: we take an asset, say pork bellies, find someone selling at one price (£1) and someone buying at another (£2), then buy from the first and immediately sell to the second, pocketing £1. Applied to currencies, the natural extension is to chain this idea across multiple pairs. Say we have three currencies GBP, USD & JPY, with quotes GBP/USD = 1.5, USD/JPY = 1 and JPY/GBP = 1.5; if we trade these simultaneously (or very quickly) GBP -> USD -> JPY -> GBP, we’d end up with more GBP than we started with, risk-free.
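As a quick sanity check of that round trip (in plain Scala, since that's where we're heading):

// quotes around the cycle GBP -> USD -> JPY -> GBP from the example above
val quotes = Seq(1.5, 1.0, 1.5)

// what 1 GBP becomes after the round trip; anything > 1 is an arbitrage opportunity
val roundTrip = quotes.product // 2.25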

In this series we’re going to look at developing an approach to pull market data for cryptocurrencies and identify arbitrage opportunities within it. This is quite a big topic, so I’m going to split it into a couple of posts: first we’ll lay down some theory and get a starter implementation up and running, then we’ll look at writing a feed for the data and plugging it into our calculator.

Modeling the problem

We’re going to be looking at currency pairs through the lens of graphs, where the vertices represent the currencies and the edge weights represent the quote values. Visually, the example from the intro would look something like this:

At its core, if we model our currencies as a graph, we can identify arbitrage opportunities by finding cycles whose edge weights, when multiplied together, yield a result > 1.

Thankfully for us, graph theory is an extremely mature topic and there are a host of algorithms we can leverage to approach our problem; in this case we’re going to explore the Bellman-Ford algorithm. Bellman-Ford is a really elegant little algorithm and a surprisingly simple exercise in dynamic programming, identifying the shortest path from a specific source node to every other node in the graph by progressively building out a picture of potential shortcuts. Clocking in with a worst case complexity of O(|V||E|), it’s a fair bit slower than algorithms like Dijkstra’s [running in O(|E| + |V|log|V|)], however, critically in this case, it’s able to cope with (and, with a few small modifications, identify) negative weight cycles.
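The heart of the algorithm is the relaxation step: if reaching v via u is cheaper than the best distance to v we’ve seen so far, we record the improvement. As a minimal sketch of just that rule (the full GraphX-flavoured implementation follows later in the post):

// relax a single edge u -> v with weight w against a map of best-known distances
def relaxEdge(dist: Map[Int, Double], u: Int, v: Int, w: Double): Map[Int, Double] =
  dist.get(u) match {
    // we can only improve v if we already know a distance to u
    case Some(du) if du + w < dist.getOrElse(v, Double.PositiveInfinity) => dist + (v -> (du + w))
    case _ => dist
  }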

Take the following graph:

Bellman-Ford runs at most |V|-1 rounds of relaxation over the edges to find the shortest distance to every node (in this case we don’t actually need that many). For the graph above we’d end up with something like [A->0, B->5, C->9, D->-1].

If we update this so that the weights in our cycle (B->C->D->B) sum to a negative figure, e.g.:

we can now run our relaxations as many times as we like and the algorithm will never settle at a fixed point; we’ve found a negative weight cycle.

Attempting to apply this directly to currency pairs, for example the following:

doesn’t quite help us, because Bellman-Ford identifies negative sum cycles while we’re looking for positive multiplicative cycles; if we look at the example above, there’s clearly an arbitrage opportunity (ETH -> USD -> BTC -> ETH), yet a standard Bellman-Ford won’t pick up any negative weight cycles. Fortunately there’s an easy fix: by taking the log of each of our weights before processing, we can work with sums instead of products, and exponentiate the result at a later point if we want to recover our yield. In the above case this looks something like:

scala> import scala.math.log
import scala.math.log

scala> val (a,b,c) = (log(1.6), log(0.7), log(1.1))
a: Double = 0.47000362924573563
b: Double = -0.35667494393873245
c: Double = 0.09531017980432493

scala> val logSum = a+b+c
logSum: Double = 0.20863886511132812

scala> math.pow(math.E, logSum)
res9: Double = 1.232

scala> val target = 1.6 * 0.7 * 1.1 // quick sense check that we're able to recover our product successfully
target: Double = 1.232

As Bellman-Ford allows us to identify negative weight cycles, all we need to do is take the negative log of our weights before we run our algorithm: a cycle whose weights multiply to more than 1 has a log-sum greater than 0, so the sum of the negated logs is less than 0, which is exactly the negative weight cycle Bellman-Ford can detect.
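In GraphX terms this transformation is just a map over the edge weights; as a minimal sketch, assuming we already have a Graph[String, Double] of quotes (the same shape used in the implementation below):

import org.apache.spark.graphx.Graph

// turn each quote into its negative log so that profitable (multiplicative) cycles
// become negative (additive) cycles that Bellman-Ford can detect
def toNegativeLogWeights(quotes: Graph[String, Double]): Graph[String, Double] =
  quotes.mapEdges(edge => -math.log(edge.attr))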

Show me the money

My eventual aim with this series is to apply a distributed approach to the problem using Spark, so I’m going to use the GraphX data structures provided by Spark from the start. Our initial implementation is just going to be a classic Bellman-Ford and looks like this:

import org.apache.spark.graphx.{Edge, Graph, VertexId}

import scala.annotation.tailrec

// distances: best-known distance from the source to each vertex
// previous: the predecessor from which each vertex's distance was last updated
case class Reduction(distances: Map[VertexId, Double], previous: Map[VertexId, VertexId])

class BellmanFord {

  // classic (non-distributed) Bellman-Ford: collect the edges to the driver and
  // relax them |V| - 1 times, starting from the given source vertex
  def relaxNonSpark(source: VertexId, graph: Graph[String, Double]) = {
    val verticesCount = graph.vertices.count()
    val edges = graph
      .edges
      .collect()

    @tailrec
    def relax(passes: Long, reduction: Reduction): Reduction =
      if (passes == 0) {
        reduction
      }
      else {
        relax(passes - 1, relaxOnce(edges, reduction))
      }

    relax(verticesCount - 1, Reduction(Map(source -> 0d), Map[VertexId, VertexId]()))
  }

  // a single pass over every edge, improving the best-known distances where possible
  def relaxOnce(edges: Array[Edge[Double]], reduction: Reduction) = {
    edges
      .foldLeft(reduction)((reduction, edge) => {
        val distances = reduction.distances
        val previous = reduction.previous
        (distances.get(edge.srcId), distances.get(edge.dstId)) match {
          // no known route to the edge's source yet, so there's nothing to relax
          case (None, _) => reduction
          // first time we've reached the destination: record the distance and predecessor
          case (Some(distanceToSrc), None) =>
            Reduction(distances + (edge.dstId -> (distanceToSrc + edge.attr)), previous + (edge.dstId -> edge.srcId))
          // we already have a route to the destination: keep whichever is shorter
          case (Some(distanceToSrc), Some(distanceToDst)) =>
            if (distanceToSrc + edge.attr < distanceToDst)
              Reduction(distances + (edge.dstId -> (distanceToSrc + edge.attr)), previous + (edge.dstId -> edge.srcId))
            else
              reduction
        }
      })
  }
}

And some tests to show it works:

import com.typesafe.scalalogging.Logger
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

import scala.math.log

class BellmanFordTest extends FunSuite {

  private val logger = Logger[BellmanFordTest]

  private def vertexId(string: String) = string.hashCode.toLong

  private def vertexIdPair(string: String) = (string.hashCode.toLong, string)

  private val spark = SparkSession.builder
    .master("local[*]")
    .appName("Sample App")
    .getOrCreate()

  private val sc = spark.sparkContext

  private val vertices: RDD[(VertexId, String)] =
    sc.parallelize(Seq(vertexIdPair("A"), vertexIdPair("B"), vertexIdPair("C"), vertexIdPair("D")))

  private val bellman = new BellmanFord()

  test("Graph without negative cycle doesn't change with extra relaxations") {

    val edgesNonNeg: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(vertexId("A"), vertexId("B"), 5),
      Edge(vertexId("A"), vertexId("D"), 3),
      Edge(vertexId("B"), vertexId("C"), 3),
      Edge(vertexId("C"), vertexId("D"), -10)
    ))

    val graphWithNoNegativeCycles = Graph(vertices, edgesNonNeg)
    val sourceVertexId = graphWithNoNegativeCycles.vertices.first()._1
    val reduction = bellman.relaxNonSpark(sourceVertexId, graphWithNoNegativeCycles)
    val additionalReduction = bellman.relaxOnce(edgesNonNeg.collect(), reduction)
    logger.info(s"Reduction distances: ${reduction.distances}")
    logger.info(s"Reduction previous map: ${reduction.previous}")
    assert(reduction === additionalReduction)
  }

  test("Graph containing negative cycle does change with extra relaxations") {
    val edgesWithNegativeCycle: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(vertexId("A"), vertexId("B"), 5),
      Edge(vertexId("A"), vertexId("D"), 3),
      Edge(vertexId("B"), vertexId("C"), 3),
      Edge(vertexId("C"), vertexId("D"), -10),
      Edge(vertexId("D"), vertexId("C"), 4)
    ))

    val graphWithNegativeCycles = Graph(vertices, edgesWithNegativeCycle)
    val sourceVertexId = graphWithNegativeCycles.vertices.first()._1
    val reduction = bellman.relaxNonSpark(sourceVertexId, graphWithNegativeCycles)
    val additionalReduction = bellman.relaxOnce(edgesWithNegativeCycle.collect(), reduction)
    assert(reduction !== additionalReduction)
  }

  // note this test isn't exhaustive; there are graphs it would fail for, it's just here to validate our test data
  test("Graph with negative cycles under multiplication, but not addition, does not change with extra relaxation without preprocessing") {
    val edgesWithNegativeCycle: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(vertexId("A"), vertexId("B"), 0.8),
      Edge(vertexId("B"), vertexId("C"), 0.7),
      Edge(vertexId("C"), vertexId("D"), 1.1),
      Edge(vertexId("D"), vertexId("A"), 1.2),
      Edge(vertexId("D"), vertexId("B"), 1.6)
    ))

    val graphWithNegativeCycles = Graph(vertices, edgesWithNegativeCycle)
    val sourceVertexId = graphWithNegativeCycles.vertices.first()._1
    val reduction = bellman.relaxNonSpark(sourceVertexId, graphWithNegativeCycles)
    val additionalReduction = bellman.relaxOnce(edgesWithNegativeCycle.collect(), reduction)
    assert(reduction === additionalReduction)
  }

  test("Graph with negative cycles under multiplication changes with extra relaxation when preprocessing applied") {
    val edgesWithNegativeCycle: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(vertexId("A"), vertexId("B"), -log(0.8)),
      Edge(vertexId("B"), vertexId("C"), -log(0.7)),
      Edge(vertexId("C"), vertexId("D"), -log(1.1)),
      Edge(vertexId("D"), vertexId("A"), -log(1.2)),
      Edge(vertexId("D"), vertexId("B"), -log(1.6))
    ))

    val graphWithNegativeCycles = Graph(vertices, edgesWithNegativeCycle)
    val sourceVertexId = graphWithNegativeCycles.vertices.first()._1
    val reduction = bellman.relaxNonSpark(sourceVertexId, graphWithNegativeCycles)
    val additionalReduction = bellman.relaxOnce(edgesWithNegativeCycle.collect(), reduction)
    assert(reduction !== additionalReduction)
  }
}

As you can see, we pass our graph and a given sourceId into relaxNonSpark, which then recursively relaxes our edges N-1 times. As we reduce, we pass down a Reduction object, which contains a map from each node to its (current) distance from the source, and a map from each node to the predecessor from which it was last updated. Note: there are a number of optimisations we can make to our implementation straight off the bat, such as terminating early when a relaxation pass makes no changes to our Reduction object, however I’ve purposefully left these out for now.
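For reference, a sketch of what that early-termination variant could look like, reusing relaxOnce but not wired into the implementation above:

  // stop as soon as a full pass over the edges changes nothing, or we run out of passes
  @tailrec
  final def relaxUntilStable(passes: Long, edges: Array[Edge[Double]], reduction: Reduction): Reduction =
    if (passes == 0) reduction
    else {
      val next = relaxOnce(edges, reduction)
      if (next == reduction) reduction // fixed point reached, no further passes needed
      else relaxUntilStable(passes - 1, edges, next)
    }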

Conclusion

At this point we’ve seen how we can approach currency exchange from the perspective of graphs, and how to leverage this approach to identify opportunities for arbitrage. In the next part we’ll write a small web-scraper to pull some real cryptocurrency data and run it through our implementation, before building on this to leverage Spark to distribute our problem.

Quickstart with Zeppelin, Scala, Spark & Docker

Recently, I’ve been working on a problem that required doing some exploratory analysis on some of our more high-throughput data sources. Where we’ve had to do this kind of work in the past, the approach has typically been to reach for the usual suspects: Python and pandas, probably running in Jupyter notebooks. Whilst I’ve written quite a bit of Python over the years and will occasionally reach for it to whip up a script for this or that, I find the lack of type safety for anything more involved than automating something simple really frustrating, so I decided to try out Apache Spark using Scala. After a quick trial of running some code in both Apache Zeppelin and Jupyter using the spylon-kernel, Zeppelin quickly emerged victorious (in my mind) and so far has been an absolute joy to work with.

The purpose of this post is not to provide yet another x vs y technology breakdown (there are plenty of those on the topic already), but rather to give a brief intro on how to get up and running quickly with a Zeppelin/Spark/Scala stack.

Installation

For a full local installation of Zeppelin, the official docs recommend that you download the binaries and run them against a (local or otherwise) Spark installation; however, for trying out the stack, and for situations where you don’t need a full-scale cluster, it’s Docker to the rescue.

Let’s create our docker-compose.yml:

$ mkdir zeppelin && cd $_ && touch docker-compose.yml

In this we want to insert the following:

version: "3.3"
services:
  zeppelin:
    image: apache/zeppelin:0.9.0
    environment:
      - ZEPPELIN_LOG_DIR=/logs
      - ZEPPELIN_NOTEBOOK_DIR=/notebook
    ports:
      - 8080:8080
    volumes: # to persist our data once we've destroyed our containers
      - ./notebook:/notebook 
      - ./logs:/logs

Now all we need to do is start up our container:

$ docker-compose up

Wait a minute to let Docker start up the container, then navigate to 127.0.0.1:8080 and you should see that Zeppelin has started up:

Speaking to Spark

The first thing we’ll want to do is set up a SparkSession, which we’re going to use as our main entry point; by default Zeppelin injects a SparkContext into our environment in a variable named sc, from which we can create our session.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.config(sc.getConf).getOrCreate()

From here let’s start doing something more useful. Rather than going through yet another exercise in map-reduce-to-count-word-frequencies-in-a-string, let’s pop across to the https://ourworldindata.org GitHub repo and grab some sample data; there’s plenty of data on the current COVID pandemic, so let’s use that. I’ll include the actual files used in the GitHub repo for this post, but their repo will always have the most up-to-date data.
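Because we mounted ./notebook into the container earlier, dropping the CSV in there makes it visible to Zeppelin at /notebook. Something along these lines should do it, though the exact path within the Our World in Data repo is an assumption on my part and may have moved since the time of writing:

$ curl -L -o notebook/vaccinations.csv \
    https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/vaccinations/vaccinations.csv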

Spark has multiple APIs we can use to interact with our data, namely Resilient Distributed Datasets (RDDs), DataFrames and Datasets. DataFrames and Datasets were unified under a single API as of Spark 2.0, and are the APIs we’ll be using here. To load our CSV data and see the resulting schema we simply need to call:

val df = spark.read
         .format("csv")
         .option("header", "true") // first line in file is our headers
         .option("mode", "DROPMALFORMED")
         .option("inferSchema", "true") // automatically infers the underlying types, super helpful
         .load("/notebook/vaccinations.csv")

df.printSchema

Let’s start with something really simple and see which country has administered the most vaccinations so far:

import org.apache.spark.sql.functions._ // desc, sum, col, to_date etc. used in the snippets below

val totalVaccinations = df
    .groupBy("location")
    .sum("daily_vaccinations")
    .sort(desc("sum(daily_vaccinations)")) 

// alternatively if we want to use named fields we could also do something like this
df.groupBy("location")
    .agg(sum("daily_vaccinations") as "vaccinationsSum")
    .sort(desc("vaccinationsSum")) 

totalVaccinations.show(10)

This gives us a high-level picture of the sum of our efforts, but say we want to find out how this has developed over time. We might start by collecting the total vaccinations together and sorting them by date so we can track how the figures change; something like this:

val vaccinations = df.select("location", "date", "total_vaccinations")
        .withColumn("date", to_date(col("date"),"yyyy-MM-dd")) // convert our dates into a more usable format
        .sort("location", "date") // a single sort; chaining .sort("date").sort("location") would only keep the last ordering
vaccinations.createOrReplaceTempView("vaccinations") // we'll use this later

This is obviously rather a lot of data so we can filter this slightly to try to see what’s going on:

vaccinations
    .filter(col("location") === "United States" ||  col("location") === "United Kingdom").sort("location", "date")
    .show(100)
+--------------+----------+------------------+
|      location|      date|total_vaccinations|
+--------------+----------+------------------+
|United Kingdom|2020-12-20|            669674|
|United Kingdom|2020-12-21|              null|
|United Kingdom|2020-12-22|              null|
|United Kingdom|2020-12-23|              null|
|United Kingdom|2020-12-24|              null|
|United Kingdom|2020-12-25|              null|
|United Kingdom|2020-12-26|              null|
|United Kingdom|2020-12-27|            996616|
|United Kingdom|2020-12-28|              null|
|United Kingdom|2020-12-29|              null|
|United Kingdom|2020-12-30|              null|
|United Kingdom|2020-12-31|              null|
|United Kingdom|2021-01-01|              null|
|United Kingdom|2021-01-02|              null|
|United Kingdom|2021-01-03|           1389655|
|United Kingdom|2021-01-04|              null|
|United Kingdom|2021-01-05|              null|
|United Kingdom|2021-01-06|              null|
|United Kingdom|2021-01-07|              null|
|United Kingdom|2021-01-08|              null|
|United Kingdom|2021-01-09|              null|
|United Kingdom|2021-01-10|           2677971|
|United Kingdom|2021-01-11|           2843815|
|United Kingdom|2021-01-12|           3067541|
|United Kingdom|2021-01-13|           3356229|
|United Kingdom|2021-01-14|           3678180|
|United Kingdom|2021-01-15|           4006440|
|United Kingdom|2021-01-16|           4286830|
|United Kingdom|2021-01-17|           4514802|
|United Kingdom|2021-01-18|           4723443|
|United Kingdom|2021-01-19|           5070365|
|United Kingdom|2021-01-20|           5437284|
|United Kingdom|2021-01-21|           5849899|
|United Kingdom|2021-01-22|           6329968|
|United Kingdom|2021-01-23|           6822981|
|United Kingdom|2021-01-24|           7044048|
|United Kingdom|2021-01-25|           7325773|
|United Kingdom|2021-01-26|           7638543|
|United Kingdom|2021-01-27|           7953250|
|United Kingdom|2021-01-28|           8369438|
|United Kingdom|2021-01-29|           8859372|
|United Kingdom|2021-01-30|           9468382|
|United Kingdom|2021-01-31|           9790576|
|United Kingdom|2021-02-01|          10143511|
| United States|2020-12-20|            556208|
| United States|2020-12-21|            614117|
| United States|2020-12-22|              null|
| United States|2020-12-23|           1008025|
| United States|2020-12-24|              null|
| United States|2020-12-25|              null|
| United States|2020-12-26|           1944585|
| United States|2020-12-27|              null|
| United States|2020-12-28|           2127143|
| United States|2020-12-29|              null|
| United States|2020-12-30|           2794588|
| United States|2020-12-31|              null|
| United States|2021-01-01|              null|
| United States|2021-01-02|           4225756|
| United States|2021-01-03|              null|
| United States|2021-01-04|           4563260|
| United States|2021-01-05|           4836469|
| United States|2021-01-06|           5306797|
| United States|2021-01-07|           5919418|
| United States|2021-01-08|           6688231|
| United States|2021-01-09|              null|
| United States|2021-01-10|              null|
| United States|2021-01-11|           8987322|
| United States|2021-01-12|           9327138|
| United States|2021-01-13|          10278462|
| United States|2021-01-14|          11148991|
| United States|2021-01-15|          12279180|
| United States|2021-01-16|              null|
| United States|2021-01-17|              null|
| United States|2021-01-18|              null|
| United States|2021-01-19|          15707588|
| United States|2021-01-20|          16525281|
| United States|2021-01-21|          17546374|
| United States|2021-01-22|          19107959|
| United States|2021-01-23|          20537990|
| United States|2021-01-24|          21848655|
| United States|2021-01-25|          22734243|
| United States|2021-01-26|          23540994|
| United States|2021-01-27|          24652634|
| United States|2021-01-28|          26193682|
| United States|2021-01-29|          27884661|
| United States|2021-01-30|          29577902|
| United States|2021-01-31|          31123299|
| United States|2021-02-01|          32222402|
| United States|2021-02-02|          32780860|
+--------------+----------+------------------+

Still, this is hardly ideal, and it leads us nicely into the fantastic data visualisation support that Zeppelin gives us. In the block where we generated our dataframe containing the total vaccinations by date and country earlier, you’ll see we also created a view using the createOrReplaceTempView function; this creates a view against which we can run good old-fashioned SQL to query and visualise our data. Simply add %spark.sql at the top of your block and you’re good to go:

%spark.sql
/* note that we need to order by date here 
 even though we've already sorted our data frame; if we don't do this 
 zeppelin will group dates where we have values for each element in the group together 
 and our graphs will come out with the order messed up */
select location, date, total_vaccinations from vaccinations order by date

Note that you’ll need to click into settings and select which fields we’re going to be using and for what:

Whilst this output is naturally very busy, and a little overwhelming on my 13" MacBook, we immediately get a decent view into our data. Picking a few countries at random, we can slim this down to compare whomever we wish:

val filterList = Seq("United States", "United Kingdom", "France")
val filtered = vaccinations
                    .filter(col("location").isin(filterList : _*))
                    .sort("location", "date")
filtered.createOrReplaceTempView("filtered")
%spark.sql
select * from filtered order by date

Easy!

One final thing I’d like to touch on. So far we’ve been dealing only with Spark DataFrames, the weakly typed cousin of the Dataset. Whilst, surprisingly, I really like the stringly-typed nature of the DataFrame, particularly for the speed at which it lets us play around with the data, for anything other than exploratory analysis or small-scale projects this is obviously inadequate. Fortunately, adding this type information in Spark is trivial: all we need to do is define the shape of our object and pass it to the as method as our type parameter.

import sqlContext.implicits._ // import so we can use $ rather than col("some_col")

// note we're defining peopleVaccinated as an Option here, preventing any nasty surprises from nulls
case class Details(
    location: String, 
    isoCode:String, 
    peopleVaccinated: Option[Int], 
    date: java.sql.Timestamp
)

val detailsDs = df.select(
        $"location", 
        $"iso_code" as "isoCode", 
        $"people_vaccinated" as "peopleVaccinated",
        $"date")
    .as[Details]

val ukResults = detailsDs.filter(details => details.location.equals("United Kingdom"))
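With the shape defined, anything downstream is checked by the compiler. For example, a quick (hypothetical) follow-up pulling out the largest reported people_vaccinated figure for the UK might look like this:

// the largest people_vaccinated value reported for the UK so far;
// missing values are dropped via the Option rather than blowing up on nulls
val largestUkFigure = ukResults
    .flatMap(_.peopleVaccinated.toList)
    .reduce((a, b) => math.max(a, b))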

Additionally, if we supply type information and use Datasets, Zeppelin provides us with some autocomplete functionality.

Whilst this support does move us in the right direction, it is a bit sluggish to be honest and isn’t really a substitute for a real IDE.

Conclusion

So far I’ve had a pretty good experience with Zeppelin and I really like what it has to offer; the tooling does exactly what I want from it, and by running it in Docker it’s easy to get an environment spun up in minutes. Whilst the out-of-the-box editing experience isn’t up to par with a proper IDE, if you’ve got an IntelliJ Ultimate licence the Big Data Tools integration brings this in line. Even for the kind of simple data analysis where we’d typically end up on a Python/pandas stack, Scala/Spark/Zeppelin is a great choice for most use-cases.