import asyncio
from zipfile import ZipFile
from pathlib import Path
from datetime import date
from io import BytesIO
import httpx
import polars as pl
import pandas as pd

pl.Config.set_tbl_rows(5)
pd.options.display.max_rows = 5

fec_dir = Path("../data/fec")
async def download_and_save_cm(year: str, client: httpx.AsyncClient):
= ["CMTE_ID", "CMTE_NM", "CMTE_PTY_AFFILIATION"]
cm_cols = {"CMTE_PTY_AFFILIATION": pl.Categorical}
dtypes = f"https://www.fec.gov/files/bulk-downloads/20{year}/cm{year}.zip"
url = await client.get(url)
resp with ZipFile(BytesIO(resp.content)) as z:
pl.read_csv("cm.txt"),
z.read(=False,
has_header=[0, 1, 10],
columns=cm_cols,
new_columns="|",
separator=dtypes,
dtypes/ f"cm{year}.pq")
).write_parquet(fec_dir
async def download_and_save_indiv(year: str, client: httpx.AsyncClient):
    dtypes = {
        "CMTE_ID": pl.Utf8,
        "EMPLOYER": pl.Categorical,
        "OCCUPATION": pl.Categorical,
        "TRANSACTION_DT": pl.Utf8,
        "TRANSACTION_AMT": pl.Int32,
    }
    url = f"https://www.fec.gov/files/bulk-downloads/20{year}/indiv{year}.zip"
    resp = await client.get(url)
    with ZipFile(BytesIO(resp.content)) as z:
        pl.read_csv(
            z.read("itcont.txt"),
            has_header=False,
            columns=[0, 11, 12, 13, 14],
            new_columns=list(dtypes.keys()),
            separator="|",
            dtypes=dtypes,
            encoding="cp1252",
        ).with_columns(
            pl.col("TRANSACTION_DT").str.to_date(format="%m%d%Y", strict=False)
        ).write_parquet(fec_dir / f"indiv{year}.pq")
= ["08", "10", "12", "14", "16"]
years if not fec_dir.exists():
fec_dir.mkdir()async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
= [download_and_save_cm(year, client) for year in years]
cm_tasks = [download_and_save_indiv(year, client) for year in years]
indiv_tasks = cm_tasks + indiv_tasks
tasks await asyncio.gather(*tasks)
6 Scaling
In this chapter we’ll mostly compare Polars to Dask rather than to Pandas. This isn’t an apples-to-apples comparison, because Dask helps scale Pandas, and it may one day help scale Polars too. Dask, like Spark, can run on a single node or on a cluster with thousands of nodes.
Polars doesn’t come with any tooling for running on a cluster, but it does have a streaming mode for larger-than-memory datasets on a single machine. It also uses memory more efficiently than Pandas. These two things mean you can use Polars for much bigger data than Pandas can handle, and hopefully you won’t need tools like Dask or Spark until you’re actually running on a cluster.
I use “Dask” here as shorthand for dask.dataframe. Dask does a bunch of other stuff too.
The streaming features of Polars are very new at the time of writing, so approach with caution!
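Concretely, streaming mode is just a flag on collect: you build a lazy query as usual and ask the engine to execute it in batches. Here's a minimal sketch — the file path and column names are placeholders, not part of the FEC data we use below:

import polars as pl

# Build a lazy query; nothing is read yet, only a plan is constructed.
lazy = (
    pl.scan_parquet("some_big_file.parquet")  # placeholder path
    .group_by("key")                          # placeholder columns
    .agg(pl.col("value").sum())
)

# Execute the plan in streaming mode: Polars processes the data in
# batches instead of materializing the whole file in memory.
result = lazy.collect(streaming=True)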
6.1 Get the data
We’ll be using political donation data from the FEC, fetched and converted to Parquet by the download code at the start of this chapter. Warning: the download takes a few minutes.
6.2 Simple aggregation
Suppose we want to find the most common occupations among political donors. Let’s assume this data is too big to read into your machine’s memory all at once.
We can solve this with Polars streaming, with Dask’s lazy dataframe, or with plain Pandas by reading the files one at a time and keeping a running total:
# otherwise we can't read categoricals from multiple files
pl.enable_string_cache()

occupation_counts_pl = (
    pl.scan_parquet(fec_dir / "indiv*.pq", cache=False)
    .select(pl.col("OCCUPATION").value_counts(parallel=True, sort=True))
    .collect(streaming=True)
)
occupation_counts_pl
OCCUPATION |
---|
struct[2] |
{"RETIRED",1643920} |
{"ATTORNEY",826173} |
{null,620316} |
… |
{"PURNELL MORROW COMPANY",1} |
{"CITY OF BISHOP",1} |
import dask.dataframe as dd
from dask import compute
occupation_counts_dd = dd.read_parquet(
    fec_dir / "indiv*.pq", engine="pyarrow", columns=["OCCUPATION"]
)["OCCUPATION"].value_counts()
occupation_counts_dd.compute()
OCCUPATION
RETIRED 1643920
ATTORNEY 826173
...
ADUSTON CONSULTING 1
SR, IMMIGRATION PARALEGAL 1
Name: count, Length: 344118, dtype: int64
files = sorted(fec_dir.glob("indiv*.pq"))

total_counts_pd = pd.Series(dtype="int64")

for year in files:
    occ_pd = pd.read_parquet(year, columns=["OCCUPATION"], engine="pyarrow")
    counts = occ_pd["OCCUPATION"].value_counts()
    total_counts_pd = total_counts_pd.add(counts, fill_value=0).astype("int64")

total_counts_pd.nlargest(100)
OCCUPATION
RETIRED 1643920
ATTORNEY 826173
...
ECONOMIST 9336
ENTREPRENEUR 9199
Length: 100, dtype: int64
Polars can handle some larger-than-memory data even without streaming. Thanks to predicate pushdown, we can filter dataframes without reading all the data into memory first. So streaming mode is most useful for cases where we really do need to read in a lot of data.
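For example, here's a sketch using the same files (the cutoff is arbitrary): the filter and column selection below are pushed down into the Parquet scan, so only the matching rows and requested columns end up in the result, and a plain .collect() is enough:

# Predicate and projection pushdown: the filter and column selection are
# applied at read time, so most of the data never makes it into the result.
big_donations = (
    pl.scan_parquet(fec_dir / "indiv*.pq")
    .filter(pl.col("TRANSACTION_AMT") > 100_000)  # arbitrary cutoff
    .select(["EMPLOYER", "OCCUPATION", "TRANSACTION_AMT"])
    .collect()  # no streaming needed: the filtered result is small
)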
6.3 Executing multiple queries in parallel
Often we want to generate multiple insights from the same data, and we need them in separate dataframes. In this case, using collect_all is more efficient than calling .collect multiple times, because Polars can avoid repeating common operations like reading the data.
Let’s compute the average donation size, the total donated by employer and the average donation by occupation:
%%time
indiv_pl = pl.scan_parquet(fec_dir / "indiv*.pq")

avg_transaction_lazy_pl = indiv_pl.select(pl.col("TRANSACTION_AMT").mean())

total_by_employer_lazy_pl = (
    indiv_pl.drop_nulls("EMPLOYER")
    .group_by("EMPLOYER")
    .agg([pl.col("TRANSACTION_AMT").sum()])
    .sort("TRANSACTION_AMT", descending=True)
    .head(10)
)

avg_by_occupation_lazy_pl = (
    indiv_pl.group_by("OCCUPATION")
    .agg([pl.col("TRANSACTION_AMT").mean()])
    .sort("TRANSACTION_AMT", descending=True)
    .head(10)
)

avg_transaction_pl, total_by_employer_pl, avg_by_occupation_pl = pl.collect_all(
    [avg_transaction_lazy_pl, total_by_employer_lazy_pl, avg_by_occupation_lazy_pl],
    streaming=True,
    comm_subplan_elim=False,  # cannot use CSE with streaming
)
CPU times: user 10.7 s, sys: 1.93 s, total: 12.6 s
Wall time: 4.47 s
%%time
indiv_dd = (
    dd.read_parquet(fec_dir / "indiv*.pq", engine="pyarrow")
    # pandas and dask want datetimes but this is a date col
    .assign(
        TRANSACTION_DT=lambda df: dd.to_datetime(df["TRANSACTION_DT"], errors="coerce")
    )
)

avg_transaction_lazy_dd = indiv_dd["TRANSACTION_AMT"].mean()

total_by_employer_lazy_dd = (
    indiv_dd.groupby("EMPLOYER", observed=True)["TRANSACTION_AMT"].sum().nlargest(10)
)

avg_by_occupation_lazy_dd = (
    indiv_dd.groupby("OCCUPATION", observed=True)["TRANSACTION_AMT"].mean().nlargest(10)
)

avg_transaction_dd, total_by_employer_dd, avg_by_occupation_dd = compute(
    avg_transaction_lazy_dd, total_by_employer_lazy_dd, avg_by_occupation_lazy_dd
)
CPU times: user 24.1 s, sys: 1.52 s, total: 25.6 s
Wall time: 19.7 s
The Polars code above tends to be ~3.5x faster than Dask on my machine, which if anything is a smaller speedup than I expected.
We should also profile memory usage, since it could be the case that Polars is just running faster because it’s reading in bigger chunks. According to the fil profiler, the Dask example’s memory usage peaks at 1450 MiB, while Polars uses ~10% more than that.
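For reference, one way to reproduce such a measurement is fil's command-line runner; the sketch below assumes the Polars query has been copied into a hypothetical standalone script called polars_queries.py and that the filprofiler package is installed:

# Sketch: measuring peak memory with fil.
# Assumes `pip install filprofiler` and a hypothetical polars_queries.py
# containing the Polars query above. Run from a shell, not this notebook:
#
#   fil-profile run polars_queries.py
#
# fil writes an HTML report with a flamegraph of allocations at peak memory.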
Before I forget, here are the results of our computations:
6.3.1 avg_transaction
avg_transaction_pl
TRANSACTION_AMT |
---|
f64 |
1056.45334 |
avg_transaction_dd
1056.4533404188285
6.3.2 total_by_employer
total_by_employer_pl
EMPLOYER | TRANSACTION_AMT |
---|---|
cat | i32 |
"RETIRED" | 694090644 |
"SELF-EMPLOYED" | 561802551 |
"SELF" | 403477909 |
… | … |
"FAHR, LLC" | 76995400 |
"CANDIDATE" | 73542276 |
total_by_employer_dd
EMPLOYER
RETIRED 694090644
SELF-EMPLOYED 561802551
...
FAHR, LLC 76995400
CANDIDATE 73542276
Name: TRANSACTION_AMT, Length: 10, dtype: int32
6.3.3 avg_by_occupation
avg_by_occupation_pl
OCCUPATION | TRANSACTION_AMT |
---|---|
cat | f64 |
"PAULSON AND CO., INC." | 1e6 |
"CO-FOUNDING DIRECTOR" | 875000.0 |
"CO-FOUNDER, DIRECTOR" | 550933.333333 |
… | … |
"CO-PRINCIPAL" | 367000.0 |
"STEPHEN PATRICK LAFFEY" | 333692.0 |
avg_by_occupation_dd
OCCUPATION
PAULSON AND CO., INC. 1000000.0
CO-FOUNDING DIRECTOR 875000.0
...
CO-PRINCIPAL 367000.0
STEPHEN PATRICK LAFFEY 333692.0
Name: TRANSACTION_AMT, Length: 10, dtype: float64
6.4 Filtering
Let’s filter for only the 10 most common occupations and compute some summary statistics:
6.4.1 avg_by_occupation, filtered
Getting the most common occupations:
top_occupations_pl = (
    occupation_counts_pl.select(
        pl.col("OCCUPATION")
        .struct.field("OCCUPATION")
        .drop_nulls()
        .head(10)
    )
    .to_series()
)
top_occupations_pl
OCCUPATION |
---|
cat |
"RETIRED" |
"ATTORNEY" |
"PRESIDENT" |
… |
"CONSULTANT" |
"CEO" |
top_occupations_dd = occupation_counts_dd.head(10).index
top_occupations_dd
CategoricalIndex(['RETIRED', 'ATTORNEY', 'PRESIDENT', 'PHYSICIAN', 'HOMEMAKER',
'INFORMATION REQUESTED', 'EXECUTIVE', 'OWNER', 'CONSULTANT',
'CEO'],
categories=['PUBLIC RELATIONS CONSULTANT', 'PRESIDENT', 'PHYSICIAN', 'SENIOR EXECUTIVE', ..., 'PRODUCT DIST', 'EXECUTIVE VICE PRESIDENT, CHIEF COMMUN', 'ACTOR/TEACHER/D', 'SR, IMMIGRATION PARALEGAL'], ordered=False, dtype='category', name='OCCUPATION')
donations_pl_lazy = (
    indiv_pl.filter(pl.col("OCCUPATION").is_in(top_occupations_pl.to_list()))
    .group_by("OCCUPATION")
    .agg(pl.col("TRANSACTION_AMT").mean())
)
total_avg_pl, occupation_avg_pl = pl.collect_all(
    [indiv_pl.select(pl.col("TRANSACTION_AMT").mean()), donations_pl_lazy],
    streaming=True,
    comm_subplan_elim=False,
)
donations_dd_lazy = (
    indiv_dd[indiv_dd["OCCUPATION"].isin(top_occupations_dd)]
    .groupby("OCCUPATION", observed=True)["TRANSACTION_AMT"]
    .mean()
    .dropna()
)
total_avg_dd, occupation_avg_dd = compute(
    indiv_dd["TRANSACTION_AMT"].mean(), donations_dd_lazy
)
6.4.2 Plotting
These results are small enough to plot:
ax = (
    occupation_avg_pl
    .to_pandas()
    .set_index("OCCUPATION")
    .squeeze()
    .sort_values(ascending=False)
    .plot.barh(color="k", width=0.9)
)
lim = ax.get_ylim()
ax.vlines(total_avg_pl, *lim, color="C1", linewidth=3)
ax.legend(["Average donation"])
ax.set(xlabel="Donation Amount", title="Average Donation by Occupation")
[Text(0.5, 0, 'Donation Amount'),
Text(0.5, 1.0, 'Average Donation by Occupation')]
ax = occupation_avg_dd.sort_values(ascending=False).plot.barh(color="k", width=0.9)
lim = ax.get_ylim()
ax.vlines(total_avg_dd, *lim, color="C1", linewidth=3)
ax.legend(["Average donation"])
ax.set(xlabel="Donation Amount", title="Average Donation by Occupation")
[Text(0.5, 0, 'Donation Amount'),
Text(0.5, 1.0, 'Average Donation by Occupation')]
6.5 Resampling
Resampling is another useful way to get our data down to a manageable size:
daily_pl = (
    indiv_pl.select(["TRANSACTION_DT", "TRANSACTION_AMT"])
    .drop_nulls()
    .sort("TRANSACTION_DT")
    .group_by_dynamic("TRANSACTION_DT", every="1d")
    .agg(pl.col("TRANSACTION_AMT").sum())
    .filter(
        pl.col("TRANSACTION_DT")
        .is_between(date(2011, 1, 1), date(2017, 1, 1), closed="left")
    )
    .with_columns(pl.col("TRANSACTION_AMT") / 1000)
    .collect(streaming=True)
)

ax = (
    daily_pl.select(
        [pl.col("TRANSACTION_DT").cast(pl.Datetime), "TRANSACTION_AMT"]
    )
    .to_pandas()
    .set_index("TRANSACTION_DT")
    .squeeze()
    .plot(figsize=(12, 6))
)
ax.set(ylim=0, title="Daily Donations", ylabel="$ (thousands)")
[(0.0, 59192.975450000005),
Text(0.5, 1.0, 'Daily Donations'),
Text(0, 0.5, '$ (thousands)')]
daily_dd = (
    indiv_dd[["TRANSACTION_DT", "TRANSACTION_AMT"]]
    .dropna()
    .set_index("TRANSACTION_DT")["TRANSACTION_AMT"]
    .resample("D")
    .sum()
    .loc["2011":"2016"]
    .div(1000)
    .compute()
)

ax = daily_dd.plot(figsize=(12, 6))
ax.set(ylim=0, title="Daily Donations", ylabel="$ (thousands)")
[(0.0, 59192.97545),
Text(0.5, 1.0, 'Daily Donations'),
Text(0, 0.5, '$ (thousands)')]
6.6 Joining
Polars joins work in streaming mode. Let’s join the donations data with the committee master data, which contains information about the committees people donate to.
cm_pl = (
    # This data is small so we don't use streaming.
    # Also, .last isn't available in lazy mode.
    pl.read_parquet(fec_dir / "cm*.pq")
    # Some committees change their name, but the ID stays the same
    .group_by("CMTE_ID", maintain_order=True)
    .last()
)
cm_pl
CMTE_ID | CMTE_NM | CMTE_PTY_AFFILIATION |
---|---|---|
str | str | cat |
"C00000042" | "ILLINOIS TOOL WORKS INC. FOR B… | null |
"C00000059" | "HALLMARK CARDS PAC" | "UNK" |
"C00000422" | "AMERICAN MEDICAL ASSOCIATION P… | null |
… | … | … |
"C90017336" | "LUDWIG, EUGENE" | null |
"C90017542" | "CENTER FOR POPULAR DEMOCRACY A… | null |
cm_dd = (
    # This data is small but we use dask here as a
    # convenient way to read a glob of files.
    dd.read_parquet(fec_dir / "cm*.pq")
    .compute()
    # Some committees change their name, but the
    # ID stays the same.
    # If we use .last instead of .nth(-1),
    # we get the last non-null value
    .groupby("CMTE_ID", as_index=False)
    .nth(-1)
)
cm_dd
 | CMTE_ID | CMTE_NM | CMTE_PTY_AFFILIATION |
---|---|---|---|
7 | C00000794 | LENT & SCRIVNER PAC | UNK |
15 | C00001156 | MICHIGAN LEAGUE OF COMMUNITY BANKS POLITICAL A... | NaN |
... | ... | ... | ... |
17649 | C99002396 | AMERICAN POLITICAL ACTION COMMITTEE | NaN |
17650 | C99003428 | THIRD DISTRICT REPUBLICAN PARTY | REP |
28467 rows × 3 columns
Merging:
indiv_filtered_pl = indiv_pl.filter(
    pl.col("TRANSACTION_DT").is_between(
        date(2007, 1, 1), date(2017, 1, 1), closed="both"
    )
)
merged_pl = indiv_filtered_pl.join(cm_pl.lazy(), on="CMTE_ID")
indiv_filtered_dd = indiv_dd[
    (indiv_dd["TRANSACTION_DT"] >= pd.Timestamp("2007-01-01"))
    & (indiv_dd["TRANSACTION_DT"] <= pd.Timestamp("2017-01-01"))
]
merged_dd = dd.merge(indiv_filtered_dd, cm_dd, on="CMTE_ID")
Daily donations by party:
party_donations_pl = (
    merged_pl.group_by(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])
    .agg(pl.col("TRANSACTION_AMT").sum())
    .sort(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])
    .collect(streaming=True)
)
party_donations_dd = (
    merged_dd.groupby(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])[
        "TRANSACTION_AMT"
    ]
    .sum()
    .compute()
    .sort_index()
)
Plotting daily donations:
ax = (
    party_donations_pl
    .pivot(
        index="TRANSACTION_DT", on="CMTE_PTY_AFFILIATION", values="TRANSACTION_AMT"
    )[1:, :]
    .select(
        [pl.col("TRANSACTION_DT"), pl.col(pl.Int32).rolling_mean(30, min_periods=0)]
    )
    .to_pandas()
    .set_index("TRANSACTION_DT")[["DEM", "REP"]]
    .plot(color=["C0", "C3"], figsize=(12, 6), linewidth=3)
)
ax.set(title="Daily Donations (30-D Moving Average)", xlabel="Date")
ax = (
    party_donations_dd
    .unstack("CMTE_PTY_AFFILIATION")
    .iloc[1:]
    .rolling("30D")
    .mean()[["DEM", "REP"]]
    .plot(color=["C0", "C3"], figsize=(12, 6), linewidth=3)
)
ax.set(title="Daily Donations (30-D Moving Average)", xlabel="Date")