Source code for pyclarify.views.dataframe

# Copyright 2023 Searis AS

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from itertools import compress
from datetime import datetime
from pydantic import ConfigDict, field_validator, BaseModel, Extra
from typing import ForwardRef, List, Dict, Optional
from pyclarify.__utils__.auxiliary import local_import
from pyclarify.__utils__.time import is_datetime, parse_datetime, time_to_string
from pyclarify.fields.query import SelectionFormat
from pyclarify.fields.constraints import (
    InputID,
    ResourceID,
    IntegrationID,
    NumericalValuesType,
)
from pyclarify.query.query import ResourceQuery, DataQuery


# Placeholder forward reference for the name ``DataFrame``. The class statement
# further down rebinds this name to the real model.
# NOTE(review): presumably kept so annotations can mention ``DataFrame`` before
# the class exists -- confirm it is still required.
DataFrame = ForwardRef("DataFrame")


class DataFrame(BaseModel):
    """
    DataFrame structure maps to data structure used in the API for saving time series.
    Supports merging with other Clarify DataFrame objects and can convert to and from Pandas.DataFrame.

    Parameters
    ----------
    series: Dict[InputID, List[Union[None, float, int]]]
        Map of input ids to Array of data points to insert by Input ID.
        The length of each array must match that of the times array.
        To omit a value for a given timestamp in times, use the value null.

    times: List of timestamps
        Either as a python datetime or as YYYY-MM-DD[T]HH:MM[:SS[.ffffff]][Z or [±]HH[:]MM]]] to insert.

    Example
    -------

        >>> from pyclarify import DataFrame
        >>> data = DataFrame(
        ...     series={"INPUT_ID_1": [1, 2], "INPUT_ID_2": [3, 4]},
        ...     times=["2021-11-01T21:50:06Z", "2021-11-02T21:50:06Z"]
        ... )
    """

    times: List[datetime] = None
    series: Dict[InputID, NumericalValuesType] = None

    @field_validator("times", mode="before")
    @classmethod
    def use_custom_datetime_converter(cls, v):
        """
        Run every raw timestamp through ``parse_datetime`` before field validation.

        :meta private:
        """
        return [parse_datetime(t) for t in v]

    @field_validator("series")
    @classmethod
    def convert_numpy_to_native(cls, v):
        """
        Replace NaN entries in every series with ``None``.

        :meta private:
        """
        if isinstance(v, dict):
            # NaN is the only value that compares unequal to itself.
            return {
                key: [None if x != x else x for x in values]
                for key, values in v.items()
            }
        return v

    def to_pandas(self):
        """Convert the instance into a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame:
            The pandas DataFrame representing this instance.

        Example
        -------

            >>> from pyclarify import DataFrame
            >>> data = DataFrame(
            ...     series={"INPUT_ID_1": [1, 2], "INPUT_ID_2": [3, 4]},
            ...     times=["2021-11-01T21:50:06Z", "2021-11-02T21:50:06Z"]
            ... )
            >>> data.to_pandas()
            ...                            INPUT_ID_1  INPUT_ID_2
            ... 2021-11-01 21:50:06+00:00         1.0         3.0
            ... 2021-11-02 21:50:06+00:00         2.0         4.0
        """
        pd = local_import("pandas")
        df = pd.DataFrame(self.series)
        df.index = self.times
        return df

    @classmethod
    def from_dict(cls, data):
        """
        Converts dictionary to pyclarify.DataFrame. Handles series and flat dictionaries.
        No need to define time column as only one time column is accepted.

        Parameters
        ----------
        data: dict
            Either a flat mapping of column name to list of values, or a mapping
            with a nested ``"series"`` dictionary which is flattened into the top level.
            Exactly one column must contain datetime-like values.

        Returns
        -------
        pyclarify.DataFrame:
            The Clarify DataFrame built from the dictionary.

        Raises
        ------
        ValueError
            If no column, or more than one column, looks like a time axis, or if
            the resulting data cannot be parsed into a DataFrame.
        """
        from copy import deepcopy

        # Work on a copy so the caller's dictionary is never modified.
        flat = deepcopy(data)
        if "series" in flat:
            flat.update(flat.pop("series"))
        keys = list(flat)

        # Remember the last non-NaN value of each column (-1 when there is none)
        # and use it to decide whether the column holds timestamps.
        samples = [-1] * len(keys)
        for i, column in enumerate(flat.values()):
            for item in column:
                if item == item:
                    samples[i] = item

        possible_indexes = [is_datetime(sample) for sample in samples]
        time_keys = list(compress(keys, possible_indexes))
        if sum(possible_indexes) > 1:
            raise ValueError(
                f"Unambiguous time index! {time_keys} could be index. Use `time_col` variable or set time to index."
            )
        if sum(possible_indexes) == 0:
            raise ValueError("No time variable in the data. Can not convert.")

        time_key = time_keys[0]
        try:
            # Read the time axis from the flattened copy: the time column may
            # have been nested under "series" in the original dictionary.
            return cls(
                times=flat[time_key],
                series={key: flat[key] for key in keys if key not in time_keys},
            )
        except Exception as e:
            raise ValueError(
                f'Could not parse dictionary. "{time_key}" was used as time axis.'
            ) from e

    @classmethod
    def from_pandas(cls, df, time_col=None):
        """Convert a pandas DataFrame into a Clarify DataFrame.

        Parameters
        ----------
        df: pandas.DataFrame
            The pandas.DataFrame object to cast to pyclarify.DataFrame.

        time_col: str, default None
            A string denoting the column containing the time axis. If no string is given it is assumed to be the index of the DataFrame.

        Returns
        -------
        pyclarify.DataFrame:
            The Clarify DataFrame representing this instance.

        Raises
        ------
        TypeError
            If ``df`` is neither a pandas DataFrame nor a pandas Series.
        ValueError
            If a Series has no name, or if no unique time axis can be found.

        Example
        -------

            >>> from pyclarify import DataFrame
            >>> import pandas as pd
            >>> df = pd.DataFrame(data={"INPUT_ID_1": [1, 2], "INPUT_ID_2": [3, 4]})
            >>> df.index = ["2021-11-01T21:50:06Z",  "2021-11-02T21:50:06Z"]
            >>> DataFrame.from_pandas(df)
            ... DataFrame(
            ...     times=[
            ...         datetime.datetime(2021, 11, 1, 21, 50, 6, tzinfo=datetime.timezone.utc),
            ...         datetime.datetime(2021, 11, 2, 21, 50, 6, tzinfo=datetime.timezone.utc)],
            ...     series={
            ...         'INPUT_ID_1': [1.0, 2.0],
            ...         'INPUT_ID_2': [3.0, 4.0]
            ...     }
            ... )

        With specific time column.

            >>> from pyclarify import DataFrame
            >>> import pandas as pd
            >>> df = pd.DataFrame(data={
            ...     "INPUT_ID_1": [1, 2],
            ...     "INPUT_ID_2": [3, 4],
            ...     "timestamps": ["2021-11-01T21:50:06Z",  "2021-11-02T21:50:06Z"]
            ... })
            >>> DataFrame.from_pandas(df, time_col="timestamps")
            ... DataFrame(
            ...     times=[
            ...         datetime.datetime(2021, 11, 1, 21, 50, 6, tzinfo=datetime.timezone.utc),
            ...         datetime.datetime(2021, 11, 2, 21, 50, 6, tzinfo=datetime.timezone.utc)],
            ...     series={
            ...         'INPUT_ID_1': [1.0, 2.0],
            ...         'INPUT_ID_2': [3.0, 4.0]
            ...     }
            ... )
        """
        import warnings

        pd = local_import("pandas")
        if isinstance(df, pd.DataFrame):
            series = df.to_dict(orient="list")
        elif isinstance(df, pd.Series):
            if df.name is None:
                raise ValueError("The series you are converting does not have a name.")
            series = {df.name: list(df.values)}
        else:
            # Fail early with a clear error instead of an unbound-variable crash.
            raise TypeError(
                f"Expected a pandas DataFrame or Series but got {type(df).__name__}"
            )

        if time_col:
            times = df[time_col].values
            series.pop(time_col)
        elif is_datetime(df.index.values[0]):
            times = df.index.values
        else:
            warnings.warn(
                "No obvious time index! Attempting to select based on data.",
                stacklevel=2,
            )
            # Inspect the first row to find columns holding datetime-like values.
            possible_indexes = [is_datetime(c) for c in df.values[0]]
            if sum(possible_indexes) == 0:
                raise ValueError("No time variable in the data. Can not convert.")
            col = df.columns[possible_indexes]
            if sum(possible_indexes) > 1:
                raise ValueError(
                    f"Unambiguous time index! {list(df.columns[possible_indexes])} could be index. Use `time_col` variable or set time to index."
                )
            times = df[col[0]].values
            series.pop(col[0])
            warnings.warn(f'Choosing "{col[0]}" as time axis.', stacklevel=2)

        return cls(times=list(times), series=series)

    @classmethod
    def merge(cls, data_frames) -> "DataFrame":
        """
        Method for merging 2 or more Clarify Data Frames. Mapping overlapping
        signal names to single series. Concatenates timestamps of all data frames.
        Inserts none value to series not containing entry at a given timestamp.

        Series appear in the result in the order in which their signal name is
        first seen across ``data_frames``.

        Parameters
        ----------
        data_frames : List[DataFrame]
            A Clarify DataFrame or a list of Clarify Data_Frames

        Returns
        -------
        DataFrame : DataFrame
            Merged data frame of all input data frames and self

        Raises
        ------
        ValueError
            If ``data_frames`` is not a list, or contains an element that is not
            a Clarify DataFrame.

        Example
        -------
        Merging two data frames.

            >>> df1 = DataFrame(
            ...     series={"INPUT_ID_1": [1, 2], "INPUT_ID_2": [3, 4]},
            ...     times=["2021-11-01T21:50:06Z", "2021-11-02T21:50:06Z"]
            ... )
            >>> df2 = DataFrame(
            ...     series={"INPUT_ID_1": [5, 6], "INPUT_ID_3": [7, 8]},
            ...     times=["2021-11-01T21:50:06Z", "2021-11-03T21:50:06Z"]
            ... )
            >>> merged_df = DataFrame.merge([df1, df2])
            >>> merged_df.to_pandas()
            ...                            INPUT_ID_1  INPUT_ID_2  INPUT_ID_3
            ... 2021-11-01 21:50:06+00:00         5.0         3.0         7.0
            ... 2021-11-02 21:50:06+00:00         2.0         4.0         NaN
            ... 2021-11-03 21:50:06+00:00         6.0         NaN         8.0

        Warning
        -----
        Notice from the example above that when time series have overlapping timestamps the last data frame overwrites the first.

            >>> df1 = DataFrame(
            ...     series={"INPUT_ID_1": [1, 2]},
            ...     times=["2021-11-01T21:50:06Z", "2021-11-02T21:50:06Z"]
            ... )
            >>> df2 = DataFrame(
            ...     series={"INPUT_ID_1": [5, 6]},
            ...     times=["2021-11-01T21:50:06Z", "2021-11-03T21:50:06Z"]
            ... )
            >>> DataFrame.merge([df1, df2])
            ...                            INPUT_ID_1
            ... 2021-11-01 21:50:06+00:00         5.0  <--
            ... 2021-11-02 21:50:06+00:00         2.0
            ... 2021-11-03 21:50:06+00:00         6.0
            >>> DataFrame.merge([df2, df1])
            ...                            INPUT_ID_1
            ... 2021-11-01 21:50:06+00:00         1.0  <--
            ... 2021-11-02 21:50:06+00:00         2.0
            ... 2021-11-03 21:50:06+00:00         6.0
        """
        if not isinstance(data_frames, list):
            raise ValueError(
                "The input data frames needs to be a list containing at least one Clarify DataFrame"
            )
        for df in data_frames:
            if not isinstance(df, cls):
                # Report the type name; never instantiate the offending class.
                raise ValueError(
                    f"Expected Clarify Data_Frames in list but got {type(df).__name__}"
                )

        # Signal names in first-seen order (a dict keeps insertion order), so
        # the column order of the result is deterministic.
        signals = {}
        # time -> {signal: value}; later frames overwrite earlier ones.
        cells = {}
        for frame in data_frames:
            # Frames built without arguments keep ``None`` for both fields.
            frame_series = frame.series or {}
            frame_times = frame.times or []
            for signal, values in frame_series.items():
                signals.setdefault(signal, None)
                for value, time in zip(values, frame_times):
                    cells.setdefault(time, {})[signal] = value

        times = sorted(cells)
        # A fresh list per signal; missing (time, signal) pairs become None.
        series = {
            signal: [cells[time].get(signal) for time in times] for signal in signals
        }
        return cls(times=times, series=series)

    def __add__(self, other):
        """
        Merge ``other`` into this data frame and return the merged DataFrame.

        ``other`` may be a Clarify DataFrame, a dictionary accepted by
        ``from_dict``, or anything accepted by ``from_pandas``.

        Raises
        ------
        TypeError
            If ``other`` cannot be combined with a DataFrame.
        """
        try:
            if isinstance(other, DataFrame):
                return DataFrame.merge([self, other])
            if isinstance(other, dict):
                return DataFrame.merge([self, DataFrame.from_dict(other)])
            return DataFrame.merge([self, DataFrame.from_pandas(other)])
        except TypeError as e:
            # TypeError accepts no keyword arguments; build a real message.
            raise TypeError(
                f"unsupported operand type(s) for +: "
                f"'{type(self).__name__}' and '{type(other).__name__}'"
            ) from e

    model_config = ConfigDict(json_encoders={datetime: time_to_string}, extra="forbid")
# Rebuild the model schema now that the ``DataFrame`` class is fully defined.
DataFrame.model_rebuild()


class InsertParams(BaseModel):
    """
    Model holding an integration ID together with the DataFrame to insert.

    :meta private:
    """

    integration: IntegrationID
    data: DataFrame


class CreateSummary(BaseModel):
    """
    Model holding a resource ID and a ``created`` flag. Unknown fields are rejected.

    :meta private:
    """

    model_config = ConfigDict(extra="forbid")

    id: ResourceID
    created: bool


class InsertResponse(BaseModel):
    """
    Model mapping each input ID to its ``CreateSummary``. Unknown fields are rejected.

    :meta private:
    """

    model_config = ConfigDict(extra="forbid")

    signalsByInput: Dict[InputID, CreateSummary]


class DataFrameParams(BaseModel):
    """
    Model holding a resource query, a data query, a list of includes and a
    selection format.

    :meta private:
    """

    query: Optional[ResourceQuery]
    data: Optional[DataQuery]
    include: Optional[List[str]] = []
    format: Optional[SelectionFormat] = SelectionFormat(dataAsArray=False)