Building a Twitter GenServer with ExTwitter part 2

Posted on Tue 13 September 2016 in elixir

Premise boilerplate ;)

This article tackles my learning experience in building a GenServer process that talks with Twitter. I am learning the Elixir language in my evenings, so bear with me and please comment if you find inaccuracies in this article.

I will not comment on every line of code in my experiment; however, feel free to drop a comment if there's something that triggers your interest.

Objective

This is the second part of building a little GenServer that talks to Twitter by taking advantage of the ExTwitter Elixir library.

It focuses on the actual implementation of the GenServer callbacks that handle the communication with Twitter, whether through a stream or a normal Twitter search.

You can find the first part of the article here.

I could take advantage of special effects... however, here is the GenServer implementation!

def handle_call(%{search: topic}, _from, state) do
  tweets = ExTwitter.search(topic)
  {:reply, tweets, Map.put(state, :tweets, tweets)}
end

def handle_call(:entries, _from, state) do
  {:reply, Enum.reverse(Map.get(state, :tweets, [])), state}
end

def handle_call(:stop_stream, _from, %{stream: stream_pid, tweets: tweets}) do
  ExTwitter.stream_control(stream_pid, :stop)
  Process.exit(stream_pid, :normal)
  {:reply, :ok, %{tweets: tweets}}
end

def handle_call(:stop_stream, _from, state) do
  {:reply, :stream_not_started, state}
end

def handle_call(%{start_stream: topic, timer: milliseconds}, _from, state) do
  {:reply, :ok, Map.put(state, :timer, schedule_work(topic, milliseconds))}
end

# Stream already started? just carry on with the state
def handle_info(%{fetch_tweets: _}, %{stream: _} = state)  do
  {:noreply, state}
end

def handle_info(%{fetch_tweets: topic}, state) do
  parent = self()
  pid = spawn_link fn ->
    configure_extwitter()
    for tweet <- ExTwitter.stream_filter([track: topic], :infinity) do
      send parent, {:tweet, tweet}
    end
  end
  {:noreply, Map.put(state, :stream, pid)}
end

def handle_info({:tweet, tweet}, state) do
  tweets = [tweet|Map.get(state, :tweets, [])]
  {:noreply, Map.put(state, :tweets, tweets)}
end

def handle_info(:purge_tweets, state) do
  schedule_cleanup()
  tweets = Map.get(state, :tweets, [])
  |> Enum.take(@max_keep_tweets)
  {:noreply, Map.put(state, :tweets, tweets), :hibernate}
end

Whoa, that was a big chunk! Let's break it down...

def handle_call(%{search: topic}, _from, state) do
  tweets = ExTwitter.search(topic)
  {:reply, tweets, Map.put(state, :tweets, tweets)}
end

If you remember our GenServer interface described in part 1, we had a GenServer.call(via_tuple(namespace), %{search: topic}) in the search function. When calling Tweetyodel.Worker.search("my_tweets", "#myelixirstatus"), a synchronous message is sent to our GenServer and handled by this callback.

It's nothing special: as you can see, we pattern match on the map used as the payload of the GenServer.call. We execute a blocking search and then:

{:reply, tweets, Map.put(state, :tweets, tweets)}

This means we :reply immediately with the tweets we searched for, and we "save" them under the :tweets key of the GenServer state map that we initialized in init.

That's what Tweetyodel.Worker.search("ma' namespace", "#myelixirstatus") does.

Simple as that.
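To make the flow concrete, here is a quick, hypothetical usage example. It assumes a worker has already been started for the "my_tweets" namespace as shown in part 1, and that the client function wrapping the :entries call is named entries/1.

# Hypothetical session; the actual tweet structs depend on your search results.
Tweetyodel.Worker.search("my_tweets", "#myelixirstatus")
# => [%ExTwitter.Model.Tweet{text: "..."}, ...]

Tweetyodel.Worker.entries("my_tweets")
# => the tweets stored in the GenServer state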

Let's have a look at another snippet:

def handle_call(:entries, _from, state) do
  {:reply, Enum.reverse(Map.get(state, :tweets, [])), state}
end

Again we have a synchronous, blocking GenServer call that immediately sends a :reply. In this case it returns the tweets stored in the "belly" of the GenServer: its state. Note the Enum.reverse/1: since new tweets are prepended to the list as they arrive, reversing returns them in the order they were received.

def handle_call(%{start_stream: topic, timer: milliseconds}, _from, state) do
  {:reply, :ok, Map.put(state, :timer, schedule_work(topic, milliseconds))}
end

This bit gets (maybe! :P) more interesting, since we start a timer that decides when the Twitter stream will start.

The schedule_work function calls Process.send_after(self(), %{fetch_tweets: topic}, milliseconds), which sends the %{fetch_tweets: topic} map back to the GenServer after the given number of milliseconds, so that another callback can handle it.
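The post doesn't show schedule_work itself; a minimal sketch, assuming it simply wraps Process.send_after/3 and returns the timer reference that we store under :timer, could be:

# Hypothetical helper, not shown above: delivers %{fetch_tweets: topic}
# to this GenServer after `milliseconds` and returns the timer reference.
defp schedule_work(topic, milliseconds) do
  Process.send_after(self(), %{fetch_tweets: topic}, milliseconds)
end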

Why a timer? I just wanted to play with timers ;) It's a pet project after all, no?

The "core" of our GenServer

def handle_info(%{fetch_tweets: topic}, state) do
  parent = self()
  pid = spawn_link fn ->
    configure_extwitter()
    for tweet <- ExTwitter.stream_filter([track: topic], :infinity) do
      send parent, {:tweet, tweet}
    end
  end
  {:noreply, Map.put(state, :stream, pid)}
end

Since I wanted a separate process to handle the stream so that it doesn't block the GenServer, in this snippet I spawn_link another process linked to the parent.

This will "walk through" the possibly infinite stream of tweets and when the stream has data, it will send back to the GenServer a message for every new tweet.
Take a look to the send parent, {:tweet, tweet} statement.

We also save the pid of the stream process so we can interact with it later on.
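One more detail: the spawned process calls configure_extwitter() before opening the stream, presumably because the OAuth configuration is scoped to the calling process in this setup. That helper isn't shown in the post; a rough sketch, assuming the OAuth tokens live in the :extwitter application environment (as in the ExTwitter README), might look like this:

# Hypothetical helper: configures ExTwitter for the current process only,
# reading the OAuth credentials from the application environment.
defp configure_extwitter do
  ExTwitter.configure(:process, Application.get_env(:extwitter, :oauth))
end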

And voilà!

def handle_info({:tweet, tweet}, state) do
  tweets = [tweet|Map.get(state, :tweets, [])]
  {:noreply, Map.put(state, :tweets, tweets)}
end

handle_info({:tweet, tweet}, state) matches the messages sent by the spawn_link(ed) process, and we prepend the tweet to the list held in the state map.

While we are at it, we also update the GenServer's state so we can later query the new entries coming from the Twitter stream.

And if we get bored of this stream of tweets?!?

Tweetyodel.Worker.stop_stream(namespace) to the rescue!

def handle_call(:stop_stream, _from, %{stream: stream_pid, tweets: tweets}) do
  ExTwitter.stream_control(stream_pid, :stop)
  Process.exit(stream_pid, :normal)
  {:reply, :ok, %{tweets: tweets}}
end

This matches the stream_pid and the tweets from the GenServer state (if present) and, of course, the :stop_stream atom used in the call. If no stream is running, the fallback clause simply replies with :stream_not_started.

This particular handle_call stops the stream and terminates the child process that we spawned with spawn_link, so we can start from scratch. It preserves the tweets already in the GenServer's state, though, so we can still fetch entries.
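For context, the client-side function from part 1 is presumably just a thin wrapper around GenServer.call, along these lines (via_tuple is the registry helper from part 1):

# Hypothetical client function: synchronously asks the named worker
# to stop its Twitter stream.
def stop_stream(namespace) do
  GenServer.call(via_tuple(namespace), :stop_stream)
end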

Infinite tweets!! ROAR

What will happen if we keep accumulating tweets? We will become a fat tweety for sure and maybe explode.

def handle_info(:purge_tweets, state) do
  schedule_cleanup()
  tweets = Map.get(state, :tweets, [])
  |> Enum.take(@max_keep_tweets)
  {:noreply, Map.put(state, :tweets, tweets), :hibernate}
end

That's why in init I call schedule_cleanup, which, again, uses Process.send_after to schedule a :purge_tweets message. When :purge_tweets is matched, we re-schedule another :purge_tweets and Enum.take(@max_keep_tweets) on the stored tweets, so that we keep, say, only the most recent 100.
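Neither schedule_cleanup nor the @max_keep_tweets attribute are shown here; a minimal sketch, with a hypothetical one-minute cleanup interval, could be:

# Hypothetical values/helper, not shown in the post.
@max_keep_tweets 100      # keep only the 100 most recent tweets
@cleanup_interval 60_000  # purge once a minute

defp schedule_cleanup do
  Process.send_after(self(), :purge_tweets, @cleanup_interval)
end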

I am also returning the :hibernate option, which triggers a full-sweep garbage collection and puts the process into a hibernated, low-memory state until the GenServer receives its next message.

Ta-da! That's it! There is a little bit more "boilerplate" in the repository.

Thank you for following along this far! ;)