Google Cloud Dataflow Write To Csv From Dictionary
Solution 1:
Generally you will want to write a function that can convert your original dict data elements into a csv-formatted string representation.
That function can be written as a DoFn that you can apply to your Beam PCollection of data, which would convert each collection element into the desired format; you can do this by applying the DoFn to your PCollection via ParDo. You can also wrap this DoFn in a more user-friendly PTransform.
You can learn more about this process in the Beam Programming Guide.
Here is a simple non-Beam example that translates directly into a DoFn:
# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]

def convert_my_dict_to_csv_record(input_dict):
    """ Turns dictionary values into a comma-separated value formatted string """
    return ','.join(map(str, input_dict.values()))

# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]
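On Python 3.7+, where dicts preserve insertion order, converted_test_input will look like the following:
['1,Here is a sentence', '2,Another sentence goes here']
On older Python versions the order of dict.values() is not guaranteed (for example ['Here is a sentence,1', 'Another sentence goes here,2']), which is why the DictWriter example below takes an explicit column order.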
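Because dict.values() follows the dictionary's iteration order and the simple join does not quote values that contain commas, a somewhat more robust variant passes the column order explicitly and lets the csv module handle quoting. A minimal sketch, assuming Python 3; dict_to_csv_record and its parameters are hypothetical names, not part of Beam:

```python
import csv
import io


def dict_to_csv_record(input_dict, column_order, missing_val=''):
    """Format one dict as a CSV line, with columns in an explicit order.

    Hypothetical helper: column_order fixes the column order instead of
    relying on dict iteration order, and csv.writer handles quoting.
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow([input_dict.get(col, missing_val) for col in column_order])
    # writerow ends the line with '\r\n'; strip it to get a bare record
    return buf.getvalue().rstrip('\r\n')


test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another, with a comma'}]
print([dict_to_csv_record(e, ('text', 'label')) for e in test_input])
# ['Here is a sentence,1', '"Another, with a comma",2']
```

The same function body could serve as the per-element logic of a DoFn.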
Beam DictToCSV DoFn and PTransform example using DictWriter
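from csv import DictWriter
from csv import excel
try:
    from cStringIO import StringIO  # Python 2 (as in the original example)
except ImportError:
    from io import StringIO  # Python 3

from apache_beam import DoFn, ParDo, PTransform
...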
def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
    """ Additional properties for delimiters, escape chars, etc. via an instance of csv.Dialect
    Note: on Python 2 (cStringIO) this implementation does not support unicode
    """
    buf = StringIO()
    writer = DictWriter(buf,
                        fieldnames=column_order,
                        restval=missing_val,
                        extrasaction=('ignore' if discard_extras else 'raise'),
                        dialect=dialect)
    writer.writerow(element)
    # Drop the row terminator so each element is a bare CSV line
    return buf.getvalue().rstrip(dialect.lineterminator)
class _DictToCSVFn(DoFn):
    """ Converts a Dictionary to a CSV-formatted String

    column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
    missing_val: The value to be written when a named field from `column_order` is not found in the input element
    discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
    dialect: Delimiters, escape-characters, etc. can be controlled by providing an instance of csv.Dialect
    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super(_DictToCSVFn, self).__init__()
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def process(self, element, *args, **kwargs):
        # Emit one CSV-formatted string per input dictionary
        yield _dict_to_csv(element,
                           column_order=self._column_order,
                           missing_val=self._missing_val,
                           discard_extras=self._discard_extras,
                           dialect=self._dialect)
class DictToCSV(PTransform):
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings

    column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
    missing_val: The value to be written when a named field from `column_order` is not found in an input element
    discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
    dialect: Delimiters, escape-characters, etc. can be controlled by providing an instance of csv.Dialect
    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super(DictToCSV, self).__init__()
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def expand(self, pcoll):
        # Apply the formatting DoFn element-wise via ParDo
        return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
                                          missing_val=self._missing_val,
                                          discard_extras=self._discard_extras,
                                          dialect=self._dialect))
To use the example, you would put your test_input into a PCollection (for example with beam.Create), and apply the DictToCSV PTransform to the PCollection; you can take the resulting converted PCollection and use it as input for WriteToText. Note that you must provide a list or tuple of column names, via the column_order argument, corresponding to keys for your dictionary input elements; the resulting CSV-formatted string columns will be in the order of the column names provided. Also, on Python 2 (where the example uses cStringIO) the underlying implementation does not support unicode.
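WriteToText writes only the formatted records, so the output has no header row unless you add one. Assuming your Beam version's WriteToText accepts a header argument (recent Python SDK releases do), one option is to format the header from the same column_order and dialect so that it quotes names consistently with the records. A minimal sketch in Python 3; csv_header is a hypothetical helper:

```python
import csv
import io


def csv_header(column_order, dialect=csv.excel):
    """Format column names as one CSV line without the line terminator.

    Hypothetical helper: the result is meant to be passed as the `header`
    argument of WriteToText so the output starts with the column names.
    """
    buf = io.StringIO()
    csv.writer(buf, dialect=dialect).writerow(column_order)
    return buf.getvalue().rstrip(dialect.lineterminator)


print(csv_header(('label', 'text')))          # label,text
print(csv_header(('id', 'note, free text')))  # id,"note, free text"
```

In a pipeline this might look like WriteToText('output', file_name_suffix='.csv', header=csv_header(column_order)); as we understand it, the header is written at the start of each output shard.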
Solution 2:
Based on Andrew's suggestion, here is a ConvertDictToCSV function that I created:
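def ConvertDictToCSV(input_dict, fieldnames, separator=",", quotechar='"'):
    value_list = []
    for field in fieldnames:
        value = input_dict.get(field)
        # Missing keys and None become empty fields; falsy values such as 0 are kept
        field_value = "" if value is None else str(value)
        # Quote fields containing the separator, the quote char, or a line break,
        # doubling any embedded quote characters
        if any(c in field_value for c in (separator, quotechar, "\n", "\r")):
            field_value = quotechar + field_value.replace(quotechar, quotechar * 2) + quotechar
        value_list.append(field_value)
    return separator.join(value_list)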
This appears to be working well, but it would certainly be safer to use csv.DictWriter if possible.
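A DictWriter-based version can keep the same signature, because DictWriter forwards delimiter and quotechar to the underlying csv writer. A minimal sketch, assuming Python 3; convert_dict_to_csv is a hypothetical name, and the choices of restval="" (missing keys become empty fields) and extrasaction="ignore" (extra keys are dropped) are assumptions meant to mirror the hand-written function:

```python
import csv
import io


def convert_dict_to_csv(input_dict, fieldnames, separator=",", quotechar='"'):
    """DictWriter-based variant of ConvertDictToCSV (hypothetical name).

    Missing keys become empty strings, extra keys are ignored, and quoting
    (including doubling of embedded quote characters) is left to csv.
    """
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, restval="",
                            extrasaction="ignore", delimiter=separator,
                            quotechar=quotechar, lineterminator="\n")
    writer.writerow(input_dict)
    # Strip the terminator so the result is a single bare CSV line
    return buf.getvalue().rstrip("\n")


row = {'label': 0, 'text': 'He said "hi", then left', 'extra': 'x'}
print(convert_dict_to_csv(row, ['label', 'text', 'missing']))
# 0,"He said ""hi"", then left",
```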