
New Style API

DfSchema

Bases: BaseModel

Main class of the package

Represents a schema to check (validate) a dataframe against. The schema is flavor-agnostic (it does not specify what kind of dataframe it is).
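For orientation, here is a minimal sketch of the round trip (the dataframe is made up; it assumes the scaffold produced by `from_df` validates its own source frame, per the method docs below):

```python
import pandas as pd
from dfschema import DfSchema

df = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.5]})

# scaffold a strict schema from the frame itself, then validate against it
schema = DfSchema.from_df(df)
schema.validate_df(df)  # passes silently

# a frame missing an expected column would raise a summary error:
# schema.validate_df(df.drop(columns=["b"]))
```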

Source code in dfschema/core/core.py
class DfSchema(BaseModel, extra=Extra.forbid, arbitrary_types_allowed=True):  # type: ignore
    """Main class of the package

    Represents a schema to check (validate) a dataframe against. The schema
    is flavor-agnostic (it does not specify what kind of dataframe it is).
    """

    metadata: Optional[MetaData] = Field(
        MetaData(),
        description="optional metadata, including version and protocol version",
    )

    shape: Optional[ShapeSchema] = Field(None, description="shape expectations")
    columns: Optional[List[ColSchema]] = Field([], description="columns expectations")
    additionalColumns: bool = Field(
        True,
        description="if true, Will allow any additional columns not defined in the schema",
    )

    exactColumnOrder: bool = Field(
        False,
        description="if true, will require order of columns to exactly match column order in schema",
    )

    subsets: Optional[List["SubsetSchema"]] = Field(
        None, description="dataframe subset expectations"
    )

    _exception_pool: List[Exception] = PrivateAttr([])
    _summary: bool = PrivateAttr()

    def _summary_error(self) -> DataFrameSummaryError:
        error_list = "\n".join(
            [
                f"- {e.args[0]}"
                for e in self._exception_pool
                if not isinstance(e, SubsetSummaryError)
            ]
        )
        subset_summaries = "\n".join(
            [
                e.args[0]
                for e in self._exception_pool
                if isinstance(e, SubsetSummaryError)
            ]
        )

        txt = "Dataframe validation failed:"
        if error_list:
            txt += f"\n{error_list}"

        # NOTE: subset errors as a subsection
        if subset_summaries:
            txt += f"\n{subset_summaries}"

        return DataFrameSummaryError(txt)

    def validate_column_presence(self, df: pd.DataFrame) -> None:
        schema_col_names = {col.name for col in self.columns}  # type: ignore
        _validate_column_presence(
            df, schema_col_names, additionalColumns=self.additionalColumns, root=self
        )

    def validate_df(self, df: pd.DataFrame, summary: bool = True) -> None:
        """validate Dataframe aganist this schema

        validate dataframe agains the schema as a dictionary. will raise
        either DataFrameSummaryError (if summary=True) or DataFrameValidationError for specific
        problem (if summary=False)

        ### Example
        ```python
        import pandas as pd
        from dfschema import DfSchema

        path = '/schema.json'

        df = pd.DataFrame({'a':[1,2], 'b':[3,4]})
        DfSchema.from_file(path).validate_df(df)
        ```

        Args:
            df (pd.DataFrame): A dataframe to validate
            summary (bool): if `False`, raise exception on first violation (faster), otherwise will collect all violations and raise summary exception (slower)


        """
        self._exception_pool = []
        self._summary = summary

        if not isinstance(df, pd.DataFrame):
            raise DataFrameSchemaError(
                f"Data should be `pd.DataFrame`, got `{type(df)}`"
            )

        if self.shape:
            self.shape.validate_df(df, root=self)

        if self.columns:
            self.validate_column_presence(df)

            for col in (col for col in self.columns if col.name in df.columns):
                col.validate_column(df[col.name], root=self)

        if self.subsets:
            for subset in self.subsets:
                subset.validate_df(df=df, root=self)

        if len(self._exception_pool) > 0:
            error = self._summary_error()
            raise error

    def validate_sql(
        self,
        sql: str,
        con,
        read_sql_kwargs: Optional[dict] = None,
        summary: bool = True,
    ) -> None:
        """validate SQL table. Relies on `pandas.read_sql` to infer datatypes

        Right now does not support sampling, but this could be added in the future

        Args:
            sql (str): SQL statement (query) to run
            con (sqlalchemy.connection): connection to the database
            read_sql_kwargs (dict): Optional set of params to pass to `pd.read_sql`
            summary (bool): if `False`, raise exception on first violation (faster), otherwise will collect all violations and raise summary exception (slower)
        Returns:
            None
        """
        df = pd.read_sql(sql, con, **(read_sql_kwargs or {}))
        self.validate_df(df, summary=summary)

    @classmethod
    def from_file(cls, path: Union[str, Path]) -> "DfSchema":
        """create DfSchema from file

        Method supports json and yaml formats
        Note: this is a class method, not instance method.
        PyYaml package is required to read yaml.

        Args:
            path (str or Path): path to the file, either json or yaml
        Returns:
            DfSchema: DfSchema object instance
        """

        if isinstance(path, str):
            path = Path(path)

        try:
            if path.suffix == ".json":
                with path.open("r") as f:
                    schema = json.load(f)
            elif path.suffix in (".yml", ".yaml"):
                try:
                    import yaml

                    with path.open("r") as f:
                        schema = yaml.safe_load(f)
                except ImportError:
                    raise ImportError("PyYaml is required to load yaml files")
            else:
                raise ValueError(
                    f"Unsupported file extension: {path.suffix}, should be one of .json or .yml"
                )
            return cls.from_dict(schema)
        except Exception as e:
            raise DataFrameSchemaError(f"Error loading schema from file {path}") from e

    def to_file(self, path: Union[str, Path]) -> None:
        """write chema to file

        Supports json and yaml.

        Args:
            path (str, Path): path to write file to.
        Returns:
            None
        """
        if isinstance(path, str):
            path = Path(path)

        try:
            schema_dict = self.dict(exclude_none=True)
            if path.suffix == ".json":
                with path.open("w") as f:
                    json.dump(schema_dict, f, indent=4)
            elif path.suffix in (".yml", ".yaml"):
                try:
                    import yaml

                    with path.open("w") as f:
                        yaml.dump(schema_dict, f)
                except ImportError:
                    raise ImportError("PyYaml is required to load yaml files")
            else:
                raise ValueError(
                    f"Unsupported file extension: {path.suffix}, should be one of .json or .yml"
                )

        except Exception as e:
            raise DataFrameSchemaError(f"Error wriging schema to file {path}") from e

    @classmethod
    def from_dict(
        cls,
        dict_: dict,
    ) -> "DfSchema":
        """create DfSchema from dict.

        same as `DfSchema(**dict_)`, but will also migrate old protocol schemas if necessary.

        Args:
            dict_ (dict): dictionary to generate DfSchema from
        Returns:
            DfSchema: instance of DfSchema
        """

        pv = infer_protocol_version(dict_)
        if pv == CURRENT_PROTOCOL_VERSION:
            return cls(**dict_)
        else:
            while pv < CURRENT_PROTOCOL_VERSION:
                dict_, pv = LegacySchemaRegistry[pv](**dict_).migrate()
            return cls(**dict_)

    @classmethod
    def from_df(
        cls,
        df: pd.DataFrame,
        subset_predicates: Optional[List[dict]] = None,
        return_dict: bool = False,
    ) -> Union["DfSchema", dict]:
        """generate DfSchema object from given dataframe.

        By default, will generate a strict schema that the given dataframe matches.
        Do not expect it to generate a good schema; treat it as scaffolding
        to build one from manually.

        Note: this is a class method, not an instance method.

        Args:
            df (pd.DataFrame): dataframe to generate from
            subset_predicates (List[dict]): Optional list of dictionary predicates to generate subsets from
            return_dict (bool): whether to return a dictionary instead of a DfSchema instance (mostly for debugging purposes)
        Return:
            Union[DfSchema, dict]: either an instance of a class, or a dictionary
        """

        schema = generate_schema_dict_from_df(df)
        subset_schemas = []
        if subset_predicates:
            for predicate in subset_predicates:
                filtered = SubsetSchema.filter_df(df, predicate)

                subset_schema = generate_schema_dict_from_df(filtered)
                subset_schema["predicate"] = predicate
                subset_schemas.append(subset_schema)

            schema["subsets"] = [subset_schemas]

        if return_dict:
            return schema

        return cls(**schema)

from_df(df, subset_predicates=None, return_dict=False) classmethod

generate DfSchema object from given dataframe.

By default, will generate a strict schema that the given dataframe matches. Do not expect it to generate a good schema; treat it as scaffolding to build one from manually.

Note: this is a class method, not an instance method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | dataframe to generate from | required |
| subset_predicates | List[dict] | Optional list of dictionary predicates to generate subsets from | None |
| return_dict | bool | whether to return a dictionary instead of a DfSchema instance (mostly for debugging purposes) | False |

Returns:

| Type | Description |
| --- | --- |
| Union[DfSchema, dict] | either an instance of the class, or a dictionary |
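A sketch of the scaffolding workflow described above (data and file name are illustrative; the yaml output additionally requires PyYaml):

```python
import pandas as pd
from dfschema import DfSchema

df = pd.DataFrame({"kind": ["x", "x", "y"], "value": [1, 2, 3]})

# strict scaffold, plus a per-subset schema for rows where kind == "x"
schema = DfSchema.from_df(df, subset_predicates=[{"kind": "x"}])

# or inspect the raw dict instead (handy for debugging)
schema_dict = DfSchema.from_df(df, return_dict=True)

schema.to_file("scaffold.yaml")  # starting point to edit by hand
```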


from_dict(dict_) classmethod

create DfSchema from dict.

same as DfSchema(**dict_), but will also migrate old protocol schemas if necessary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dict_ | dict | dictionary to generate DfSchema from | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| DfSchema | DfSchema | instance of DfSchema |
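A small sketch; note the column-entry layout here is an assumption (only the columns/name keys are visible in the source above):

```python
from dfschema import DfSchema

# hypothetical minimal schema dict; assumes a column entry needs only "name"
schema_dict = {"columns": [{"name": "a"}, {"name": "b"}]}

schema = DfSchema.from_dict(schema_dict)  # migrates old protocol versions if needed
```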


from_file(path) classmethod

create DfSchema from file

Supports json and yaml formats. Note: this is a class method, not an instance method. The PyYaml package is required to read yaml.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str or Path | path to the file, either json or yaml | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| DfSchema | DfSchema | DfSchema object instance |
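Usage is a one-liner; a sketch assuming a schema.json written earlier (for example by to_file below):

```python
from dfschema import DfSchema

schema = DfSchema.from_file("schema.json")  # .yml/.yaml files require PyYaml
```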


to_file(path)

write schema to file

Supports json and yaml.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str or Path | path to write file to | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |
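A sketch of a file round trip (paths are illustrative):

```python
import pandas as pd
from dfschema import DfSchema

schema = DfSchema.from_df(pd.DataFrame({"a": [1, 2]}))

schema.to_file("schema.json")                 # json via the stdlib
restored = DfSchema.from_file("schema.json")  # round-trips through from_file
```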


validate_df(df, summary=True)

validate dataframe against this schema

Validates the dataframe against the schema. Will raise either DataFrameSummaryError (if summary=True) or DataFrameValidationError for the specific problem (if summary=False).

Example
import pandas as pd
from dfschema import DfSchema

path = '/schema.json'

df = pd.DataFrame({'a':[1,2], 'b':[3,4]})
DfSchema.from_file(path).validate_df(df)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | A dataframe to validate | required |
| summary | bool | if False, raise exception on first violation (faster); otherwise collect all violations and raise a summary exception (slower) | True |
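To illustrate the summary flag, a minimal sketch (both frames are made up):

```python
import pandas as pd
from dfschema import DfSchema

schema = DfSchema.from_df(pd.DataFrame({"a": [1], "b": [2]}))
bad = pd.DataFrame({"a": [1]})  # column "b" is missing

try:
    schema.validate_df(bad, summary=True)  # collect every violation
except Exception as err:
    print(err)  # summary text starts with "Dataframe validation failed:"

# summary=False would instead raise on the first violation encountered
```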

validate_sql(sql, con, read_sql_kwargs=None, summary=True)

validate an SQL table. Relies on pandas.read_sql to infer datatypes.

Does not currently support sampling, though it could be added in the future.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sql | str | SQL statement (query) to run | required |
| con | sqlalchemy.connection | connection to the database | required |
| read_sql_kwargs | dict | Optional set of params to pass to pd.read_sql | None |
| summary | bool | if False, raise exception on first violation (faster); otherwise collect all violations and raise a summary exception (slower) | True |

Returns:

| Type | Description |
| --- | --- |
| None | None |
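A self-contained sketch using a sqlite3 connection through pandas.read_sql (table name and data are made up; the schema-dict layout is an assumption, as above):

```python
import sqlite3

import pandas as pd
from dfschema import DfSchema

con = sqlite3.connect(":memory:")
pd.DataFrame({"a": [1, 2]}).to_sql("t", con, index=False)

schema = DfSchema.from_dict({"columns": [{"name": "a"}]})  # hypothetical layout
schema.validate_sql("SELECT * FROM t", con)
```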


SubsetSchema

Bases: BaseModel

A SubsetSchema is almost identical to DfSchema, except it is assumed to run validation on a SUBSET of the dataframe. It also has a predicate attribute that defines how to retrieve this subset from the root dataframe.

It also raises SubsetSummaryError instead of DataFrameSummaryError.
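A sketch of how subsets participate in validation: the parent DfSchema drives it, and subset failures are pooled as SubsetSummaryError entries (data is illustrative):

```python
import pandas as pd
from dfschema import DfSchema

df = pd.DataFrame({"kind": ["x", "x", "y"], "value": [1, 2, 3]})

# scaffold a parent schema holding one subset (rows where kind == "x")
schema = DfSchema.from_df(df, subset_predicates=[{"kind": "x"}])

# subsets are checked as part of the parent's validate_df
schema.validate_df(df)
```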

Source code in dfschema/core/core.py
class SubsetSchema(BaseModel, extra=Extra.forbid, arbitrary_types_allowed=True):  # type: ignore
    """
    A SubsetSchema is almost identical to DfSchema,
    except it is assumed to run validation on a SUBSET of the dataframe.
    It also has a `predicate` attribute that defines how to retrieve this subset from
    the root dataframe.

    It also raises `SubsetSummaryError` instead of `DataFrameSummaryError`.

    """

    _predicate_description = """
    predicate to select subset.
    - If string, will be interpreted as query for `df.query()`.
    - If dict, keys should be column names, values should be values to exactly match"""
    predicate: Union[
        dict,
        str,
    ] = Field(..., description=_predicate_description)

    shape: Optional[ShapeSchema] = Field(None, description="shape expectations")
    columns: Optional[List[ColSchema]] = Field([], description="columns expectations")

    additionalColumns: bool = Field(
        True,
        description="if true, Will allow any additional columns not defined in the schema",
    )

    exactColumnOrder: bool = Field(
        False,
        description="if true, will require order of columns to exactly match column order in schema",
    )

    _exception_pool: List[Exception] = PrivateAttr([])
    _summary: bool = PrivateAttr()

    def _summary_error(self, df_shape: tuple) -> SubsetSummaryError:
        error_list = "\n".join([f"- {e.args[0]}" for e in self._exception_pool])

        return SubsetSummaryError(
            f"Subset({self.predicate}, shape:{df_shape}) validation failed:\n{error_list}"
        )

    @staticmethod
    def _filter(df: pd.DataFrame, predicate) -> pd.DataFrame:
        """filter dataframe by predicate"""

        if isinstance(predicate, str):
            return df.query(predicate)

        elif isinstance(predicate, dict):
            mask = pd.Series(True, index=df.index)

            for k, v in predicate.items():
                mask &= df[k] == v
            return df[mask]
        else:
            raise ValueError(f"Unsupported predicate type: {type(predicate)}")

    def validate_column_presence_and_order(self, df: pd.DataFrame) -> None:
        schema_col_names = tuple(col.name for col in self.columns)  # type: ignore

        _validate_column_presence(
            df,
            schema_col_names,
            additionalColumns=self.additionalColumns,
            exactColumnOrder=self.exactColumnOrder,
            root=self,
        )

    def validate_df(self, df: pd.DataFrame, root: DfSchema) -> None:
        """validate Dataframe aganist this schema

        validate dataframe agains the schema as a dictionary. will raise
        either SubsetSummaryError or DataFrameValidationError for specific
        problem

        Args:
            df (pd.DataFrame): A dataframe to validate
        Returns:
            None
        """
        self._exception_pool = []
        self._summary = root._summary

        filtered_df = self._filter(df, self.predicate)

        if self.shape:
            self.shape.validate_df(filtered_df, root=self)

        if self.columns:
            self.validate_column_presence_and_order(filtered_df)

            for col in (col for col in self.columns if col.name in df.columns):
                col.validate_column(filtered_df[col.name], root=self)

        if self._exception_pool:
            summary_ = self._summary_error(filtered_df.shape)
            root._exception_pool.append(summary_)

    @classmethod
    def from_df(
        cls,
        df: pd.DataFrame,
        predicate: Union[dict, str, Callable],
        return_dict: bool = False,
    ) -> Union["SubsetSchema", dict]:
        """generate SubsetSchema object from given dataframe and a predicate

        By default, will generate a strict schema that the given dataframe matches.
        Do not expect it to generate a good schema; treat it as scaffolding
        to build one from manually.

        Note: this is a class method, not an instance method.

        Args:
            df (pd.DataFrame): dataframe to generate from
            predicate (dict, str, Callable): Predicate to filter by. If string, will use it as an argument to `df.query`.\nIf callable, assumes it to be a function that returns a subset if given a dataframe.\nIf dictionary, will assume keys to be columns and values - sets of possible values.
            return_dict (bool): whether to return a dictionary instead of a SubsetSchema instance (mostly for debugging purposes)
        Return:
            Union[SubsetSchema, dict]: either an instance of a class, or a dictionary
        """
        filtered_df = cls._filter(df, predicate=predicate)

        schema = generate_schema_dict_from_df(filtered_df)
        schema["predicate"] = predicate

        if return_dict:
            return schema

        return cls(**schema)

from_df(df, predicate, return_dict=False) classmethod

generate SubsetSchema object from given dataframe and a predicate

By default, will generate a strict schema that the given dataframe matches. Do not expect it to generate a good schema; treat it as scaffolding to build one from manually.

Note: this is a class method, not an instance method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | dataframe to generate from | required |
| predicate | dict, str, Callable | Predicate to filter by. If a string, it is used as an argument to df.query. If a callable, it is assumed to be a function that returns a subset when given a dataframe. If a dictionary, keys are taken to be columns and values the sets of possible values. | required |
| return_dict | bool | whether to return a dictionary instead of a SubsetSchema instance (mostly for debugging purposes) | False |

Returns:

| Type | Description |
| --- | --- |
| Union[SubsetSchema, dict] | either an instance of the class, or a dictionary |
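A sketch of the two simplest predicate flavors; the import path is an assumption taken from the module path in the listings (dfschema/core/core.py) and may differ in the packaged API:

```python
import pandas as pd
from dfschema.core.core import SubsetSchema  # import path is an assumption

df = pd.DataFrame({"kind": ["x", "x", "y"], "value": [1, 2, 3]})

# string predicate: passed through to df.query()
sub_q = SubsetSchema.from_df(df, predicate="kind == 'x'")

# dict predicate: exact per-column equality
sub_d = SubsetSchema.from_df(df, predicate={"kind": "y"})
```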


validate_df(df, root)

validate dataframe against this schema

Validates the dataframe against the schema. Will raise either SubsetSummaryError or DataFrameValidationError for the specific problem.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | A dataframe to validate | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |
