Something that comes up quite frequently when people start using Spark is "How can I filter my DataFrame using the contents of another DataFrame?". People with SQL experience will immediately look to trying to replicate the following.
SELECT
*
FROM
table_a a
WHERE
EXISTS (SELECT * FROM table_b b WHERE b.Id = a.Id)
So how do you do this in Spark? Well, some people will try to use the Column.isin method which uses varargs, this is okay for a small set of values but if you have a couple of large DataFrames then it's less than optimal as each row needs to be evaluated against the list. So what's the other choice? We can use joins to do the same thing. There are 2 we can use, a SEMI JOIN which is equivalent to our above example of running EXISTS; the other is ANTI JOIN which is equivalent to a NOT EXISTS. Using the above example and keeping the table names as DataFrame names we could re-write this in Scala as:
table_a.join(table_b, Seq("Id"), "left_semi")
These 2 joins are unique in that they only return the output of the left DataFrame, without any content from the right DataFrame.
So what does this look like in practice. Well using Azure Databricks we can quickly create some sample data to try them out. First lets create a couple of DataFrames.
First lets runs a simple query to find heroes which have an arch-enemy.
This uses the SEMI JOIN to keep records in the left DataFrame where there is a matching record in the right DataFrame.
Now, lets have look for heroes who've been a little more active and have removed their arch-enemies (for now).
This time we've used an ANTI JOIN to keep only those records in the left DataFrame where there are no matching records in the right DataFrame.
You'll notice that in the examples the join condition uses the slightly longer form, that's because in this example the columns we're joining on have different names, and also because there is a column in both DataFrames which have the same name.