import asyncio
from zipfile import ZipFile
from pathlib import Path
from datetime import date
from io import BytesIO
import httpx
import polars as pl
import pandas as pd
pl.Config.set_tbl_rows(5)
pd.options.display.max_rows = 5

fec_dir = Path("../data/fec")

async def download_and_save_cm(year: str, client: httpx.AsyncClient):
    cm_cols = ["CMTE_ID", "CMTE_NM", "CMTE_PTY_AFFILIATION"]
    dtypes = {"CMTE_PTY_AFFILIATION": pl.Categorical}
    url = f"https://www.fec.gov/files/bulk-downloads/20{year}/cm{year}.zip"
    resp = await client.get(url)
    with ZipFile(BytesIO(resp.content)) as z:
        pl.read_csv(
            z.read("cm.txt"),
            has_header=False,
            columns=[0, 1, 10],
            new_columns=cm_cols,
            sep="|",
            dtypes=dtypes,
        ).write_parquet(fec_dir / f"cm{year}.pq")

async def download_and_save_indiv(year: str, client: httpx.AsyncClient):
    dtypes = {
        "CMTE_ID": pl.Utf8,
        "EMPLOYER": pl.Categorical,
        "OCCUPATION": pl.Categorical,
        "TRANSACTION_DT": pl.Utf8,
        "TRANSACTION_AMT": pl.Int32,
    }
    url = f"https://www.fec.gov/files/bulk-downloads/20{year}/indiv{year}.zip"
    resp = await client.get(url)
    with ZipFile(BytesIO(resp.content)) as z:
        pl.read_csv(
            z.read("itcont.txt"),
            has_header=False,
            columns=[0, 11, 12, 13, 14],
            new_columns=list(dtypes.keys()),
            sep="|",
            dtypes=dtypes,
            encoding="cp1252",
        ).with_columns(
            pl.col("TRANSACTION_DT").str.strptime(pl.Date, fmt="%m%d%Y", strict=False)
        ).write_parquet(
            fec_dir / f"indiv{year}.pq"
        )

years = ["08", "10", "12", "14", "16"]

if not fec_dir.exists():
    fec_dir.mkdir()

async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
    cm_tasks = [download_and_save_cm(year, client) for year in years]
    indiv_tasks = [download_and_save_indiv(year, client) for year in years]
    tasks = cm_tasks + indiv_tasks
    await asyncio.gather(*tasks)
6 Scaling
In this chapter we’ll mostly compare Polars to Dask rather than to Pandas. This isn’t an apples-to-apples comparison: Dask exists to scale Pandas, and one day it may help scale Polars too. Dask, like Spark, can run on a single node or on a cluster with thousands of nodes.
Polars doesn’t come with any tooling for running on a cluster, but it does have a streaming mode for larger-than-memory datasets on a single machine. It also uses memory more efficiently than Pandas. These two things mean you can use Polars for much bigger data than Pandas can handle, and hopefully you won’t need tools like Dask or Spark until you’re actually running on a cluster.
I use “Dask” here as a shorthand for dask.dataframe. Dask does a bunch of other stuff too.
The streaming features of Polars are very new at the time of writing, so approach with caution!
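To give a flavour of what that looks like before we dive in, here is a minimal sketch of streaming execution. The file name and column names are stand-ins, not part of the FEC example below:

import polars as pl

# A minimal sketch of streaming mode. "donations.pq", "occupation" and
# "amount" are placeholders for any larger-than-memory Parquet file.
lazy_query = (
    pl.scan_parquet("donations.pq")   # nothing is read yet
    .group_by("occupation")
    .agg(pl.col("amount").sum())
)
result = lazy_query.collect(streaming=True)  # processed in batches, not all at once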
6.1 Get the data
We’ll be using political donation data from the FEC; the code at the start of this chapter downloads it and saves it as Parquet. Warning: the download takes a few minutes.
6.2 Simple aggregation
Suppose we want to find the most common occupations among political donors. Let’s assume this data is too big to read into your machine’s memory all at once.
We can solve this with Polars streaming, with Dask’s lazy dataframe, or simply with Pandas by reading the files one by one and keeping a running total:
# otherwise we can't read categoricals from multiple files
pl.enable_string_cache()

occupation_counts_pl = (
    pl.scan_parquet(fec_dir / "indiv*.pq", cache=False)
    .select(pl.col("OCCUPATION").value_counts(parallel=True, sort=True))
    .collect(streaming=True)
)
occupation_counts_pl
OCCUPATION |
---|
struct[2] |
{"RETIRED",4773715} |
{"NOT EMPLOYED",2715939} |
… |
{"PROFESSOR OF PYSICS",1} |
{"ARTIST/SINGER-SONGWRITER",1} |
import dask.dataframe as dd
from dask import compute
occupation_counts_dd = dd.read_parquet(
    fec_dir / "indiv*.pq", engine="pyarrow", columns=["OCCUPATION"]
)["OCCUPATION"].value_counts()
occupation_counts_dd.compute()
OCCUPATION
RETIRED 4773715
NOT EMPLOYED 2715939
...
TRANSPORTATION COMPLIANCE DIRECTOR 1
PROASSURANCE 1
Name: count, Length: 579158, dtype: int64
files = sorted(fec_dir.glob("indiv*.pq"))

total_counts_pd = pd.Series(dtype="int64")

for year in files:
    occ_pd = pd.read_parquet(year, columns=["OCCUPATION"], engine="pyarrow")
    counts = occ_pd["OCCUPATION"].value_counts()
    total_counts_pd = total_counts_pd.add(counts, fill_value=0).astype("int64")

total_counts_pd.nlargest(100)
OCCUPATION
RETIRED 4773715
NOT EMPLOYED 2715939
...
SURGEON 25545
OPERATOR 25161
Length: 100, dtype: int64
Polars can handle some larger-than-memory data even without streaming. Thanks to predicate pushdown, we can filter dataframes without reading all the data into memory first. So streaming mode is most useful for cases where we really do need to read in a lot of data.
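For example, a filter on a lazy scan is pushed down into the Parquet reader, so Polars can skip data that doesn't match instead of reading everything first. A small sketch, with an arbitrary threshold chosen purely for illustration:

# Predicate pushdown sketch: the filter is applied while scanning,
# so the full dataset is never materialized in memory.
# The 100_000 cutoff is arbitrary, purely for illustration.
big_donations_pl = (
    pl.scan_parquet(fec_dir / "indiv*.pq")
    .filter(pl.col("TRANSACTION_AMT") > 100_000)
    .collect()
)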
6.3 Executing multiple queries in parallel
Often we want to generate multiple insights from the same data, and we need them in separate dataframes. In this case, using collect_all is more efficient than calling .collect multiple times, because Polars can avoid repeating common operations like reading the data.
Let’s compute the average donation size, the total donated by employer and the average donation by occupation:
%%time
indiv_pl = pl.scan_parquet(fec_dir / "indiv*.pq")
avg_transaction_lazy_pl = indiv_pl.select(pl.col("TRANSACTION_AMT").mean())
total_by_employer_lazy_pl = (
    indiv_pl.drop_nulls("EMPLOYER")
    .group_by("EMPLOYER")
    .agg([pl.col("TRANSACTION_AMT").sum()])
    .sort("TRANSACTION_AMT", descending=True)
    .head(10)
)
avg_by_occupation_lazy_pl = (
    indiv_pl.group_by("OCCUPATION")
    .agg([pl.col("TRANSACTION_AMT").mean()])
    .sort("TRANSACTION_AMT", descending=True)
    .head(10)
)

avg_transaction_pl, total_by_employer_pl, avg_by_occupation_pl = pl.collect_all(
    [avg_transaction_lazy_pl, total_by_employer_lazy_pl, avg_by_occupation_lazy_pl],
    streaming=True,
    comm_subplan_elim=False,  # cannot use CSE with streaming
)
CPU times: user 11.8 s, sys: 2.37 s, total: 14.2 s
Wall time: 4.77 s
%%time
indiv_dd = (
    dd.read_parquet(fec_dir / "indiv*.pq", engine="pyarrow")
    # pandas and dask want datetimes but this is a date col
    .assign(
        TRANSACTION_DT=lambda df: dd.to_datetime(df["TRANSACTION_DT"], errors="coerce")
    )
)
avg_transaction_lazy_dd = indiv_dd["TRANSACTION_AMT"].mean()
total_by_employer_lazy_dd = (
    indiv_dd.groupby("EMPLOYER", observed=True)["TRANSACTION_AMT"].sum().nlargest(10)
)
avg_by_occupation_lazy_dd = (
    indiv_dd.groupby("OCCUPATION", observed=True)["TRANSACTION_AMT"].mean().nlargest(10)
)
avg_transaction_dd, total_by_employer_dd, avg_by_occupation_dd = compute(
    avg_transaction_lazy_dd, total_by_employer_lazy_dd, avg_by_occupation_lazy_dd
)
CPU times: user 22.4 s, sys: 3.28 s, total: 25.7 s
Wall time: 15.3 s
The Polars code above tends to be ~3.5x faster than Dask on my machine, which if anything is a smaller speedup than I expected.
We should also profile memory usage, since it could be the case that Polars is just running faster because it's reading in bigger chunks. According to the fil profiler, the Dask example's memory usage peaks at 1450 MiB, while Polars uses ~10% more than that.
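If you just want a rough sanity check without setting up a profiler, peak resident memory from the standard library is a crude proxy. A sketch using the Unix-only resource module (fil's per-line output is far more informative):

import resource

# Peak resident set size of the current process so far.
# ru_maxrss is in kilobytes on Linux and bytes on macOS.
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"peak memory: {peak / 1024:.0f} MiB")  # assuming Linux (kilobytes)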
Before I forget, here are the results of our computations:
6.3.1 avg_transaction
avg_transaction_pl
TRANSACTION_AMT |
---|
f64 |
563.97184 |
avg_transaction_dd
563.9718398183915
6.3.2 total_by_employer
total_by_employer_pl
EMPLOYER | TRANSACTION_AMT |
---|---|
cat | i32 |
"RETIRED" | 1023306104 |
"SELF-EMPLOYED" | 834757599 |
… | … |
"FAHR, LLC" | 166679844 |
"CANDIDATE" | 75187243 |
total_by_employer_dd
EMPLOYER
RETIRED 1023306104
SELF-EMPLOYED 834757599
...
FAHR, LLC 166679844
CANDIDATE 75187243
Name: TRANSACTION_AMT, Length: 10, dtype: int32
6.3.3 avg_by_occupation
avg_by_occupation_pl
OCCUPATION | TRANSACTION_AMT |
---|---|
cat | f64 |
"CHAIRMAN CEO &… | 1.0233e6 |
"PAULSON AND CO… | 1e6 |
… | … |
"PERRY HOMES" | 500000.0 |
"CHIEF EXECUTIV… | 500000.0 |
avg_by_occupation_dd
OCCUPATION
CHAIRMAN CEO & FOUNDER 1.023333e+06
PAULSON AND CO., INC. 1.000000e+06
...
OWNER, FOUNDER AND CEO 5.000000e+05
CHIEF EXECUTIVE OFFICER/PRODUCER 5.000000e+05
Name: TRANSACTION_AMT, Length: 10, dtype: float64
6.4 Filtering
Let’s filter for only the 10 most common occupations and compute some summary statistics:
6.4.1 avg_by_occupation, filtered
Getting the most common occupations:
top_occupations_pl = (
    occupation_counts_pl.select(
        pl.col("OCCUPATION")
        .struct.field("OCCUPATION")
        .drop_nulls()
        .head(10)
    )
    .to_series()
)
top_occupations_pl
OCCUPATION |
---|
cat |
"RETIRED" |
"NOT EMPLOYED" |
… |
"EXECUTIVE" |
"ENGINEER" |
top_occupations_dd = occupation_counts_dd.head(10).index
top_occupations_dd
CategoricalIndex(['RETIRED', 'NOT EMPLOYED', 'ATTORNEY', 'PHYSICIAN',
'HOMEMAKER', 'PRESIDENT', 'PROFESSOR', 'CONSULTANT',
'EXECUTIVE', 'ENGINEER'],
categories=['PUBLIC RELATIONS CONSULTANT', 'PRESIDENT', 'PHYSICIAN', 'SENIOR EXECUTIVE', ..., 'VICE PRESIDENT - FUEL PROCUREMENT', 'INFORMATION TECHNOLOGY SPECI', 'SR MANAGER, PROJECT PLANNING', 'PROASSURANCE'], ordered=False, dtype='category', name='OCCUPATION')
donations_pl_lazy = (
    indiv_pl.filter(pl.col("OCCUPATION").is_in(top_occupations_pl.to_list()))
    .group_by("OCCUPATION")
    .agg(pl.col("TRANSACTION_AMT").mean())
)
total_avg_pl, occupation_avg_pl = pl.collect_all(
    [indiv_pl.select(pl.col("TRANSACTION_AMT").mean()), donations_pl_lazy],
    streaming=True,
    comm_subplan_elim=False,
)
donations_dd_lazy = (
    indiv_dd[indiv_dd["OCCUPATION"].isin(top_occupations_dd)]
    .groupby("OCCUPATION", observed=True)["TRANSACTION_AMT"]
    .mean()
    .dropna()
)
total_avg_dd, occupation_avg_dd = compute(
    indiv_dd["TRANSACTION_AMT"].mean(), donations_dd_lazy
)
6.4.2 Plotting
These results are small enough to plot:
ax = (
    occupation_avg_pl
    .to_pandas()
    .set_index("OCCUPATION")
    .squeeze()
    .sort_values(ascending=False)
    .plot.barh(color="k", width=0.9)
)
lim = ax.get_ylim()
ax.vlines(total_avg_pl, *lim, color="C1", linewidth=3)
ax.legend(["Average donation"])
ax.set(xlabel="Donation Amount", title="Average Donation by Occupation")
[Text(0.5, 0, 'Donation Amount'),
Text(0.5, 1.0, 'Average Donation by Occupation')]
ax = occupation_avg_dd.sort_values(ascending=False).plot.barh(color="k", width=0.9)
lim = ax.get_ylim()
ax.vlines(total_avg_dd, *lim, color="C1", linewidth=3)
ax.legend(["Average donation"])
ax.set(xlabel="Donation Amount", title="Average Donation by Occupation")
[Text(0.5, 0, 'Donation Amount'),
Text(0.5, 1.0, 'Average Donation by Occupation')]
6.5 Resampling
Resampling is another useful way to get our data down to a manageable size:
daily_pl = (
    indiv_pl.select(["TRANSACTION_DT", "TRANSACTION_AMT"])
    .drop_nulls()
    .sort("TRANSACTION_DT")
    .group_by_dynamic("TRANSACTION_DT", every="1d")
    .agg(pl.col("TRANSACTION_AMT").sum())
    .filter(
        pl.col("TRANSACTION_DT")
        .is_between(date(2011, 1, 1), date(2017, 1, 1), closed="left")
    )
    .with_columns(pl.col("TRANSACTION_AMT") / 1000)
    .collect(streaming=True)
)
ax = (
    daily_pl.select(
        [pl.col("TRANSACTION_DT").cast(pl.Datetime), "TRANSACTION_AMT"]
    )
    .to_pandas()
    .set_index("TRANSACTION_DT")
    .squeeze()
    .plot(figsize=(12, 6))
)
ax.set(ylim=0, title="Daily Donations", ylabel="$ (thousands)")
[(0.0, 83407.5242),
Text(0.5, 1.0, 'Daily Donations'),
Text(0, 0.5, '$ (thousands)')]
daily_dd = (
    indiv_dd[["TRANSACTION_DT", "TRANSACTION_AMT"]]
    .dropna()
    .set_index("TRANSACTION_DT")["TRANSACTION_AMT"]
    .resample("D")
    .sum()
    .loc["2011":"2016"]
    .div(1000)
    .compute()
)

ax = daily_dd.plot(figsize=(12, 6))
ax.set(ylim=0, title="Daily Donations", ylabel="$ (thousands)")
/home/user/mambaforge/envs/modern-polars/lib/python3.11/site-packages/partd/pandas.py:138: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
elif is_datetime64tz_dtype(block):
/home/user/mambaforge/envs/modern-polars/lib/python3.11/site-packages/partd/pandas.py:138: FutureWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
elif is_datetime64tz_dtype(block):
[(0.0, 83407.5242),
Text(0.5, 1.0, 'Daily Donations'),
Text(0, 0.5, '$ (thousands)')]
6.6 Joining
Polars joins work in streaming mode. Let's join the donations data with the committee master data, which contains information about the committees people donate to.
cm_pl = (
    # This data is small so we don't use streaming.
    # Also, .last isn't available in lazy mode.
    pl.read_parquet(fec_dir / "cm*.pq")
    # Some committees change their name, but the ID stays the same
    .group_by("CMTE_ID", maintain_order=True).last()
)
cm_pl
CMTE_ID | CMTE_NM | CMTE_PTY_AFFILIATION |
---|---|---|
str | str | cat |
"C00000042" | "ILLINOIS TOOL … | null |
"C00000059" | "HALLMARK CARDS… | "UNK" |
… | … | … |
"C90017336" | "LUDWIG, EUGENE… | null |
"C90017542" | "CENTER FOR POP… | null |
cm_dd = (
    # This data is small but we use dask here as a
    # convenient way to read a glob of files.
    dd.read_parquet(fec_dir / "cm*.pq")
    .compute()
    # Some committees change their name, but the
    # ID stays the same.
    # If we use .last instead of .nth(-1),
    # we get the last non-null value
    .groupby("CMTE_ID", as_index=False)
    .nth(-1)
)
cm_dd
| CMTE_ID | CMTE_NM | CMTE_PTY_AFFILIATION |
---|---|---|---|
7 | C00000794 | LENT & SCRIVNER PAC | UNK |
15 | C00001156 | MICHIGAN LEAGUE OF COMMUNITY BANKS POLITICAL A... | NaN |
... | ... | ... | ... |
17649 | C99002396 | AMERICAN POLITICAL ACTION COMMITTEE | NaN |
17650 | C99003428 | THIRD DISTRICT REPUBLICAN PARTY | REP |
28467 rows × 3 columns
Merging:
indiv_filtered_pl = indiv_pl.filter(
    pl.col("TRANSACTION_DT").is_between(
        date(2007, 1, 1), date(2017, 1, 1), closed="both"
    )
)
merged_pl = indiv_filtered_pl.join(cm_pl.lazy(), on="CMTE_ID")
indiv_filtered_dd = indiv_dd[
    (indiv_dd["TRANSACTION_DT"] >= pd.Timestamp("2007-01-01"))
    & (indiv_dd["TRANSACTION_DT"] <= pd.Timestamp("2017-01-01"))
]
merged_dd = dd.merge(indiv_filtered_dd, cm_dd, on="CMTE_ID")
Daily donations by party:
party_donations_pl = (
    merged_pl.group_by(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])
    .agg(pl.col("TRANSACTION_AMT").sum())
    .sort(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])
    .collect(streaming=True)
)
party_donations_dd = (
    (
        merged_dd.groupby(["TRANSACTION_DT", "CMTE_PTY_AFFILIATION"])[
            "TRANSACTION_AMT"
        ].sum()
    )
    .compute()
    .sort_index()
)
Plotting daily donations:
ax = (
    party_donations_pl
    .pivot(
        index="TRANSACTION_DT", columns="CMTE_PTY_AFFILIATION", values="TRANSACTION_AMT"
    )[1:, :]
    .select(
        [pl.col("TRANSACTION_DT"), pl.col(pl.Int32).rolling_mean(30, min_periods=0)]
    )
    .to_pandas()
    .set_index("TRANSACTION_DT")
    [["DEM", "REP"]]
    .plot(color=["C0", "C3"], figsize=(12, 6), linewidth=3)
)
ax.set(title="Daily Donations (30-D Moving Average)", xlabel="Date")
[Text(0.5, 1.0, 'Daily Donations (30-D Moving Average)'), Text(0.5, 0, 'Date')]
ax = (
    party_donations_dd
    .unstack("CMTE_PTY_AFFILIATION")
    .iloc[1:]
    .rolling("30D")
    .mean()
    [["DEM", "REP"]]
    .plot(color=["C0", "C3"], figsize=(12, 6), linewidth=3)
)
ax.set(title="Daily Donations (30-D Moving Average)", xlabel="Date")
[Text(0.5, 1.0, 'Daily Donations (30-D Moving Average)'), Text(0.5, 0, 'Date')]