
Expression profiling (legacy mode) #936

Open · wants to merge 8 commits into base: main
17 changes: 14 additions & 3 deletions activitysim/abm/models/summarize.py
@@ -4,11 +4,12 @@

import logging
import os
from pathlib import Path

import numpy as np
import pandas as pd

from activitysim.core import expressions, workflow
from activitysim.core import expressions, timing, workflow
from activitysim.core.configuration.base import PreprocessorSettings, PydanticReadable
from activitysim.core.los import Network_LOS

@@ -353,6 +354,12 @@ def summarize(
}
)

if state.settings.expression_profile:
perf_log_file = Path(trace_label + ".log")
else:
perf_log_file = None
performance_timer = timing.EvalTiming(perf_log_file)

for i, row in spec.iterrows():
out_file = row["Output"]
expr = row["Expression"]
@@ -361,15 +368,19 @@
if out_file.startswith("_"):
logger.debug(f"Temp Variable: {expr} -> {out_file}")

locals_d[out_file] = eval(expr, globals(), locals_d)
with performance_timer.time_expression(expr):
locals_d[out_file] = eval(expr, globals(), locals_d)
continue

logger.debug(f"Summary: {expr} -> {out_file}.csv")

resultset = eval(expr, globals(), locals_d)
with performance_timer.time_expression(expr):
resultset = eval(expr, globals(), locals_d)
resultset.to_csv(
state.get_output_file_path(
os.path.join(output_location, f"{out_file}.csv")
),
index=False,
)

performance_timer.write_log(state)
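The pattern above repeats in each component this PR touches: build a timer from an optional log file, wrap every `eval()` call in `time_expression()`, and flush with `write_log()` at the end. The `timing` module itself is added elsewhere in this PR and is not part of this excerpt, so the sketch below only illustrates a timer with the same call pattern; the class name, log format, and aggregation behavior are assumptions, not the actual `activitysim.core.timing` implementation.

```python
from __future__ import annotations

import time
from contextlib import contextmanager
from pathlib import Path


class MinimalEvalTiming:
    """Illustrative stand-in for timing.EvalTiming; not the real implementation."""

    def __init__(self, log_file: Path | None = None):
        self.log_file = log_file  # None means profiling is disabled
        self.elapsed: dict[str, float] = {}

    @contextmanager
    def time_expression(self, expression: str):
        if self.log_file is None:
            yield  # profiling off: no timing overhead
            return
        start = time.perf_counter()
        try:
            yield
        finally:
            # accumulate, since the same expression can be evaluated across chunks
            self.elapsed[expression] = (
                self.elapsed.get(expression, 0.0) + time.perf_counter() - start
            )

    def write_log(self, state):
        if self.log_file is None:
            return
        # one tab-separated line per expression, slowest first
        out_path = state.get_output_file_path(str(self.log_file))
        with open(out_path, "w") as f:
            for expr, seconds in sorted(
                self.elapsed.items(), key=lambda kv: kv[1], reverse=True
            ):
                f.write(f"{seconds:.6f}\t{expr}\n")
```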
9 changes: 8 additions & 1 deletion activitysim/cli/run.py
@@ -11,7 +11,7 @@

import numpy as np

from activitysim.core import chunk, config, mem, tracing, workflow
from activitysim.core import chunk, config, mem, timing, tracing, workflow
from activitysim.core.configuration import FileSystem, Settings

logger = logging.getLogger(__name__)
@@ -423,6 +423,13 @@ def run(args):
if memory_sidecar_process:
memory_sidecar_process.stop()

if state.settings.expression_profile:
# generate a summary of the slowest expression evaluation times
# across all models and write it to a file
analyze = timing.AnalyzeEvalTiming(state)
analyze.component_report()
analyze.subcomponent_report()

return 0


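`AnalyzeEvalTiming` is also part of the new `timing` module and is not shown in this diff. Purely as an illustration of what the end-of-run step could do with per-component logs in the format of the sketch above, a collector that applies the `expression_profile_cutoff` setting might look like this (the function name and log format are assumptions):

```python
from __future__ import annotations

from pathlib import Path


def collect_slow_expressions(
    output_dir: Path, cutoff: float
) -> list[tuple[str, str, float]]:
    """Gather (component, expression, seconds) rows slower than the cutoff."""
    rows: list[tuple[str, str, float]] = []
    for log_file in sorted(output_dir.glob("*.log")):
        component = log_file.stem  # log files are named after the trace label
        for line in log_file.read_text().splitlines():
            seconds_text, _, expr = line.partition("\t")
            try:
                seconds = float(seconds_text)
            except ValueError:
                continue  # skip any non-timing lines
            if seconds >= cutoff:
                rows.append((component, expr, seconds))
    rows.sort(key=lambda row: row[2], reverse=True)  # slowest first
    return rows
```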
49 changes: 30 additions & 19 deletions activitysim/core/assign.py
@@ -5,11 +5,12 @@
import logging
from builtins import object, zip
from collections import OrderedDict
from pathlib import Path

import numpy as np
import pandas as pd

from activitysim.core import chunk, util, workflow
from activitysim.core import chunk, timing, util, workflow

logger = logging.getLogger(__name__)

@@ -275,6 +276,12 @@ def to_series(x):

assert assignment_expressions.shape[0] > 0

if state.settings.expression_profile:
perf_log_file = Path(trace_label + ".log")
else:
perf_log_file = None
performance_timer = timing.EvalTiming(perf_log_file)

trace_assigned_locals = trace_results = None
if trace_rows is not None:
# convert to numpy array so we can slice ndarrays as well as series
@@ -311,24 +318,25 @@
n_randoms += 1
assignment_expressions.loc[expression_idx, "expression"] = expression
if n_randoms:
try:
random_draws = state.get_rn_generator().normal_for_df(
df, broadcast=True, size=n_randoms
)
except RuntimeError:
pass
else:
_locals_dict["random_draws"] = random_draws
with performance_timer.time_expression("<RANDOM DRAWS>"):
try:
random_draws = state.get_rn_generator().normal_for_df(
df, broadcast=True, size=n_randoms
)
except RuntimeError:
pass
else:
_locals_dict["random_draws"] = random_draws

def rng_lognormal(random_draws, mu, sigma, broadcast=True, scale=False):
if scale:
x = 1 + ((sigma * sigma) / (mu * mu))
mu = np.log(mu / (np.sqrt(x)))
sigma = np.sqrt(np.log(x))
assert broadcast
return np.exp(random_draws * sigma + mu)
def rng_lognormal(random_draws, mu, sigma, broadcast=True, scale=False):
if scale:
x = 1 + ((sigma * sigma) / (mu * mu))
mu = np.log(mu / (np.sqrt(x)))
sigma = np.sqrt(np.log(x))
assert broadcast
return np.exp(random_draws * sigma + mu)

_locals_dict["rng_lognormal"] = rng_lognormal
_locals_dict["rng_lognormal"] = rng_lognormal

sharrow_enabled = state.settings.sharrow

@@ -356,7 +364,8 @@ def rng_lognormal(random_draws, mu, sigma, broadcast=True, scale=False):

if is_temp_singular(target) or is_throwaway(target):
try:
x = eval(expression, globals(), _locals_dict)
with performance_timer.time_expression(expression):
x = eval(expression, globals(), _locals_dict)
except Exception as err:
logger.error(
"assign_variables error: %s: %s", type(err).__name__, str(err)
@@ -384,7 +393,8 @@ def rng_lognormal(random_draws, mu, sigma, broadcast=True, scale=False):

# FIXME should whitelist globals for security?
globals_dict = {}
expr_values = to_series(eval(expression, globals_dict, _locals_dict))
with performance_timer.time_expression(expression):
expr_values = to_series(eval(expression, globals_dict, _locals_dict))

if sharrow_enabled:
if isinstance(expr_values.dtype, pd.api.types.CategoricalDtype):
@@ -459,4 +469,5 @@ def rng_lognormal(random_draws, mu, sigma, broadcast=True, scale=False):
inplace=True,
)

performance_timer.write_log(state)
return variables, trace_results, trace_assigned_locals
9 changes: 9 additions & 0 deletions activitysim/core/configuration/base.py
@@ -234,6 +234,15 @@ def pandas_option_context(self):
else:
yield

performance_log: Path | None = None
"""Log runtime performance to this file.

The runtime performance log shows the time taken to evaluate each
expression in the specification files. It is useful for debugging
performance issues with complex expressions.

If set to None (the default), no performance logging will be done."""

def subcomponent_settings(self, subcomponent: str) -> ComputeSettings:
"""Get the sharrow settings for a particular subcomponent."""
return ComputeSettings(
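Since `performance_log` lives on `ComputeSettings`, an individual component can route its expression timings to a specific file regardless of the global setting. A minimal sketch, assuming the other `ComputeSettings` fields keep their defaults; the file name is made up for illustration:

```python
from pathlib import Path

from activitysim.core.configuration.base import ComputeSettings

# hypothetical: send this component's expression timings to a named log file
compute_settings = ComputeSettings(
    performance_log=Path("tour_mode_choice_eval_timing.log"),
)
```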
35 changes: 35 additions & 0 deletions activitysim/core/configuration/top.py
@@ -537,6 +537,41 @@ def _check_store_skims_in_shm(self):
own that pollutes the collected data.
"""

expression_profile: bool | None = None
"""
Track the runtime of each individual expression in each spec file.

.. versionadded:: 1.4

The default value of this setting is `None`, which sets no global control
of expression profiling, and allows this feature to be turned on or off
for individual components. If set to `True`, all components will have
expression profiling enabled, and the outputs will be written to files named
based on the trace label unless explicitly set in the `compute_settings` for
individual components. If set to `False`, all components will have expression
profiling disabled.

This is generally a developer-only feature and not needed for regular usage.
It adds some overhead to the model run, which is only worthwhile if you
expect to review the expression runtimes with an eye toward improving them.
Production model users should typically set this explicitly to `False`.
"""

expression_profile_cutoff: float = 0.1
"""
Minimum runtime for an expression to be included in the expression profile.

.. versionadded:: 1.4

Expressions that take less than this amount of time to evaluate will not be
included in the summary report of expression profiling generated at the end
of a model run. For large-scale models, this value can be increased to keep
the report file small, as only the slowest expressions are typically of
interest.

This setting has no effect if :py:attr:`expression_profile` is not `True`.
"""

benchmarking: bool = False
"""
Flag this model run as a benchmarking run.
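For a whole-run view, the `expression_profile` and `expression_profile_cutoff` settings documented above can simply be set on the global settings object (in a typical setup they would go in the run's settings file). A hedged sketch against the `state` object used throughout this diff, with illustrative values:

```python
from activitysim.core import workflow


def enable_expression_profiling(state: workflow.State) -> None:
    # illustrative values: profile every component, but keep only expressions
    # that take at least half a second in the end-of-run summary report
    state.settings.expression_profile = True
    state.settings.expression_profile_cutoff = 0.5
```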
1 change: 1 addition & 0 deletions activitysim/core/expressions.py
@@ -139,6 +139,7 @@ def compute_columns(
df,
_locals_dict,
trace_rows=state.tracing.trace_targets(df),
trace_label=trace_label,
)

if trace_results is not None:
28 changes: 22 additions & 6 deletions activitysim/core/interaction_simulate.py
@@ -7,12 +7,13 @@
from builtins import zip
from collections import OrderedDict
from datetime import timedelta
from pathlib import Path
from typing import Mapping

import numpy as np
import pandas as pd

from activitysim.core import chunk, logit, simulate, tracing, util, workflow
from activitysim.core import chunk, logit, simulate, timing, tracing, util, workflow
from activitysim.core.configuration.base import ComputeSettings
from activitysim.core.fast_eval import fast_eval

@@ -263,14 +264,27 @@ def to_series(x):
exprs = spec.index
labels = spec.index

# init a performance timer if needed
if (
state.settings.expression_profile
and compute_settings.performance_log is None
):
perf_log_file = Path(trace_label + ".log")
elif state.settings.expression_profile is False:
perf_log_file = None
else:
perf_log_file = compute_settings.performance_log
performance_timer = timing.EvalTiming(perf_log_file)

with compute_settings.pandas_option_context():
for expr, label, coefficient in zip(exprs, labels, spec.iloc[:, 0]):
try:
# - allow temps of form _od_DIST@od_skim['DIST']
if expr.startswith("_"):
target = expr[: expr.index("@")]
rhs = expr[expr.index("@") + 1 :]
v = to_series(eval(rhs, globals(), locals_d))
with performance_timer.time_expression(expr):
v = to_series(eval(rhs, globals(), locals_d))

# update locals to allow us to reference previously assigned targets
locals_d[target] = v
@@ -285,10 +299,11 @@
# they have a non-zero dummy coefficient to avoid being removed from spec as NOPs
continue

if expr.startswith("@"):
v = to_series(eval(expr[1:], globals(), locals_d))
else:
v = fast_eval(df, expr, resolvers=[locals_d])
with performance_timer.time_expression(expr):
if expr.startswith("@"):
v = to_series(eval(expr[1:], globals(), locals_d))
else:
v = fast_eval(df, expr, resolvers=[locals_d])

if check_for_variability and v.std() == 0:
logger.info(
@@ -403,6 +418,7 @@ def to_series(x):
trace_label, "eval.trace_eval_results", trace_eval_results
)

performance_timer.write_log(state)
chunk_sizer.log_df(trace_label, "v", None)
chunk_sizer.log_df(
trace_label, "eval.utilities", None
46 changes: 37 additions & 9 deletions activitysim/core/simulate.py
@@ -21,6 +21,7 @@
configuration,
logit,
pathbuilder,
timing,
tracing,
util,
workflow,
@@ -628,6 +629,17 @@ def eval_utilities(
if utilities is None or estimator or sharrow_enabled == "test":
trace_label = tracing.extend_trace_label(trace_label, "eval_utils")

if (
state.settings.expression_profile
and compute_settings.performance_log is None
):
perf_log_file = Path(trace_label + ".log")
elif state.settings.expression_profile is False:
perf_log_file = None
else:
perf_log_file = compute_settings.performance_log
performance_timer = timing.EvalTiming(perf_log_file)

# avoid altering caller's passed-in locals_d parameter (they may be looping)
locals_dict = assign.local_utilities(state)

@@ -654,10 +666,13 @@
with warnings.catch_warnings(record=True) as w:
# Cause all warnings to always be triggered.
warnings.simplefilter("always")
if expr.startswith("@"):
expression_value = eval(expr[1:], globals_dict, locals_dict)
else:
expression_value = fast_eval(choosers, expr)
with performance_timer.time_expression(expr):
if expr.startswith("@"):
expression_value = eval(
expr[1:], globals_dict, locals_dict
)
else:
expression_value = fast_eval(choosers, expr)

if len(w) > 0:
for wrn in w:
@@ -686,6 +701,7 @@
expression_values[i] = expression_value
i += 1

performance_timer.write_log(state)
chunk_sizer.log_df(trace_label, "expression_values", expression_values)

if estimator:
@@ -847,7 +863,9 @@ def eval_utilities(
return utilities


def eval_variables(state: workflow.State, exprs, df, locals_d=None):
def eval_variables(
state: workflow.State, exprs, df, locals_d=None, trace_label: str | None = None
):
"""
Evaluate a set of variable expressions from a spec in the context
of a given data table.
@@ -874,6 +892,9 @@ def eval_variables(state: workflow.State, exprs, df, locals_d=None):
locals_d : Dict
This is a dictionary of local variables that will be the environment
for an evaluation of an expression that begins with @
trace_label : str, optional
The trace label to use for performance logging. If None, performance
logging is not activated.

Returns
-------
@@ -908,13 +929,20 @@ def to_array(x):

return a

if state.settings.expression_profile and trace_label:
perf_log_file = Path(trace_label + ".log")
else:
perf_log_file = None
performance_timer = timing.EvalTiming(perf_log_file)

values = OrderedDict()
for expr in exprs:
try:
if expr.startswith("@"):
expr_values = to_array(eval(expr[1:], globals_dict, locals_dict))
else:
expr_values = to_array(fast_eval(df, expr))
with performance_timer.time_expression(expr):
if expr.startswith("@"):
expr_values = to_array(eval(expr[1:], globals_dict, locals_dict))
else:
expr_values = to_array(fast_eval(df, expr))
# read model spec should ensure uniqueness, otherwise we should uniquify
assert expr not in values
values[expr] = expr_values