Create Dask Bags¶
There are several ways to create Dask bags around your data:
db.from_sequence¶
You can create a bag from an existing Python iterable:
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
You can control the number of partitions into which this data is binned:
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
This controls the granularity of the parallelism that you expose. By default, Dask will try to partition your data into about 100 partitions.
IMPORTANT: do not load your data into Python and then load that data into a Dask bag. Instead, use Dask Bag to load your data. This parallelizes the loading step and reduces inter-worker communication:
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.read_text¶
Dask Bag can load data directly from text files. You can pass either a single file name, a list of file names, or a globstring. The resulting bag will have one item per line and one file per partition:
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
This handles standard compression libraries like gzip, bz2, and xz, or any easily installed compression library that provides a file-like interface. Compression will be inferred from the file name extension, or by using the compression='gzip' keyword:
>>> b = db.read_text('myfile.*.txt.gz')
The resulting items in the bag are strings. If you have encoded data like line-delimited JSON, then you may want to map a decoding or load function across the bag:
>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)
Or you can perform string munging tasks. For convenience, there is a string namespace attached directly to bags with .str.methodname:
>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')
db.read_avro¶
Dask Bag can read binary files in the Avro format if fastavro is installed. A bag can be made from one or more files, with optional chunking within files. The resulting bag will have one item per Avro record, which will be a dictionary of the form given by the Avro schema. There will be at least one partition per input file:
>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')
By default, Dask will split data files into chunks of approximately blocksize
bytes in size. The actual blocks you would get depend on the internal blocking
of the file.
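For example, you can ask for larger or smaller partitions by passing an explicit blocksize in bytes (the value below is only illustrative):
>>> b = db.read_avro('data.*.avro', blocksize=2**26)  # aim for roughly 64 MB per partition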
For files that are compressed after creation (this is not the same as the internal “codec” used by Avro), no chunking should be used, and there will be exactly one partition per file:
>>> b = db.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')
db.from_delayed¶
You can construct a Dask bag from dask.delayed values using the db.from_delayed function. For more information, see the documentation on using dask.delayed with collections.
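As a minimal sketch, assuming a hypothetical load function that returns a list of records per file, each delayed value becomes one partition of the resulting bag:
>>> import dask.bag as db
>>> from dask import delayed
>>> def load(filename):                    # hypothetical loader returning a list of records
...     with open(filename) as f:
...         return f.read().splitlines()
>>> parts = [delayed(load)(fn) for fn in ['1.dat', '2.dat']]
>>> b = db.from_delayed(parts)             # one partition per delayed value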
Store Dask Bags¶
In Memory¶
You can convert a Dask bag to a list or Python iterable by calling compute()
or by converting the object into a list:
>>> result = b.compute()
or
>>> result = list(b)
To Text Files¶
You can convert a Dask bag into a sequence of files on disk by calling the .to_textfiles() method.
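For example, a bag of dictionaries can first be serialized to strings and then written out, one file per partition; the output path below is illustrative, and the * is replaced by the partition index:
>>> import json
>>> b.map(json.dumps).to_textfiles('/path/to/data/*.json.gz')  # doctest: +SKIP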
To Avro¶
Dask bags can be written directly to Avro binary format using fastavro. One file
will be written per bag partition. This requires the user to provide a fully-specified
schema dictionary (see the docstring of the .to_avro()
method).
dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)¶
Write bag to a set of Avro files.
The schema is a complex dictionary describing the data; see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html. Its structure is as follows:
{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
     {'name': 'a', 'type': 'int'},
 ]}
where the “name” field is required, but “namespace” and “doc” are optional descriptors; “type” must always be “record”. The list of fields should have an entry for every key of the input records, and the types correspond to the primitive, complex, or logical types of the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html).
Results in one Avro file per input partition.
Parameters:
b: dask.bag.Bag
filename: list of str or str
Filenames to write to. If a list, number must match the number of partitions. If a string, must include a glob character “*”, which will be expanded using name_function
schema: dict
Avro schema dictionary, see above
name_function: None or callable
Expands integers into strings, see
dask.bytes.utils.build_name_function
storage_options: None or dict
Extra key/value options to pass to the backend file-system
codec: ‘null’, ‘deflate’, or ‘snappy’
Compression algorithm
sync_interval: int
Number of records to include in each block within a file
metadata: None or dict
Included in the file header
compute: bool
If True, files are written immediately, and function blocks. If False, returns delayed objects, which can be computed by the user where convenient.
kwargs: passed to compute(), if compute=True
Examples
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  # doctest: +SKIP
['my-data.0.avro', 'my-data.1.avro']
To DataFrames¶
You can convert a Dask bag into a Dask DataFrame with the .to_dataframe() method and then use the DataFrame's storage solutions.
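For example, a bag of flat dictionaries converts directly into a DataFrame, which can then be written out with the DataFrame I/O methods (the output path is illustrative, and writing Parquet assumes an engine such as pyarrow is installed):
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> df = b.to_dataframe()
>>> df.to_parquet('my-data.parquet')  # doctest: +SKIP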
To Delayed Values¶
You can convert a Dask bag into a list of Dask delayed values with the .to_delayed() method and build custom storage solutions from there.
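As a rough sketch, assuming a hypothetical save function, each delayed value evaluates to a plain Python list holding one partition's contents, which you can then persist however you like:
>>> from dask import delayed, compute
>>> def save(partition, index):            # hypothetical custom writer
...     with open(f'part-{index}.txt', 'w') as f:
...         f.writelines(str(item) + '\n' for item in partition)
>>> parts = b.to_delayed()                 # one Delayed object per partition
>>> tasks = [delayed(save)(part, i) for i, part in enumerate(parts)]
>>> compute(*tasks)  # doctest: +SKIP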