An example of using Spark Structured Streaming

This snippet monitors two directories and joins the data from them whenever a new CSV file arrives in either directory.

The join operation is implemented with Spark SQL, which is easy to use (especially for DBAs) and also easy to maintain.

Some articles say that if the Spark process restarts after a failure, the ‘checkpoint’ will let it resume from the last uncompleted position. I tried it on my local computer and noticed that it does produce some duplicated rows after a restart. That would be a severe problem in a production environment, so I will look into it in my next round of testing.
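The original snippet isn't reproduced above, so here is a minimal sketch of the idea; the paths, schema, and join key are my assumptions, not the original code:

    # Minimal sketch: join two CSV streams, each fed by files dropped into a directory.
    # Paths, schema, and the join key "id" are assumptions for illustration.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.appName("two-dir-join").getOrCreate()

    schema_a = StructType([StructField("id", StringType()),
                           StructField("value_a", IntegerType())])
    schema_b = StructType([StructField("id", StringType()),
                           StructField("value_b", IntegerType())])

    # Each readStream watches a directory and picks up newly arrived CSV files.
    stream_a = spark.readStream.schema(schema_a).csv("/data/dir_a")
    stream_b = spark.readStream.schema(schema_b).csv("/data/dir_b")

    stream_a.createOrReplaceTempView("a")
    stream_b.createOrReplaceTempView("b")

    # The join itself is plain Spark SQL.
    joined = spark.sql(
        "SELECT a.id, a.value_a, b.value_b FROM a JOIN b ON a.id = b.id")

    query = (joined.writeStream
             .format("parquet")
             .option("path", "/data/output")
             .option("checkpointLocation", "/data/checkpoint")  # used for recovery after restart
             .outputMode("append")
             .start())
    query.awaitTermination()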

A problem when using PySpark SQL

Here is the code:

It reports an error when run with ‘cat xxx.py|bin/pyspark’:

At first I thought it was because ‘2’ is a string, so I changed ‘row’ to ‘[2, 29, 29, 29]’. But the error just changed to:

Then I searched on Google and found this article. It looked as if I had forgotten to convert my Python ‘list’ into a Spark ‘RDD’.
But in the end I found the real reason: I just needed to wrap my ‘list’ in another ‘[]’!
The right code is here:
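(The original snippet wasn't preserved here, so the sketch below reconstructs the idea; the schema and column names are assumptions.)

    # Sketch of the fix described above; the schema and column names are assumptions.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # In the pyspark shell this session already exists; getOrCreate() reuses it.
    spark = SparkSession.builder.getOrCreate()

    schema = StructType([
        StructField("id", StringType()),
        StructField("h", IntegerType()),
        StructField("m", IntegerType()),
        StructField("s", IntegerType()),
    ])

    row = ['2', 29, 29, 29]

    # Wrong: createDataFrame() expects a collection of rows, so a bare list is
    # treated as four separate rows and fails against the schema.
    # df = spark.createDataFrame(row, schema)

    # Right: wrap the single row in another '[]'.
    df = spark.createDataFrame([row], schema)
    df.show()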

Some problems with using AWS DMS

AWS DMS is a relatively new service used to migrate data between different types of databases and data warehouses. I ran into some problems when trying to use it in a production environment.

Problem 1. When using a MySQL server on AWS RDS as the source of a replication task, it reported errors after the task started:

The failure message looks terrible, but at least I could find this doc to follow. After changing the configuration as below:

binlog_format ROW
binlog_checksum NONE
binlog_row_image FULL

the error still existed.
The real answer is here, since I was using RDS instead of self-managed MySQL. After I added one line of Terraform code to enable “automatic backups”, the replication task began to work without the error.
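The Terraform line itself isn't shown above. On RDS, “automatic backups” are turned on by setting a non-zero backup retention period, which also keeps the binary logs that DMS needs. As an illustration only, not the original Terraform, the same change made through boto3 would look roughly like this; the instance identifier is hypothetical:

    # Illustration only: enabling automatic backups on an RDS instance via boto3.
    # The post did this with one line of Terraform; the identifier below is hypothetical.
    import boto3

    rds = boto3.client("rds")
    rds.modify_db_instance(
        DBInstanceIdentifier="my-mysql-source",  # hypothetical RDS instance name
        BackupRetentionPeriod=7,                 # > 0 enables automatic backups
        ApplyImmediately=True,
    )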

Problem 2. After the replication task had been running for a while, exporting data from MySQL to AWS Redshift, a new error appeared in the Redshift load logs:

Why is masteruser not authorized? The answer is here. Below is the Terraform code:

Then I gave “dms_assume_role” two Trusted Entities:
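The Terraform isn't reproduced here, but the idea is that the role's trust policy must allow both services to assume it; presumably the two Trusted Entities are dms.amazonaws.com and redshift.amazonaws.com. A sketch of such a trust policy, written as a Python dict:

    # Sketch of the trust policy behind the two Trusted Entities mentioned above.
    # The exact service principals are my assumption, not copied from the post.
    assume_role_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": ["dms.amazonaws.com", "redshift.amazonaws.com"]
                },
                "Action": "sts:AssumeRole",
            }
        ],
    }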



Problem 3. There was still an error in the Redshift load log (so many errors with AWS DMS…):

Error: Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]
Type: timestamp
Raw Field Value: 0000-00-00 00:00:00

The answer seemed to be here, so I added “acceptanydate=true;timeformat=auto” to the “extra connection settings” of the Redshift endpoint. But the error just changed to:

Error: Invalid data
Type: timestamp
Raw Field Value: 0000-00-00 00:00:00

After searching for almost two days, I found that the cause was in the Redshift schema, which had been created automatically by the AWS DMS replication task.

Since the schema doesn't allow the “mydate” column to be null, but “acceptanydate=true” tries to convert “0000-00-00 00:00:00” to null, the final error from Redshift is “Invalid data”.
The solution is to create the Redshift table manually so that the “mydate” column is nullable, and to change the replication task's table preparation mode to “TRUNCATE_BEFORE_LOAD”.

Processing date and time in AWS Redshift

Since AWS Redshift doesn't have a function like MySQL's FROM_UNIXTIME(), it's much more awkward to get a formatted time from a UNIX timestamp (called ‘epoch’ in Redshift):

Ref: https://stackoverflow.com/questions/39815425/how-to-convert-epoch-to-datetime-redshift

And if we want to see the statistics grouped by hour:
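The SQL snippets themselves aren't shown above, but the usual Redshift idiom looks roughly like the two queries below, wrapped here as Python strings; the table and column names are assumptions:

    # Sketch of the Redshift SQL described above; table/column names are assumptions.

    # Convert a UNIX timestamp (epoch seconds) into a proper timestamp:
    epoch_to_timestamp = """
        SELECT TIMESTAMP 'epoch' + my_epoch * INTERVAL '1 second' AS event_time
        FROM my_table;
    """

    # Statistics grouped by hour:
    stats_by_hour = """
        SELECT DATE_TRUNC('hour',
                          TIMESTAMP 'epoch' + my_epoch * INTERVAL '1 second') AS event_hour,
               COUNT(*) AS cnt
        FROM my_table
        GROUP BY 1
        ORDER BY 1;
    """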

Some tips about using AWS Glue

Configuring the data format
To use AWS Glue, I wrote a ‘catalog table’ in my Terraform script:

But when a PySpark script accesses this table, it reports:

It seems we can't use ‘OpenCSVSerde’ here. Actually, the correct answer is:

The version of Zeppelin
When using Zeppelin to run a PySpark script, it reports an error:

According to the document:

The latest release of Apache Zeppelin, 0.8.x, is not supported. Download the older release named zeppelin-0.7.3-bin-all.tgz from the download page and follow the installation instructions.

Google Cloud Summit 2019


Yesterday I attended the Google Cloud Summit 2019 in Sydney.

The venue was huge, and there were a lot of booths from different Google Cloud partners.





The keynote was quite abstract and a little boring, so I chose to walk around the booths instead. Here are some useful conversations and pieces of information I collected:
[HashiCorp: the company behind Terraform and Nomad]
Q: How quickly can Terraform support a new service from a cloud provider, such as Lake Formation on AWS?
A: It depends on demand from users.
Q: Only from users who pay for the enterprise version of Terraform?
A: No. Requests from all users, including those on the open source version, are considered when deciding to support a new cloud service. We publish a new version of Terraform quarterly, although we can't guarantee that every new service from that quarter will be included.

[Confluent: the company behind Apache Kafka]
Q: Can I use ksql in Kafka as standard SQL?
A: Currently, no. ksql only supports its own syntax in Kafka. It looks a lot like SQL, but it's actually a different language.
Q: Can I use ksql to access a table in MySQL?
A: Yes. You can export a MySQL table as a ‘kstream’, and then use ksql to access that ‘kstream’.

[Tableau: you know what I mean…]
Q: What new updates has Tableau released in the last year?
A: We released a new feature called ‘Ask Data’. You can type a query in natural language, and it will translate it into a Tableau query using state-of-the-art NLP technologies.

When I typed in a query like ‘avg price in Manly’, it worked very well. But when I typed a query like ‘top 5 price near Chatswood’, Tableau failed to give the right answer.

A: You know, NLP is really hard, so we only support a limited range of synonyms for query words.

[elastic: the company behind Elasticsearch and Kibana]
Q: What's the biggest Elasticsearch cluster in production?
A: Well, a lot of big companies such as Netflix and eBay run quite big Elasticsearch clusters. But we don't know which one is the biggest, because they won't tell us every detail of their clusters 🙂

[Google Cloud]
Q: Is there a product on Google Cloud that can continually import data from MySQL and export it to Cloud Storage or Bigtable?
A: Yes. Cloud Data Fusion would be your best choice.

At the ASUS booth (ASUS produces a lot of Chromebooks for Google), I noticed the Dev Board, which contains an Edge TPU.





The demo used “mobilenet_ssd v2” as the backbone for object detection, just like my own choice.

Accelerating data loading in PyTorch

I got a desktop computer for training deep learning models last week. The GPU is a GTX 1050 Ti with 4GB of memory, which is enough for basic object detection training. But the CPU is too old: when I run the training process, the CPU idle time is 0%. I needed to reduce its burden.

I tried DALI from Nvidia. Although I admit it is powerful, I also noticed that DALI is too rigid to use with a custom dataset. For example, if I want label structures more complicated than plain ‘bounding box’ coordinates, I can't find any code example in DALI that meets this requirement. Besides, the GPU memory in my computer is not big enough, so if I moved the computation burden from the CPU to the GPU, I would have to reduce the training batch size. That's not a good option either.

Yesterday, from this post, I saw this suggestion:

You can use jpeg4py, library dedicated to encode big jpeg files much faster than PIL. Just read image using this library, then transform it to PIL.

After changing my code from ‘cv.imread()’ to ‘jpeg4py.JPEG().decode()’, the average training time for 1000 batches of my model improved from 700 seconds to 670 seconds. This is exactly the simple and useful solution I needed.
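A sketch of what this change looks like inside a PyTorch Dataset; the paths, labels, and transforms are simplified placeholders rather than my real pipeline:

    # Sketch: swapping cv.imread() for jpeg4py in a Dataset's __getitem__.
    # Paths, labels and transforms are placeholders for illustration.
    import jpeg4py
    from PIL import Image
    from torch.utils.data import Dataset

    class DetectionDataset(Dataset):
        def __init__(self, image_paths, labels, transform=None):
            self.image_paths = image_paths
            self.labels = labels
            self.transform = transform

        def __len__(self):
            return len(self.image_paths)

        def __getitem__(self, idx):
            # jpeg4py uses libjpeg-turbo, so JPEG decoding is faster on the CPU.
            img = jpeg4py.JPEG(self.image_paths[idx]).decode()  # numpy array, RGB
            img = Image.fromarray(img)  # convert back to PIL if the transforms need it
            if self.transform is not None:
                img = self.transform(img)
            return img, self.labels[idx]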

The not-so-easy way to implement SSDLite by myself

SSDLite is a variant of Single Shot MultiBox Detection (SSD). It uses MobileNetV2 instead of VGG as the backbone, which makes detection extremely fast. I was trying to implement SSDLite on top of the code base of ssd.pytorch. Although it was not easy work, I ended up learning a lot from the entire process.
First, I simply replaced VGG with MobileNetV2 in the code. However, the loss stopped decreasing after a while of training. Not knowing the reason, I had to compare my code with another open source project, ssds.pytorch, to try to find the cause.
Very soon, I noticed that unlike the VGG backbone, which builds its detection layers starting from a 38×38 feature map, MobileNetV2 uses a 19×19 feature map for its first detection layer.

“For MobileNetV1, we follow the setup in [33]. For MobileNetV2, the first layer of SSDLite is attached to the expansion of layer 15 (with output stride of 16).”

From: MobileNetV2: Inverted Residuals and Linear Bottlenecks

After changing my code to follow the description in the paper, the loss still would not decrease during training.

Over the next three weeks, I tried a lot of things: changing the aspect ratios, replacing SGD with SGDR, changing the number of default boxes, even modifying the network structure to be identical to ssds.pytorch. None of them solved the problem. There was another weird phenomenon: when I ran prediction with my model, it usually gave random detection output.

Not until last week did I notice that my model is about 10MB while ssds.pytorch's is 18MB. Why would the sizes differ if the models are exactly the same? Following this clue, I finally found the cause: a large part of my model was never being back-propagated at all!
My old code only implemented the forward() of MobileNetV2, which is not enough for the whole model. Therefore I added nn.ModuleList() to build the model from a list of layers, as in this patch:

Only nn.ModuleList() registers all the layers so that they take part in back-propagation and are saved as model weights. Otherwise, the weights are randomly initialized and only used for forwarding, which is why I was getting random output before.
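The patch itself isn't shown above, but the core of the change is roughly the following; the class and layer construction are simplified for illustration:

    # Sketch of the fix: layers kept in a plain Python list are invisible to
    # PyTorch, so their weights are never registered or trained.
    import torch.nn as nn

    class SSDLiteHead(nn.Module):
        def __init__(self, extra_layers):
            super().__init__()
            # Wrong: parameters in a plain list are NOT registered with the module.
            # self.extras = extra_layers
            # Right: nn.ModuleList registers every layer, so its weights appear in
            # state_dict() and receive gradients during back-propagation.
            self.extras = nn.ModuleList(extra_layers)

        def forward(self, x):
            features = []
            for layer in self.extras:
                x = layer(x)
                features.append(x)
            return features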

I think I should be more careful when adding an FPN to my model in the future.

A tip about Terraform


Terraform is an interesting (in my opinion) tool for implementing Infrastructure-as-Code. When I first used it yesterday to write a production script, I met an error report:

After a while of searching on Google, I found the cause: Terraform couldn't find my AWS credentials on my computer.
Actually I do have the ‘~/.aws/credentials’ file, and the ‘access_key_id’ and ‘secret_access_key’ are already in it, like this:

So why couldn't Terraform get the credentials? The reason is in the ‘provider’ section:

I had set ‘profile’ to ‘analytics’ at first, so Terraform tried to find a section that looks like ‘[analytics]’ in the ‘~/.aws/credentials’ file, and failed. The fix was simply to set ‘profile’ in the ‘provider’ section to ‘default’.

Some ideas about building streaming ETL on AWS

After discussing with technical support engineers from AWS, I got more information about how to use AWS services to build a streaming ETL architecture, step by step.
The main architecture could be described by the diagram below:


AWS

AWS S3 is the de facto data lake. All the data, whether from AWS RDS, AWS DynamoDB, or other custom sources, can be written into AWS S3 in a suitable format such as Apache Parquet or Apache ORC (CSV is not recommended because it is not well suited to data scanning and compression). Then data engineers can use AWS Glue to extract the data from AWS S3, transform it (using PySpark or something similar), and load it into AWS Redshift.
Frequently used data can also be kept in AWS Redshift for optimised queries. When tables from both AWS S3 and AWS Redshift need to be joined, we can also use AWS Redshift Spectrum.
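As a rough illustration of that Glue step (the database, table, connection, and bucket names below are all made up), a Glue PySpark job that reads catalogued S3 data and loads it into Redshift might look something like this:

    # Rough sketch of a Glue ETL job: read from the Glue Data Catalog, transform,
    # then load into Redshift. All names and connection settings are hypothetical.
    import sys
    from awsglue.transforms import ApplyMapping
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    glue_context = GlueContext(SparkContext.getOrCreate())
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Extract: a table registered in the Glue Data Catalog, backed by Parquet on S3.
    source = glue_context.create_dynamic_frame.from_catalog(
        database="my_datalake", table_name="events")

    # Transform: rename columns with ApplyMapping (or drop down to plain PySpark).
    mapped = ApplyMapping.apply(
        frame=source,
        mappings=[("id", "string", "id", "string"),
                  ("ts", "long", "event_ts", "long")])

    # Load: write into Redshift through a Glue connection, staging via S3.
    glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=mapped,
        catalog_connection="my-redshift-connection",
        connection_options={"dbtable": "public.events", "database": "analytics"},
        redshift_tmp_dir="s3://my-temp-bucket/redshift-staging/")

    job.commit()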

By the way, I also joined a workshop about Databricks' new Unified Data Analytics and Machine Learning Platform, which is built on AWS. It contains:

1. Delta Lake for data storage and schema enforcement.
2. Notebooks that let users write and run code directly to process and analyze data with Apache Spark, much like Jupyter Notebook.
3. MLflow, which uses the above data to train machine learning models.

I used Apache Spark for learning purposes about four years ago. At that time, I even needed to build the Java/Scala package myself, then upload and run it. Debugging was tedious because I could only scan the CLI logs again and again to find mistakes in the code. But now Databricks gives data scientists and developers a much more convenient solution.


Databricks

Anyone who is interested in this platform can try the free edition at https://databricks.com/try-databricks