import math
import warnings
from warnings import simplefilter
import numpy as np
import pandas as pd
import pandera.extensions as extensions
try:
import pandera.pandas as pa
except ModuleNotFoundError:
import pandera as pa
from scipy.spatial import KDTree
from databallpy.features.covered_distance import (
_add_covered_distance_interval,
_parse_intervals,
_validate_inputs,
)
from databallpy.features.differentiate import _differentiate
from databallpy.features.feature_utils import _check_column_ids
from databallpy.features.filters import _filter_data
from databallpy.features.pitch_control import get_pitch_control_single_frame
from databallpy.features.player_possession import (
get_ball_losses_and_updated_gain_idxs,
get_distance_between_ball_and_players,
get_initial_possessions,
get_start_end_idxs,
get_valid_gains,
)
from databallpy.features.pressure import (
calculate_l,
calculate_variable_dfront,
calculate_z,
)
from databallpy.utils.constants import MISSING_INT
from databallpy.utils.logging import create_logger, logging_wrapper
from databallpy.utils.warnings import DataBallPyWarning
# Install a global warnings filter that ignores pandas' PerformanceWarning
# from the moment this module is imported.
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
# Module-level logger, created with this module's name.
LOGGER = create_logger(__name__)
# NOTE(review): the return value of this call is discarded, so it has no visible
# effect inside this module -- confirm whether the call is still needed.
logging_wrapper(__file__)
@extensions.register_check_method()
def check_first_frame(df):
    """Warn when the first frame with an alive ball is not near the centre spot.

    The first row where ``ball_status`` is ``"alive"`` and ``ball_x`` is not
    null is treated as the kick-off. A warning is logged and emitted when the
    ball is not within 7.0 of x=0 and 5.0 of y=0 in that row, or when no such
    row exists at all.

    Args:
        df (pd.DataFrame): tracking data with the columns ``ball_status``,
            ``ball_x`` and ``ball_y``.

    Returns:
        bool: always True; this check only warns and never fails validation.
    """
    ball_alive_mask = df["ball_status"] == "alive"
    first_frame = df.loc[ball_alive_mask, "ball_x"].first_valid_index()

    # first_valid_index() returns None when the ball is never alive or ball_x is
    # always missing; indexing with None would raise a KeyError, so warn instead.
    if first_frame is None:
        message = (
            "Could not find a frame in which the ball_status is 'alive' and "
            "ball_x is known, so the kick-off location could not be checked. "
            "Please check the ball_status and the ball positions of the "
            "tracking data."
            "\n NOTE: The quality of the synchronisation of the tracking "
            "and event data might be affected."
        )
        LOGGER.warning(message)
        warnings.warn(message=message, category=DataBallPyWarning)
        return True

    x_start = df.loc[first_frame, "ball_x"]
    y_start = df.loc[first_frame, "ball_y"]
    if abs(x_start) < 7.0 and abs(y_start) < 5.0:
        return True

    message = (
        "The middle point of the pitch should be (0, 0), "
        f"now the kick-off is at ({x_start}, {y_start}). "
        "Either the recording has started too late or the ball_status "
        "is not set to 'alive' in the beginning. Please check and "
        " change the tracking data if desired."
        "\n NOTE: The quality of the synchronisation of the tracking "
        "and event data might be affected."
    )
    LOGGER.warning(message)
    warnings.warn(message=message, category=DataBallPyWarning)
    return True
@extensions.register_check_method()
def check_ball_status(df):
    """Warn when the ball is alive in at most half of the non-break frames.

    Args:
        df (pd.DataFrame): tracking data with the columns ``ball_status`` and
            ``gametime_td``.

    Returns:
        bool: always True; this check only warns and never fails validation.
    """
    # Count with a boolean mask instead of value_counts()["alive"], which raises
    # a KeyError when the ball is never alive -- the case we most want to report.
    frames_alive = int((df["ball_status"] == "alive").sum())
    len_df = len(df[df["gametime_td"] != "Break"])
    check_passed = frames_alive > (len_df / 2)
    if not check_passed:
        message = (
            "The ball status is alive for less than half of the"
            " full game. Ball status is uses for synchronisation; "
            "check the quality of the data before synchronising event and "
            "tracking data."
        )
        LOGGER.warning(message)
        warnings.warn(message=message, category=DataBallPyWarning)
    return True
@extensions.register_check_method()
def check_all_locations(df):
    """Warn about the first location column pair that looks invalid.

    Every column ending in ``_x`` is inspected in column order. The scan stops
    at the first problem found: a missing ``_y`` partner, an ``_x`` column whose
    largest absolute value is not below 65, or a ``_y`` column whose largest
    absolute value is not below 45.

    Args:
        df (pd.DataFrame): tracking data.

    Returns:
        bool: always True; this check only warns and never fails validation.
    """
    problem = None
    for x_name in [name for name in df.columns if name.endswith("_x")]:
        col_id = x_name[:-2]
        y_name = f"{col_id}_y"
        if y_name not in df.columns:
            problem = f"Missing column {col_id}_y. Please check the column names."
        elif not df[x_name].abs().max() < 65:
            problem = f"Column {col_id}_x has values outside the pitch dimensions."
        elif not df[y_name].abs().max() < 45:
            problem = f"Column {col_id}_y has values outside the pitch dimensions."
        if problem is not None:
            # Only the first offending column is reported.
            break

    if problem is None:
        return True
    LOGGER.warning(problem)
    warnings.warn(message=problem, category=DataBallPyWarning)
    return True
class TrackingDataSchema(pa.DataFrameModel):
    """Pandera model describing the mandatory tracking data columns.

    Column-level constraints are declared through ``pa.Field``; the ``datetime``
    column additionally gets three element-wise checks. The custom checks
    ``check_first_frame``, ``check_ball_status`` and ``check_all_locations``
    are enabled through ``Config``.
    """

    frame: pa.typing.Series[int] = pa.Field(unique=True)
    datetime: pa.typing.Series[object] = pa.Field(nullable=True, coerce=True)
    ball_x: pa.typing.Series[float] = pa.Field(ge=-62.5, le=62.5, nullable=True)
    ball_y: pa.typing.Series[float] = pa.Field(ge=-45, le=45, nullable=True)
    ball_z: pa.typing.Series[float] = pa.Field(ge=-5, le=45, nullable=True)
    ball_status: pa.typing.Series[str] = pa.Field(isin=["alive", "dead"], nullable=True)
    team_possession: pa.typing.Series[str] = pa.Field(nullable=True)

    @pa.check("datetime")
    def is_timestamp(self, series: pa.typing.Series[object]) -> bool:
        """Return whether every non-null value is a ``pd.Timestamp``."""
        present = series.dropna()
        flags = present.apply(lambda value: isinstance(value, pd.Timestamp))
        return flags.all()

    @pa.check("datetime")
    def after_1975(self, series: pa.typing.Series[object]) -> bool:
        """Return whether every non-null value is on or after 1975-01-01."""
        present = series.dropna()
        # The lower bound is built in the timezone of each value.
        flags = present.apply(
            lambda value: value >= pd.Timestamp("1975-01-01", tz=value.tzinfo)
        )
        return flags.all()

    @pa.check("datetime")
    def before_now(self, series: pa.typing.Series[object]) -> bool:
        """Return whether every non-null value is not later than the current time."""
        present = series.dropna()
        flags = present.apply(
            lambda value: value <= pd.Timestamp.now(tz=value.tzinfo)
        )
        return flags.all()

    class Config:
        # Registered custom checks, enabled without extra arguments.
        check_first_frame = ()
        check_ball_status = ()
        check_all_locations = ()
[docs]
class TrackingData(pd.DataFrame):
"""This is the tracking data class. It contains the tracking data for every
frame as well as the provider and frame_rate. Additionally, it contains some
basic functions to add columns to the tracking data or manipulate existing columns.
Args:
tracking_data (pd.DataFrame): tracking data of the game
provider (str): provider of the tracking data
frame_rate (int): framerate of the tracking data
"""
def __init__(
    self,
    *args,
    provider: str = "unspecified",
    frame_rate: int | float = MISSING_INT,
    **kwargs,
):
    """Build the DataFrame and remember the provider and frame rate.

    Args:
        *args: positional arguments forwarded to the parent constructor.
        provider (str, optional): provider of the tracking data. Defaults to
            "unspecified".
        frame_rate (int | float, optional): frame rate of the tracking data.
            Defaults to MISSING_INT.
        **kwargs: keyword arguments forwarded to the parent constructor.
    """
    super().__init__(*args, **kwargs)
    # Stored privately; both values are exposed through read-only properties.
    self._frame_rate = frame_rate
    self._provider = provider
def __getstate__(self):
    """Return a shallow copy of the instance dict that is guaranteed to hold
    the ``_provider`` and ``_frame_rate`` entries."""
    return {
        **self.__dict__,
        "_provider": self._provider,
        "_frame_rate": self._frame_rate,
    }
def __setstate__(self, state):
    """Restore the instance dict from ``state``.

    ``_provider`` and ``_frame_rate`` fall back to "unspecified" and
    MISSING_INT when they are absent from ``state``.
    """
    self.__dict__.update(state)
    fallbacks = (("_provider", "unspecified"), ("_frame_rate", MISSING_INT))
    for attribute, fallback in fallbacks:
        setattr(self, attribute, state.get(attribute, fallback))
@property
def _constructor(self):
    """Return a factory that builds a ``TrackingData`` carrying over this
    instance's provider and frame rate unless the caller passes its own."""
    inherited_provider = self.provider
    inherited_frame_rate = self.frame_rate

    def _build(*args, **kwargs):
        # Explicitly passed values win over the inherited ones.
        kwargs.setdefault("provider", inherited_provider)
        kwargs.setdefault("frame_rate", inherited_frame_rate)
        return TrackingData(*args, **kwargs)

    return _build
@property
def provider(self):
    """Provider of the tracking data (read-only)."""
    return self._provider


@provider.setter
def provider(self, _):
    """Reject any attempt to overwrite the provider."""
    reason = "Cannot set provider attribute of tracking data"
    raise AttributeError(reason)
@property
def frame_rate(self):
    """Frame rate of the tracking data (read-only)."""
    return self._frame_rate


@frame_rate.setter
def frame_rate(self, _):
    """Reject any attempt to overwrite the frame rate."""
    reason = "Cannot set frame rate attribute of tracking data"
    raise AttributeError(reason)
[docs]
def add_velocity(
self,
column_ids: str | list[str],
filter_type: str | None = None,
window_length: int = 7,
polyorder: int = 2,
max_velocity: float = np.inf,
allow_overwrite: bool = False,
) -> None:
"""Function that adds velocity columns to the tracking data based on the position
columns
Args:
self
column_ids (str | list[str]): columns for which velocity should be calculated.
filter_type (str, optional): filter type to use. Defaults to None.
Options are `moving_average` and `savitzky_golay`.
window_length (int, optional): window size for the filter. Defaults to 7.
polyorder (int, optional): polynomial order for the filter. Defaults to 2.
max_velocity (float, optional): maximum value for the velocity.
Defaults to np.inf.
allow_overwrite (bool): Whether or not it is allowed to overwrite existing values
Note: if "_velocity" exists, but "_vx" and "_vy" not, and allow_overwrite is
set to False, "_vx" and "_vy" will be computed and added, but "_velocity"
is kept the same, and therefore does not correspond with the other values.
Defaults to False.
Returns:
None
Raises:
ValueError: if filter_type is not one of `moving_average`, `savitzky_golay`,
or None.
Note:
The function will delete the columns in input_columns with the velocity if
they already exist.
"""
if isinstance(column_ids, str):
column_ids = [column_ids]
if filter_type not in ["moving_average", "savitzky_golay", None]:
raise ValueError(
"filter_type should be one of: 'moving_average', "
f"'savitzky_golay', None, got: {filter_type}"
)
_differentiate(
self,
new_name="velocity",
metric="",
frame_rate=self.frame_rate,
filter_type=filter_type,
window=window_length,
poly_order=polyorder,
column_ids=column_ids,
max_val=max_velocity,
inplace=True,
allow_overwrite=allow_overwrite,
)
[docs]
def add_acceleration(
self,
column_ids: str | list[str],
filter_type: str | None = None,
window_length: int = 25,
polyorder: int = 2,
max_acceleration: float = np.inf,
allow_overwrite: bool = False,
) -> None:
"""Function that adds acceleration columns to the tracking data based on the
position columns.
Args:
column_ids (str | list[str]): Columns for which acceleration should be calculated.
filter_type (str, optional): Filter type to use. Defaults to None.
Options are `moving_average` and `savitzky_golay`.
window_length (int, optional): Window size for the filter. Defaults to 25.
polyorder (int, optional): Polynomial order for the filter. Defaults to 2.
max_acceleration (float, optional): Maximum value for the acceleration. Defaults to np.inf.
allow_overwrite (bool): Whether or not it is allowed to overwrite existing values.
Returns:
None
Raises:
ValueError: If filter_type is not one of `moving_average`, `savitzky_golay`, or None.
ValueError: If velocity was not found in the DataFrame for the input_columns.
Note:
If "_acceleration" exists, but "_ax" and "_ay" do not, and allow_overwrite is False,
"_ax" and "_ay" will be computed and added, but "_acceleration" is kept unchanged.
Therefore, it may not correspond with the other values.
The function will delete acceleration columns if they already exist.
"""
if isinstance(column_ids, str):
column_ids = [column_ids]
if filter_type not in ["moving_average", "savitzky_golay", None]:
raise ValueError(
"filter_type should be one of: 'moving_average', "
f"'savitzky_golay', None, got: {filter_type}"
)
for column_id in column_ids:
if (
column_id + "_vx" not in self.columns
or column_id + "_vy" not in self.columns
):
raise ValueError(
f"Velocity was not found for {column_id} in the DataFrame. "
" Please calculate velocity first using get_velocity() function."
)
_differentiate(
self,
new_name="acceleration",
metric="v",
frame_rate=self.frame_rate,
filter_type=filter_type,
window=window_length,
poly_order=polyorder,
column_ids=column_ids,
max_val=max_acceleration,
inplace=True,
allow_overwrite=allow_overwrite,
)
[docs]
def add_individual_player_possession(
    self,
    pz_radius: float = 1.5,
    bv_threshold: float = 5.0,
    ba_threshold: float = 10.0,
    min_frames_pz: int = 0,
) -> None:
    """Add a 'player_possession' column with the player in possession per frame.

    The method follows the methodology of Vidal-Codina et al. (2022):
    "Automatic Event Detection in Football Using Tracking Data".

    Args:
        pz_radius (float, optional): the radius of the possession zone.
            Defaults to 1.5.
        bv_threshold (float, optional): the ball velocity threshold in m/s.
            Defaults to 5.0.
        ba_threshold (float, optional): the ball angle threshold in degrees.
            Defaults to 10.0.
        min_frames_pz (int, optional): the minimum number of frames the ball has
            to be in the possession zone to count as a possession. Defaults to 0.

    Returns:
        None

    Raises:
        ValueError: if the tracking data has no 'ball_velocity' column.
    """
    if "ball_velocity" not in self.columns:
        raise ValueError(
            "The tracking data should have a column 'ball_velocity'. Use the "
            "add_velocity function to add the ball velocity."
        )

    ball_player_distances = get_distance_between_ball_and_players(self)
    candidate_possession = get_initial_possessions(pz_radius, ball_player_distances)
    starts, ends = get_start_end_idxs(candidate_possession)
    gains = get_valid_gains(
        self, starts, ends, bv_threshold, ba_threshold, min_frames_pz
    )
    gain_starts, loss_idxs = get_ball_losses_and_updated_gain_idxs(
        starts, ends, gains, candidate_possession
    )

    # Fill each [gain, loss) stretch with the player that gained the ball.
    player_possession = np.full(len(self), None, dtype=object)
    for gain_idx, loss_idx in zip(gain_starts, loss_idxs):
        player_possession[gain_idx:loss_idx] = candidate_possession[gain_idx]

    # Nobody can be in possession while the ball is not alive.
    ball_alive = self["ball_status"] == "alive"
    player_possession[~ball_alive] = None

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=pd.errors.PerformanceWarning)
        self["player_possession"] = player_possession
[docs]
def get_covered_distance(
    self,
    column_ids: list[str],
    velocity_intervals: tuple[float, ...] | tuple[tuple[float, ...], ...] = (),
    acceleration_intervals: tuple[float, ...] | tuple[tuple[float, ...], ...] = (),
    start_idx: int | None = None,
    end_idx: int | None = None,
) -> pd.DataFrame:
    """Calculate the covered distance from the velocity magnitude of each frame.

    The `add_velocity` function has to be called first. Optionally, the distance
    covered within given velocity and/or acceleration intervals is added too.

    Args:
        column_ids (list[str]): column ids for which the covered distance
            should be calculated.
        velocity_intervals (optional): tuple with the velocity interval(s).
            Defaults to ().
        acceleration_intervals (optional): tuple with the acceleration
            interval(s). Defaults to ().
        start_idx (int, optional): first index label to include. Defaults to
            None, meaning the first index of the tracking data.
        end_idx (int, optional): last index label to include. Defaults to None,
            meaning the last index of the tracking data.

    Returns:
        pd.DataFrame: one row per (sorted) column id, with a 'total_distance'
            column and one extra column per requested interval.
    """
    _validate_inputs(
        self,
        column_ids,
        self.frame_rate,
        acceleration_intervals,
        start_idx,
        end_idx,
    )

    column_ids = sorted(column_ids)

    parsed_velocity = []
    if len(velocity_intervals) > 0:
        parsed_velocity = _parse_intervals(velocity_intervals)
    parsed_acceleration = []
    if len(acceleration_intervals) > 0:
        parsed_acceleration = _parse_intervals(acceleration_intervals)
    intervals_by_kind = (
        ("velocity", parsed_velocity),
        ("acceleration", parsed_acceleration),
    )

    # Reserve the result columns up front so their order is fixed.
    result_dict = {"total_distance": []}
    for kind, parsed in intervals_by_kind:
        for interval in parsed:
            result_dict[f"total_distance_{kind}_{interval[0]}_{interval[1]}"] = []

    velocity_series = [self[player_id + "_velocity"] for player_id in column_ids]
    velocities = pd.concat(velocity_series, axis=1).fillna(0)
    velocities.columns = velocities.columns.str.replace("_velocity", "")
    # velocity (per second) divided by frames per second = distance per frame
    distance_per_frame = velocities / self.frame_rate

    first_label = self.index[0] if start_idx is None else start_idx
    last_label = self.index[-1] if end_idx is None else end_idx
    distance_per_frame = distance_per_frame.loc[first_label:last_label]
    window = self.loc[first_label:last_label]

    result_dict["total_distance"] = distance_per_frame.sum().values

    for kind, parsed in intervals_by_kind:
        if len(parsed) == 0:
            continue
        result_dict = _add_covered_distance_interval(
            result_dict,
            kind,
            window,
            distance_per_frame,
            parsed,
            column_ids,
        )

    return pd.DataFrame(result_dict, index=column_ids)
[docs]
def filter_tracking_data(
    self,
    column_ids: str | list[str],
    filter_type: str = "savitzky_golay",
    window_length: int = 7,
    polyorder: int = 2,
) -> None:
    """Filter the x and y position columns of the given column ids in place.

    Args:
        column_ids (str | list[str]): column id(s) whose "_x" and "_y" columns
            should be filtered. A single string is treated as a one-item list.
        filter_type (str, optional): type of filter to use. Defaults to
            "savitzky_golay". Options: {"moving_average", "savitzky_golay"}.
        window_length (int, optional): window length of the filter. Defaults to 7.
        polyorder (int, optional): polyorder to use when the savitzky_golay
            filter is selected. Defaults to 2.

    Returns:
        None

    Raises:
        TypeError: if window_length or polyorder is not an int.
        ValueError: if filter_type is not one of the supported options.
    """
    if isinstance(column_ids, str):
        column_ids = [column_ids]

    _check_column_ids(self, column_ids)

    if not isinstance(window_length, int):
        raise TypeError(
            f"window_length should be of type int, not {type(window_length)}"
        )
    if not isinstance(polyorder, int):
        raise TypeError(f"polyorder should be of type int, not {type(polyorder)}")
    if filter_type not in ["moving_average", "savitzky_golay"]:
        raise ValueError(
            "filter_type should be one of: 'moving_average', 'savitzky_golay'"
            f", got: {filter_type}"
        )

    # Match the position columns by their exact name. The previous check rebuilt
    # the column id with "".join(...), which dropped the underscores ("home_1_x"
    # became "home1"), so player columns were never filtered. It also matched
    # every column ending in "x" or "y", such as "ball_vx".
    wanted_columns = {
        f"{column_id}_{axis}" for column_id in column_ids for axis in ("x", "y")
    }
    xy_columns = [col for col in self.columns if col in wanted_columns]

    for col in xy_columns:
        if filter_type == "savitzky_golay":
            self[col] = _filter_data(
                self[col].to_numpy(),
                filter_type="savitzky_golay",
                window_length=window_length,
                polyorder=polyorder,
            )
        else:
            # moving average: convolve with a uniform kernel of equal length
            self[col] = np.convolve(
                self[col], np.ones(window_length) / window_length, mode="same"
            )
[docs]
def get_pressure_on_player(
    self,
    index: int,
    column_id: str,
    pitch_size: list[float, float],
    d_front: str | float = "variable",
    d_back: float = 3.0,
    q: float = 1.75,
) -> np.array:
    """Calculate the pressure on a single player in a single frame.

    Pressure follows "Visual Analysis of Pressure in Soccer", Adrienko et al
    (2016): it is the sum of the pressure of all opponents, each a function of
    the angle and the distance to the player.

    Args:
        index (int): index of the frame for which to analyse pressure.
        column_id (str): column id of the player to analyse.
        pitch_size (list): length and width of the pitch.
        d_front (str | float): distance in meters of the front of the pressure
            oval. If "variable", it depends on the location on the field,
            following Mat Herold et al (2022).
        d_back (float): distance in meters of the back of the pressure oval.
        q (float): quotient of how fast pressure should increase/decrease as
            the distance to the player changes.

    Returns:
        The summed pressure on the player in the specified frame.

    Raises:
        ValueError: if index is not in the index of the tracking data.
    """
    if index not in self.index:
        raise ValueError(f"index should be in game.tracking_data.index, not {index}")

    frame_row = self.loc[index, :]

    if d_front == "variable":
        d_front = calculate_variable_dfront(
            frame_row, column_id, pitch_length=pitch_size[0]
        )

    # The first four characters of the column id hold the team side.
    rival_side = "away" if column_id[:4] == "home" else "home"
    rival_ids = [
        label[:-2] for label in frame_row.index if rival_side in label and "_x" in label
    ]
    own_xy = [frame_row[column_id + "_x"], frame_row[column_id + "_y"]]

    tot_pressure = 0
    for rival_id in rival_ids:
        rival_xy = [frame_row[rival_id + "_x"], frame_row[rival_id + "_y"]]
        gap = math.dist(own_xy, rival_xy)

        # opponent not close enough to exert pressure on the player
        if gap > max([d_front, d_back]):
            continue

        z = calculate_z(frame_row, column_id, rival_id, pitch_length=pitch_size[0])
        oval_reach = calculate_l(d_back, d_front, z)

        relative_closeness = pd.to_numeric((1 - gap / oval_reach), errors="coerce")
        rival_pressure = relative_closeness.clip(0) ** q * 100
        # A missing value contributes no pressure.
        if pd.isnull(rival_pressure):
            rival_pressure = 0
        tot_pressure += rival_pressure

    return tot_pressure
[docs]
def get_pitch_control(
    self,
    pitch_dimensions: list[float, float],
    n_x_bins: int = 106,
    n_y_bins: int = 68,
    start_idx: int | None = None,
    end_idx: int | None = None,
) -> np.ndarray:
    """Calculate the pitch control surface for every frame of a period.

    The model is the one presented in "Wide Open Spaces" by Fernandez & Born
    (2018); the per-frame computation is delegated to
    `get_pitch_control_single_frame`. Values near 1 indicate high pitch control
    by the home team, values near 0 high pitch control by the away team.

    Args:
        pitch_dimensions (list[float, float]): the dimensions of the pitch.
        n_x_bins (int, optional): the number of cells in the width (x)
            direction. Defaults to 106.
        n_y_bins (int, optional): the number of cells in the height (y)
            direction. Defaults to 68.
        start_idx (int, optional): first index label of the period. Defaults to
            None, meaning the first index of the tracking data.
        end_idx (int, optional): last index label of the period. Defaults to
            None, meaning the last index of the tracking data.

    Returns:
        np.ndarray: float32 array of shape (n_frames, n_y_bins, n_x_bins).
    """
    first_label = self.index[0] if start_idx is None else start_idx
    last_label = self.index[-1] if end_idx is None else end_idx
    period = self.loc[first_label:last_label]

    player_ids = [
        name[:-2]
        for name in period.columns
        if name[-2:] == "_x" and ("home" in name or "away" in name)
    ]

    # precompute player ball distances, one column per player
    distance_columns = {
        player_id: np.linalg.norm(
            period[[f"{player_id}_x", f"{player_id}_y"]].values
            - period[["ball_x", "ball_y"]].values,
            axis=1,
        )
        for player_id in player_ids
    }
    player_ball_distances = pd.DataFrame(
        distance_columns, index=period.index, columns=player_ids
    )

    surfaces = np.zeros((len(period), n_y_bins, n_x_bins), dtype=np.float32)
    for position, label in enumerate(period.index):
        surfaces[position] = get_pitch_control_single_frame(
            period.loc[label],
            pitch_dimensions,
            n_x_bins,
            n_y_bins,
            player_ball_distances=player_ball_distances.loc[label],
        )
    return np.array(surfaces)
[docs]
def get_approximate_voronoi(
self,
pitch_dimensions: list[float, float],
n_x_bins: int = 106,
n_y_bins: int = 68,
start_idx: int | None = None,
end_idx: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Find the nearest player to each cell center in a grid of cells covering the
pitch.
Args:
self.
pitch_dimensions (list[float, float]): The dimensions of the pitch.
n_x_bins (int, optional): The number of cells in the width (x) direction.
Defaults to 106.
n_y_bins (int, optional): The number of cells in the height (y) direction.
Defaults to 68.
start_idx (int, optional): The starting index of the period. Defaults to None.
end_idx (int, optional): The ending index of the period. Defaults to None.
Returns:
tuple[np.ndarray, np.ndarray]: The distances to the nearest player for each
cell center and the column ids of the nearest player. If tracking_data is
a pd.Series, the shape will be (n_y_bins x n_x_bins), otherwise
(len(tracking_data) x n_y_bins x n_x_bins).
"""
start_idx = self.index[0] if start_idx is None else start_idx
end_idx = self.index[-1] if end_idx is None else end_idx
tracking_data = self.loc[start_idx:end_idx]
pitch_length, pitch_width = pitch_dimensions
x_bins = np.linspace(-pitch_length / 2, pitch_length / 2, n_x_bins + 1)
y_bins = np.linspace(-pitch_width / 2, pitch_width / 2, n_y_bins + 1)
cell_centers_x, cell_centers_y = np.meshgrid(
x_bins[:-1] + np.diff(x_bins) / 2, y_bins[:-1] + np.diff(y_bins) / 2
)
all_distances = np.empty(
(len(tracking_data), n_y_bins, n_x_bins), dtype=np.float32
)
all_assigned_players = np.empty(
(len(tracking_data), n_y_bins, n_x_bins), dtype="U7"
)
for i, (_, frame) in enumerate(tracking_data.iterrows()):
player_column_ids = np.array(
[
column[:-2]
for column in frame.index
if column[-2:] in ["_x", "_y"]
and not pd.isnull(frame[column])
and "ball" not in column
]
)
player_positions = np.array(
[
[frame[column + "_x"], frame[column + "_y"]]
for column in player_column_ids
]
).astype(np.float64)
tree = KDTree(player_positions)
cell_centers = np.column_stack(
(cell_centers_x.ravel(), cell_centers_y.ravel())
)
distances, nearest_player_indices = tree.query(cell_centers)
all_assigned_players[i] = player_column_ids[nearest_player_indices].reshape(
n_y_bins, n_x_bins
)
all_distances[i] = distances.reshape(n_y_bins, n_x_bins)
if all_distances.shape[0] == 1:
all_distances = all_distances[0]
all_assigned_players = all_assigned_players[0]
return all_distances, all_assigned_players
[docs]
def add_team_possession(
self, event_data: pd.DataFrame, home_team_id: int, allow_overwrite: bool = False
) -> None | pd.DataFrame:
"""Function to add a column 'team_possession' to the tracking data, indicating
which team has possession of the ball at each frame, either 'home' or 'away'.
Raises:
ValueError: If the tracking and event data are not synchronised.
ValueError: If the home_team_id is not in the event data.
Args:
self
event_data (EventData): Event data for a game
home_team_id (int): The ID of the home team.
allow_overwrite (bool, optional): If "team_possession" column has non null
values, allow_overwrite should be set to true before the function is
executed. Defaults to False.
Returns:
None
"""
if not pd.isnull(self["team_possession"]).all() and not allow_overwrite:
warnings.warn(
"The 'team_possession' column is not empty. If you want to overwrite "
"the column, set allow_overwrite=True.",
category=DataBallPyWarning,
stacklevel=2,
)
return
if "event_id" not in self.columns:
raise ValueError(
"Tracking and event data are not synchronised, please synchronise the"
" data first"
)
if home_team_id not in event_data.team_id.unique():
raise ValueError(
"The home team ID is not in the event data, please check"
" the home team ID"
)
on_ball_events = ["pass", "dribble", "shot"]
current_team_id = event_data.loc[
~pd.isnull(event_data["databallpy_event"]), "team_id"
].iloc[0]
start_idx = 0
self["team_possession"] = None
for event_id in [x for x in self.event_id if x != MISSING_INT]:
event = event_data[event_data.event_id == event_id].iloc[0]
if (
event["databallpy_event"] in on_ball_events
and event.team_id != current_team_id
and event.is_successful == 1
):
end_idx = self[self.event_id == event_id].index[0]
team = "home" if current_team_id == home_team_id else "away"
self.loc[start_idx:end_idx, "team_possession"] = team
current_team_id = event.team_id
start_idx = end_idx
last_team = "home" if current_team_id == home_team_id else "away"
self.loc[start_idx:, "team_possession"] = last_team
[docs]
def add_dangerous_accessible_space(
self, mask: pd.Series = None, **kwargs
) -> None | pd.DataFrame:
"""Function to add a column 'dangerous_accessible_space' to the tracking data,
indicating the accessible space weighted by the expected value (measured by xG) of the respective location.
Warning: Can be expensive, only use for frames that are needed.
SOURCE:
Jonas Bischofberger, Arnold Baca. Dangerous Accessible Space: A Unified Model of Space and Value in Team Sports,
21 August 2025, PREPRINT (Version 1) available at Research Square [https://doi.org/10.21203/rs.3.rs-6932689/v1]
Args:
mask (Series): Boolean filter to calculate fewer values.
Returns:
None
"""
try:
import accessible_space
except ImportError:
raise ImportError(
"This function requires the accessible-space package. Please run `pip install 'accessible-space>=2.0.13'` "
"Or install databallpy using `pip install 'databallpy[accessible-space]'`"
)
mask = pd.Series(True, index=self.index) if mask is None else mask
col_ids = [
x[:-2] for x in self.columns if x.endswith("_x") and not x.startswith("ball")
]
if not all([f"{col_id}_vx" in self.columns.to_list() for col_id in col_ids]):
raise ValueError(
"To dangerous accessible space you need to add velocities of all players. Try using the"
" game.tracking_data.add_velocity method to do so."
)
if "player_possession" not in self.columns.to_list():
raise ValueError(
"To dangerous accessible space you need to add the inidividual player possession column. Try using the"
" game.tracking_data.add_individual_player_possession method to do so."
)
self["team_in_possession"] = (
self["player_possession"]
.str.startswith("home")
.map({True: "home", False: "away"})
)
td_long = self[mask].to_long_format()
td_long["team"] = td_long["column_id"].str[:4]
res = accessible_space.interface.get_dangerous_accessible_space(
td_long,
frame_col="frame",
player_col="column_id",
team_col="team",
x_col="x",
y_col="y",
vx_col="vx",
vy_col="vy",
team_in_possession_col="team_in_possession",
period_col="period_id",
player_in_possession_col="player_possession",
ball_player_id="ball",
**kwargs,
)
td_long.loc[
~pd.isnull(td_long["team_in_possession"]), "dangerous_accessible_space"
] = res.das
del res
td_long = td_long[["frame", "dangerous_accessible_space"]].drop_duplicates()
self["dangerous_accessible_space"] = self.merge(
td_long, on="frame", how="left", validate="one_to_one"
)["dangerous_accessible_space"]
self.drop(columns="team_in_possession", inplace=True)