
Common issues in working with Avro files in Python

Apache Avro is a data serialization format that is commonly used to store files on disk. It is typically used to store files that will be accessed from Spark, but the two are completely independent of each other. Avro is a row-based format with several characteristics that make it suitable for evolving data schemas. One benefit of using Avro is that the schema and metadata travel with the data. If you have an .avro file, you have the schema of the data as well. The Avro Specification is easy to read and very detailed.


Unfortunately using Avro from Python tends to be unnecessarily error-prone, especially for a beginner. In this post, we will describe the common errors that beginners run into and their solutions.


Common issues with Official Avro Packages


Python 2 vs Python 3


Sadly, Python 2 has come to the end of its useful life. You should not be writing new applications in Python 2. However, the official Avro Getting Started (Python) Guide is only written for Python 2 and will fail with Python 3. The problem goes deeper than merely outdated official documentation.

There are two official python packages for handling Avro, one for Python 2 and one for Python 3. The packages have different names, which is unusual for the python ecosystem.

The first problem that beginners usually hit is that they install the avro package in a Python 3 virtual environment. The installation succeeds, but the package fails as soon as you import it.

# Inside a fresh Python 3 virtual environment 
python  --version 
# Python 3.7.3 
# You can successfully install `avro` in a Python 3 virtualenv even though `avro` is not
# compatible with Python 3.
pip install avro  
# Fails when you try to use it! 
python -c "import avro.schema" 
# Traceback (most recent call last): 
#   File "<string>", line 1, in <module> 
#   File "/Users/ankur/.virtualenvs/python3-test-env/lib/python3.7/site-packages/avro/schema.py", line 383 
#     except Exception, e: 
#                     ^ 
# SyntaxError: invalid syntax 

Thankfully, the reverse problem is very unlikely. You should not be able to install avro-python3, which is intended for Python 3, within a Python 2 (virtual) environment, by default.


# Inside a fresh Python 2 virtual environment 
python  --version 
# Python 2.7.16 

# Fails at the installation step itself, thankfully! 
pip install avro-python3 
# DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support 
# Collecting avro-python3 
#   Downloading https://files.pythonhosted.org/packages/d1/55/4c2e6fecf06cbaa68e0abaf12e1e965969872ed16da3674e6245cab0d5e2/avro-python3-1.9.0.tar.gz 
# ERROR: Package 'avro-python3' requires a different Python: 2.7.16 not in '>=3.4' 

Even if you install the correct Avro package for your Python environment, the API differs between avro and avro-python3. As an example, for Python 2 (with avro package), you need to use the function avro.schema.parse but for Python 3 (with avro-python3 package), you need to use the function avro.schema.Parse.

While the difference in API does somewhat justify having different package names, this still causes unnecessary confusion. The confusion is exacerbated because the official guide, which still uses Python 2, never mentions that the instructions are only applicable to Python 2.
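If the same code has to run against either package, the parse versus Parse difference can be hidden behind a small lookup. The following is only a sketch: get_schema_parser is a hypothetical helper name, and the two SimpleNamespace objects are stand-ins for the real avro.schema modules so that the snippet runs without Avro installed.

```python
import json
from types import SimpleNamespace


def get_schema_parser(schema_module):
    """Return the schema-parsing function exposed by `schema_module`.

    Prefers `Parse` (avro-python3) and falls back to `parse` (avro for Python 2).
    """
    for attr in ('Parse', 'parse'):
        parser = getattr(schema_module, attr, None)
        if callable(parser):
            return parser
    raise AttributeError('schema module exposes neither Parse nor parse')


# Stand-ins for the two packages (assumption: json.loads plays the role of the parser).
fake_py2_module = SimpleNamespace(parse=json.loads)
fake_py3_module = SimpleNamespace(Parse=json.loads)

schema_json = json.dumps({'name': 'User', 'type': 'record', 'fields': []})
parsed_py2 = get_schema_parser(fake_py2_module)(schema_json)
parsed_py3 = get_schema_parser(fake_py3_module)(schema_json)
print(parsed_py2 == parsed_py3)
# True
```

With a real installation, the call would presumably look like `parse = get_schema_parser(avro.schema)` after `import avro.schema`.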

In the rest of this post, we will only use Python 3 with avro-python3 package because Python 2 is EOL.


Working Example

This is an example usage of avro-python3 in a Python 3 environment.

# Python 3 with `avro-python3` package available
import copy
import json
import avro.schema
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader

# Note that we combined namespace and name to get "full name"
schema = {
    'name': 'avro.example.User',
    'type': 'record',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

# Parse the schema so we can use it to write the data
schema_parsed = avro.schema.Parse(json.dumps(schema))

# Write data to an avro file
with open('users.avro', 'wb') as f:
    writer = DataFileWriter(f, DatumWriter(), schema_parsed)
    writer.append({'name': 'Pierre-Simon Laplace', 'age': 77})
    writer.append({'name': 'John von Neumann', 'age': 53})
    writer.close()

# Read data from an avro file
with open('users.avro', 'rb') as f:
    reader = DataFileReader(f, DatumReader())
    metadata = copy.deepcopy(reader.meta)
    schema_from_file = json.loads(metadata['avro.schema'])
    users = [user for user in reader]
    reader.close()

print(f'Schema that we specified:\n {schema}')
print(f'Schema that we parsed:\n {schema_parsed}')
print(f'Schema from users.avro file:\n {schema_from_file}')
print(f'Users:\n {users}')

# Schema that we specified:
#  {'name': 'avro.example.User', 'type': 'record',
#   'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}
# Schema that we parsed:
#  {"type": "record", "name": "User", "namespace": "avro.example",
#   "fields": [{"type": "string", "name": "name"}, {"type": "int", "name": "age"}]}
# Schema from users.avro file:
#  {'type': 'record', 'name': 'User', 'namespace': 'avro.example',
#   'fields': [{'type': 'string', 'name': 'name'}, {'type': 'int', 'name': 'age'}]}
# Users:
#  [{'name': 'Pierre-Simon Laplace', 'age': 77}, {'name': 'John von Neumann', 'age': 53}]


Issues with name, namespace and full name


An interesting thing to note is what happens with the name and namespace fields. The schema we specified has the full name of the schema that has both the name and namespace combined (i.e., 'name': 'avro.example.User'). However, after parsing with avro.schema.Parse(), the name and namespace are separated into individual fields. Further, when we read back the schema from the users.avro file, we also get the name and namespace separated into individual fields.


Avro specification, for some reason, uses the name field for both the full name and the partial name. In other words, the name field can either contain the full name or only the partial name. Ideally, Avro specification should have kept partial_name, namespace, and full_name as separate fields.

This behind-the-scene separation and in-place modification may cause unexpected errors if your code depends on the exact value of name. One common use case is when you’re handling lots of different schemas and you want to identify/index/search by the schema name.


A best practice to guard against possible name errors is to always parse a dict schema into an avro.schema.RecordSchema using avro.schema.Parse(). This will generate the namespace, fullname, and simple_name (partial name), which you can then use with peace of mind.


print(type(schema_parsed))
# <class 'avro.schema.RecordSchema'>
print(schema_parsed.avro_name.fullname)
# avro.example.User
print(schema_parsed.avro_name.simple_name)
# User
print(schema_parsed.avro_name.namespace)
# avro.example

This problem of name and namespace deepens when we use a third-party package called fastavro, as we will see in the next section.


Common Issues with Third-party Avro Packages


fastavro


While avro-python3 is the official Avro package, it appears to be relatively slow. This is because it is written purely in Python. In comparison, fastavro uses C extensions (built with Cython), making it much faster. Another benefit of using fastavro is that you can install it the same way in both Python 2 and Python 3. The fastavro API is also the same for both Python 2 and 3.

We will use fastavro 0.22.7 for the following discussion. First, let’s see how to use fastavro.parse_schema(). Unlike avro.schema.Parse(), fastavro.parse_schema() reads in a schema dictionary and outputs another schema dictionary.

import fastavro

# Namespace and name are combined to get "full name"
schema_together = {
    'name': 'avro.example.User',
    'type': 'record',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

# Namespace and name are separate
schema_separated = {
    'name': 'User',
    'namespace': 'avro.example',
    'type': 'record',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

# fastavro.parse_schema() accepts schema as a dict and returns parsed schema as another dict.
# The parsed schema combines name and namespace into "full name".
schema_together_parsed = fastavro.parse_schema(schema_together)
schema_separated_parsed = fastavro.parse_schema(schema_separated)
print(schema_separated_parsed == schema_together_parsed)
# True
print(schema_separated_parsed)
# {'type': 'record', 'name': 'avro.example.User',
# 'fields': [{'name': 'name', 'type': 'string'},
#            {'name': 'age', 'type': 'int'}],
# '__fastavro_parsed': True}

Parsing a schema dict is not really necessary to write data to disk. However, it does provide two benefits:

  1. It helps us verify that the input schema dict is indeed valid

  2. We get back a canonicalized (as per fastavro) schema dict as a result

There is, however, one side effect. The parsed schema combines the name and namespace into full name and then stores the full name in the name field. This is the opposite behavior of avro-python3 and just like with avro-python3, this behind-the-scene, in-place modification can cause unexpected errors. Finally, the schema that gets written to disk is whatever schema dict we pass to the fastavro.writer().


# Continued from above
import copy
import json

# User data to store.
users = [{'name': 'Pierre-Simon Laplace', 'age': 77},
         {'name': 'John von Neumann', 'age': 53}]

# Experiment 1: Write data using the schema with `name` and `namespace` combined.
with open('users.avro', 'wb') as f:
    fastavro.writer(f, schema_separated_parsed, users)

with open('users.avro', 'rb') as f:
    reader = fastavro.reader(f)
    users_read_back = [user for user in reader]
    metadata = copy.deepcopy(reader.metadata)
    writer_schema = copy.deepcopy(reader.writer_schema)
    schema_from_file = json.loads(metadata['avro.schema'])

print(writer_schema == schema_from_file)
# True
print(schema_from_file)
# {'type': 'record', 'name': 'avro.example.User',
#  'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}
print(writer_schema)
# {'type': 'record', 'name': 'avro.example.User',
#  'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}

fastavro seems to provide two fields that contain the schema: reader.writer_schema and reader.metadata. Metadata (reader.metadata) includes the schema as reader.metadata['avro.schema']. In the above experiment, both these sources of schema provide the exact same schema that has name and namespace combined into a full name. But, this is not always the case, as we will see in the next experiment.

# Continued from above

# Experiment 2: Write data using the schema that has `name` and `namespace` separate.
# Use the unparsed schema that has name and namespace separate
with open('users.avro', 'wb') as f:
    fastavro.writer(f, schema_separated, users)

with open('users.avro', 'rb') as f:
    reader = fastavro.reader(f)
    users_read_back = [user for user in reader]
    metadata = copy.deepcopy(reader.metadata)
    writer_schema = copy.deepcopy(reader.writer_schema)
    schema_from_file = json.loads(metadata['avro.schema'])

print(writer_schema == schema_from_file)
# False
print(schema_from_file)
# {'name': 'User', 'namespace': 'avro.example', 'type': 'record',
#  'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}
print(writer_schema)
# {'type': 'record', 'name': 'avro.example.User',
#  'fields': [{'name': 'name', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}

The above experiment shows that reader.writer_schema and reader.metadata['avro.schema'] differ in whether or not name and namespace are combined together. In both experiments, reader.metadata['avro.schema'] is more faithful to the schema dict we used to actually write the data. Therefore, it’s a good practice to use reader.metadata['avro.schema'] instead of reader.writer_schema to retrieve the schema.
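Reading the schema out of the metadata can be wrapped in a tiny helper. This is a minimal sketch: schema_from_metadata is a hypothetical name, and it assumes that the value stored under 'avro.schema' may arrive either as text or as bytes depending on the package that read the file. The dictionaries below are hand-built stand-ins for reader metadata, so the snippet runs without any Avro package installed.

```python
import json


def schema_from_metadata(metadata):
    """Decode the writer's schema stored under the 'avro.schema' metadata key."""
    raw = metadata['avro.schema']
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode('utf-8')
    return json.loads(raw)


# Stand-ins for reader metadata (assumption: one text value, one bytes value).
schema_json = json.dumps({'name': 'User', 'namespace': 'avro.example',
                          'type': 'record', 'fields': []})
from_text = schema_from_metadata({'avro.schema': schema_json})
from_bytes = schema_from_metadata({'avro.schema': schema_json.encode('utf-8')})
print(from_text == from_bytes)
# True
```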


avro-python3 vs fastavro


As we saw, avro-python3 and fastavro have opposite behaviors when it comes to handling name, namespace, and full name.

The handling of name and namespace may not bother you at all or you may find the above difference in behavior unimportant. However, if your code expects either exactly the partial name or exactly the full name, then you will likely encounter errors. One common error is when you prefix the namespace to an already fully qualified name in the name field. You really need to exercise care when handling names and namespaces depending on the package you’re using.
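One way to sidestep the difference is to normalize names yourself before comparing or indexing schemas. The sketch below uses hypothetical helpers (split_name and full_name) and follows the Avro rule that a name containing a dot is already a full name, in which case a separate namespace entry is ignored. It only looks at the top-level record and does not handle namespaces inherited by nested schemas.

```python
def split_name(schema):
    """Return (namespace, simple_name) for a named-schema dict.

    A dotted `name` is treated as a full name, so a separate
    `namespace` entry is ignored in that case.
    """
    name = schema['name']
    if '.' in name:
        namespace, _, simple_name = name.rpartition('.')
        return namespace, simple_name
    return schema.get('namespace', ''), name


def full_name(schema):
    """Return the full name without ever prefixing the namespace twice."""
    namespace, simple_name = split_name(schema)
    return f'{namespace}.{simple_name}' if namespace else simple_name


together = {'name': 'avro.example.User', 'type': 'record', 'fields': []}
separated = {'name': 'User', 'namespace': 'avro.example',
             'type': 'record', 'fields': []}
# A dict carrying both a dotted name and a namespace (the double-prefix trap).
both = {'name': 'avro.example.User', 'namespace': 'avro.example',
        'type': 'record', 'fields': []}

for schema in (together, separated, both):
    print(split_name(schema), full_name(schema))
# ('avro.example', 'User') avro.example.User
# ('avro.example', 'User') avro.example.User
# ('avro.example', 'User') avro.example.User
```

Keying any schema lookup on full_name() rather than on the raw name field keeps the result the same whichever package produced the dict.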


Avro format is not a Dataframe


As we have seen above, Avro format simply requires a schema and a list of records. We don’t need a dataframe to handle Avro files. However, we can write a pandas dataframe into an Avro file or read an Avro file into a pandas dataframe. To begin with, we can always represent a dataframe as a list of records and vice versa:

  1. List of records – pandas.DataFrame.from_records() –> Dataframe

  2. List of records <– pandas.DataFrame.to_dict(orient='records') – Dataframe

Using the two functions above in conjunction with avro-python3 or fastavro, we can read/write dataframes as Avro. The only additional work we would need to do is to convert back and forth between pandas data types and Avro schema types ourselves.
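As a rough illustration of that manual conversion, the sketch below builds an Avro record schema from a mapping of column names to pandas dtype names. The DTYPE_TO_AVRO table and the schema_from_dtypes helper are assumptions made for this example, not part of pandas or any Avro package, and the table is far from exhaustive. The snippet takes plain strings so that it runs without pandas installed.

```python
# Assumed mapping from pandas dtype names to Avro types (hypothetical, not exhaustive).
DTYPE_TO_AVRO = {
    'bool': 'boolean',
    'int32': 'int',
    'int64': 'long',
    'float32': 'float',
    'float64': 'double',
    'object': 'string',  # assumes object columns hold only strings
}


def schema_from_dtypes(dtypes, name, namespace=None):
    """Build a record schema dict from a {column: dtype name} mapping.

    Every field is a ['null', <type>] union so that missing values can be stored.
    """
    fields = []
    for column, dtype in dtypes.items():
        try:
            avro_type = DTYPE_TO_AVRO[dtype]
        except KeyError:
            raise ValueError(
                f'no Avro type chosen for dtype {dtype!r} (column {column!r})'
            ) from None
        fields.append({'name': column, 'type': ['null', avro_type]})
    schema = {'type': 'record', 'name': name, 'fields': fields}
    if namespace:
        schema['namespace'] = namespace
    return schema


# With pandas, the mapping could come from:
#   {column: str(dtype) for column, dtype in users_df.dtypes.items()}
dtypes = {'name': 'object', 'age': 'int64'}
print(schema_from_dtypes(dtypes, name='User', namespace='avro.example'))
```

Raising on an unknown dtype is a deliberate choice here: silently falling back to string would hide columns (dates, categoricals) that need a decision from you.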


An alternative solution is to use a third-party package called pandavro, which does some of this inter-conversion for us.

import copy
import json
import pandas as pd
import pandavro as pdx
from avro.datafile import DataFileReader
from avro.io import DatumReader

# Data to be saved
users = [{'name': 'Pierre-Simon Laplace', 'age': 77},
         {'name': 'John von Neumann', 'age': 53}]
users_df = pd.DataFrame.from_records(users)
print(users_df)

# Save the data without any schema
pdx.to_avro('users.avro', users_df)

# Read the data back
users_df_redux = pdx.from_avro('users.avro')
print(type(users_df_redux))
# <class 'pandas.core.frame.DataFrame'>

# Check the schema for "users.avro"
with open('users.avro', 'rb') as f:
    reader = DataFileReader(f, DatumReader())
    metadata = copy.deepcopy(reader.meta)
    schema_from_file = json.loads(metadata['avro.schema'])
    reader.close()
print(schema_from_file)
# {'type': 'record', 'name': 'Root',
#  'fields': [{'name': 'name', 'type': ['null', 'string']},
#             {'name': 'age', 'type': ['null', 'long']}]}

In the above example, we didn’t specify a schema ourselves and pandavro assigned the name 'Root' to the schema. We can also provide a schema dict to pandavro.to_avro() function, which will preserve the name and namespace faithfully.


Avro with PySpark


Using Avro with PySpark is currently fraught with an unfortunate list of issues. Let’s see the common issues step-by-step.


Confusing official guide

The official Spark documentation on Avro contains two seemingly contradictory claims. On one hand, the official documentation says

Since Spark 2.4 release, Spark SQL provides built-in support for reading and writing Apache Avro data.

Then, in the next line, it says

The spark-avro module is external and not included in spark-submit or spark-shell by default.

Perhaps there is sufficient technical difference between the two claims to make them consistent with each other. But the wording is a bit misleading and confusing. In practical usage, only the second claim is true: you need to provide the spark-avro package to Spark. You can do this by providing the Maven coordinates in the form groupId:artifactId:version as follows:


# Example 1 
$SPARK_INSTALLATION/bin/pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4  

# Example 2 
$SPARK_INSTALLATION/bin/pyspark --packages com.databricks:spark-avro_2.11:4.0.0 

You can go to The Central Repository Search Engine or Maven Repository (recommended) to find the versions. If you provide a Maven coordinate that doesn’t exist on Maven, you will get a dependency error.


You may need to clear the cache in $HOME/.ivy2 to overcome some unknown resolver null issues, as mentioned here and here. You can delete $HOME/.ivy2 folder completely to clear the cache but be aware that you will also delete all other downloaded/installed dependencies if you do so.


spark-avro: Databricks or Apache


We show two examples in the above snippet because there are at least two common instances of the spark-avro package currently in use in applications. It appears that the original spark-avro package was written by Databricks and then donated to the Apache Spark project. Spark 2.4.0 included “built-in” support for Avro and updated the spark-avro package with new functionality and better performance while still retaining backward API compatibility with the older Databricks version of spark-avro.


Both versions of spark-avro are available to use. If you’re on Spark 2.4.0 or higher, you should use Apache Spark’s spark-avro. If you’re on a version lower than Spark 2.4.0, you need to use the Databricks version.


There is still one minor change you need to make to your code to switch between the Databricks (older) and Apache Spark (newer) versions.


# For Spark 2.4.0 and higher, use Apache Spark's version of spark-avro
df = spark.read.format('avro').load('path/to/avro/data')

# For lower than Spark 2.4.0, use Databricks's version of spark-avro
df = spark.read.format('com.databricks.spark.avro').load('path/to/avro/data')

If you don’t use the correct string in format(), you may see an error like this.


AnalysisException: 'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;' 
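If the same script has to run on clusters with different Spark versions, the format string can be chosen from the version string instead of being hard-coded. This is a small sketch with a hypothetical helper name (avro_format_name); it only encodes the 2.4.0 cut-off described above.

```python
def avro_format_name(spark_version):
    """Return the data source name to pass to format() for a Spark version string."""
    major, minor = (int(part) for part in spark_version.split('.')[:2])
    if (major, minor) >= (2, 4):
        return 'avro'  # Apache Spark's spark-avro
    return 'com.databricks.spark.avro'  # Databricks' spark-avro


print(avro_format_name('2.4.4'))
# avro
print(avro_format_name('2.3.1'))
# com.databricks.spark.avro
```

Inside a pyspark session, the version string is available as spark.version, so the read could be written as spark.read.format(avro_format_name(spark.version)).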

Obviously, you also need to provide the corresponding spark-avro to Spark. As we saw, we can simply provide the correct Maven coordinates to the intended spark-avro package. But, there is one more glitch – even if we provide valid Maven coordinates to spark-avro package that installs successfully, we may see an error. Let’s see this in the next section.


Scala version is important


Note the Scala version in the Maven coordinates. In org.apache.spark:spark-avro_2.12:2.4.4, the Scala version is 2.12 and in com.databricks:spark-avro_2.11:4.0.0 the Scala version is 2.11. If you don’t use the correct Scala version, you will find that the spark-avro package installs correctly and the pyspark shell starts successfully but reading Avro data fails.

# Run pyspark shell with Apache Spark's spark-avro package as mentioned in the official docs
$ $SPARK_INSTALLATION/bin/pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4
# Everything successful!

# Inside the resulting pyspark shell
>>> df = spark.read.format("avro").load("users.avro")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  ...
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister:
Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    ...
# Failure only when you read the Avro data!

It turns out that the pyspark in the above example was built against Scala 2.11, as shown below. However, we provided a spark-avro package that was built for Scala 2.12.

$ $SPARK_INSTALLATION/bin/pyspark --version 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_221 
Branch 
Compiled by user  on 2019-08-27T21:21:38Z 
Revision 
Url 
Type --help for more information. 

This issue is slightly annoying because the user is able to install spark-avro for the wrong Scala version without any indication of error, only to fail at the last moment. Once you know about this issue, it can be easily fixed by simply using the correct spark-avro package for your pyspark’s Scala version.

# Run pyspark shell with the correct Scala version for spark-avro
$ $SPARK_INSTALLATION/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4

# Inside the resulting pyspark shell
>>> df = spark.read.format("avro").load("users.avro")
>>> df.show()
+--------------------+---+
|                name|age|
+--------------------+---+
|Pierre-Simon Laplace| 77|
|    John von Neumann| 53|
+--------------------+---+
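Because the mismatch only surfaces at read time, it can help to derive the Maven coordinate from the Scala version that pyspark reports rather than typing it by hand. The sketch below assumes you already have the text of the `pyspark --version` output in a string; both helper names are hypothetical, and only the Apache Spark coordinates are covered.

```python
import re


def scala_binary_version(version_banner):
    """Extract the Scala binary version (e.g. '2.11') from `pyspark --version` text."""
    match = re.search(r'Using Scala version (\d+\.\d+)\.\d+', version_banner)
    if match is None:
        raise ValueError('no Scala version found in the banner text')
    return match.group(1)


def spark_avro_coordinate(scala_version, spark_version):
    """Build the Maven coordinate for Apache Spark's spark-avro package."""
    return f'org.apache.spark:spark-avro_{scala_version}:{spark_version}'


banner = 'Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_221'
print(spark_avro_coordinate(scala_binary_version(banner), '2.4.4'))
# org.apache.spark:spark-avro_2.11:2.4.4
```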

A working example, finally!


For this example, we will use Scala 2.11, Spark 2.4.4, and Apache Spark’s spark-avro 2.4.4 within a pyspark shell.

$ $SPARK_INSTALLATION/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4 

Within the pyspark shell, we can run the following code to write and read Avro.

# Data to store 
users = [{'name': 'Pierre-Simon Laplace', 'age': 77}, {'name': 'John von Neumann', 'age': 53}] 

# Create a pyspark dataframe 
users_df = spark.createDataFrame(users, 'name STRING, age INT') 

# Write to a folder named users-folder
users_df.write.format('avro').mode("overwrite").save('users-folder')

# Read the data back 
users_df_redux = spark.read.format('avro').load('./users-folder') 

Conclusion

My initial experience in working with Avro format was error-prone at virtually every step of the way. The name and namespace ambiguity lies in the Avro specification itself. This is further exacerbated by the contrasting behavior of the two most common Avro packages for python: avro-python3 and fastavro. When trying to use the official Avro package for python, the package name and API differences between Python 2 and Python 3 create unnecessary confusion. This makes it difficult to port code over from Python 2. Though we should not be writing Python 2 code, the package name and API differences make it difficult to write code that is both Python 2 and Python 3 compatible.


Using Avro with PySpark comes with its own set of issues that present themselves unexpectedly. In contrast, using parquet, json, or csv with Spark is significantly easier. There is no need to install an external package to use these formats. In that sense, support for parquet, json, or csv is truly built-in.


Hopefully this post can save you some time in working with Avro from Python.

