Best practice for contiguous collection

Jul 25, 2014 at 10:25 AM
I have been using userstream and filterstream and am very impressed with the API. Thanks.

This approach lets me collect live data as it is published on Twitter. However, the problem I have is when my collecting process is stopped and later restarted.

What is best practice for catching up on missed data?

One thing I wondered about: if one needs to switch over from collecting old data to live data (say, at the point where catch-up has completed), how would one ensure that no events are missed between stopping catch-up and starting live collection?

Thanks
Jul 29, 2014 at 2:31 PM
Edited Jul 29, 2014 at 4:15 PM
I am considering doing something along the lines of
                Dim lngMaxID As Long = 500000000000000000 ' start from an arbitrarily high Id
                Dim searchParameter As ITweetSearchParameters = Search.GenerateSearchTweetParameter("obama")
                Do
                    ' Page backwards: only ask for tweets older than the oldest one seen so far
                    searchParameter.MaxId = lngMaxID - 1
                    tweets = Search.SearchTweets(searchParameter)
                    ' The last tweet in the result set is the oldest one
                    If tweets.Count > 0 Then lngMaxID = tweets(tweets.Count - 1).Id
                Loop While tweets.Count > 0
But I exhaust the SearchTweetsLimit rather quickly, and once it reaches 0 I get an "Object reference not set to an instance of an object." exception. It would be nicer to get a specific exception for this.

What is best practice? Should I be calibrating my read loop to the rate limits, for example, i.e. setting the number of tweets per request and the poll interval such that the rate limit will not be exceeded?

I have also seen that setting searchParameter.MaximumNumberOfResults to something large (e.g. 5000) works. How is this so, when the documentation states that 100 is the limit for this setting?

How have others approached this ?
Thanks
Jul 30, 2014 at 12:34 AM
Edited Jul 30, 2014 at 12:36 AM
Hi there,

I have spent more than an hour and a half trying to find a solution to recover tweets from the search.
The problem with the Twitter API's search is that it is very difficult to rely on.

Furthermore, you need to know that even the stream does not deliver tweets strictly in order, meaning that you can receive a tweet with an Id lower than the previous one you received (this happens roughly once every 20 tweets).

I need to get some sleep now but I will get back to this problem tomorrow.

Linvi
Jul 30, 2014 at 2:24 PM
Edited Jul 30, 2014 at 6:26 PM
Hello there,

I did it! Please use the following code, which is an example of how to solve this problem.
private const string SEARCH_KEYWORD = "hello";

public static void Main()
{
    var stream = CreateSafeFilteredStreamWithSingleTrack(SEARCH_KEYWORD);
    var streamThread = new Thread(stream.StartStreamMatchingAllConditions);
    streamThread.Start();

    // We simulate that the stream stopped after running for 5 seconds
    Thread.Sleep(5000);
    stream.StopStream();

    // Ensure that we do not close the application
    streamThread.Join();
}

private static IFilteredStream CreateSafeFilteredStreamWithSingleTrack(string track)
{
    bool restorePreviousData = false;
    long lastTweetIdReceived = -1;

    var stream = Stream.CreateFilteredStream();

    stream.AddTrack(track);

    stream.MatchingTweetReceived += (sender, args) =>
    {
        if (restorePreviousData)
        {
            restorePreviousData = false;
            var sinceId = lastTweetIdReceived;
            var maxId = args.Tweet.Id;

            var restoreThread = new Thread(() => RestorePreviousDataBetween(sinceId, maxId));
            restoreThread.Start();
        }

        lastTweetIdReceived = args.Tweet.Id;
        StoreTweets(args.Tweet);
    };

    stream.StreamStarted += (sender, args) =>
    {
        Console.WriteLine("Stream Started");
        bool didTheStreamReceiveAnyTweetYet = lastTweetIdReceived != -1;

        if (didTheStreamReceiveAnyTweetYet)
        {
            restorePreviousData = true;
        }
    };

    stream.StreamStopped += (sender, args) =>
    {
        const int awaitDuration = 30000;
        Console.WriteLine("Stream Stopped");
        Console.WriteLine("Restarting the stream in {0} seconds", awaitDuration/1000);
        Thread.Sleep(awaitDuration);
        stream.StartStreamMatchingAllConditions();
    };

    return stream;
}

private static ITweetSearchParameters GenerateSearchBeforeMaxId(long maxId)
{
    var search = Search.GenerateTweetSearchParameter(SEARCH_KEYWORD);
    search.MaxId = maxId;
    search.MaximumNumberOfResults = 100;
    search.SearchType = SearchResultType.Recent;
    return search;
}

private static void WaitForDataToBeIndexedByTwitterSearchApi(long maxId)
{
    Console.WriteLine("Waiting for Twitter Search API to index the tweets that have been missed...");

    var search = Search.GenerateTweetSearchParameter(SEARCH_KEYWORD);
    search.MaxId = maxId + 1;
    search.SinceId = maxId - 1;

    var searchResult = Search.SearchTweets(search);

    while (!searchResult.Any())
    {
        Thread.Sleep(10000);
        searchResult = Search.SearchTweets(search);
    }

    Console.WriteLine("Twitter Search API successfully indexed the tweets...");
    // We wait 5 more seconds as the Twitter Search API can index older tweets after more recent ones
    Thread.Sleep(5000);
}

private static void RestorePreviousDataBetween(long sinceId, long maxId)
{
    Console.WriteLine("Restoring missed data...");
    WaitForDataToBeIndexedByTwitterSearchApi(maxId);

    var tweetHelper = TweetinviContainer.Resolve<ITweetHelper>();

    // Perform an initial search to get all the most recent tweets with the search parameter
    var result = Search.SearchTweets(GenerateSearchBeforeMaxId(maxId)).ToArray();
    // From the result of this search, select the lowest tweet Id (the tweet created first) so that we can go back in time
    var lastTweetId = tweetHelper.GetOldestTweetId(result.Select(x => x.TweetDTO));

    // Keep paging back while the oldest tweet retrieved is still newer than the latest tweet received during the previous session
    while (lastTweetId > sinceId)
    {
        StoreTweets(result.ToArray());
        Console.WriteLine("Restoring {0} tweets", result.Length);

        // Get the next set of tweets that matches the search but before the oldest tweet received from the search
        result = Search.SearchTweets(GenerateSearchBeforeMaxId(lastTweetId)).ToArray();
        // Update the id of the oldest tweet
        lastTweetId = tweetHelper.GetOldestTweetId(result.Select(x => x.TweetDTO));
    }

    // Get all the tweets that have been published after the latest tweet we received in our previous session.
    var finalResultSet = result.Where(x => x.Id > sinceId).ToArray();
    Console.WriteLine("Restoring {0} tweets", finalResultSet.Length);
    StoreTweets(finalResultSet);
    Console.WriteLine("All tweets have now been restored!");
}

private static void StoreTweets(params ITweet[] tweets)
{
    // Do whatever you want here
}
Regards,
Linvi
Marked as answer by linvi on 7/30/2014 at 7:33 AM
Jul 30, 2014 at 2:58 PM
Edited Jul 30, 2014 at 3:25 PM
Hi Linvi. That is neat. Thank you so much for your time.

One other possible problem: if the search filter does not match frequent tweets, it is not possible to restore the missed ones until a new one arrives. Is there a way to get the latest tweet ID anywhere on Twitter, so that collection can start as soon as the program is restarted and we don't have to wait around for a filterstream match?

The advantage of the way you have done it, of course, is that newly streamed tweets don't have to care about whether those events have been collected by the catch-up code.

I guess the program could always post or DM itself?

Thanks
Jul 30, 2014 at 3:29 PM
Edited Jul 30, 2014 at 3:30 PM
Well,

I think the code I provided is the best way to go. The reason is that there is a gap (between 1 and 2 seconds) between the moment you start a stream and the moment you actually receive data from Twitter.

I believe that if you are willing to recover the tweets, it is for data consistency, and you do not want to miss any tweet from the stream.
Therefore, waiting for the next tweet to be received ensures that any tweet published within this gap will be restored and added to your database.

If you do not care about this case here are 2 solutions that will allow you to get the latest tweet published:
// From any account if you do not already have a sample stream running on your machine + account
var stream = Stream.CreateSampleStream();
stream.TweetReceived += (sender, args) =>
{
    var lastPublishedTweetId = args.Tweet.Id;
    stream.StopStream();
    RestorePreviousDataBetween(sinceId, lastPublishedTweetId);
};

// From another test account so that it does not "corrupt" the account profile
var tweet = Tweet.PublishTweet(Guid.NewGuid().ToString());
var lastPublishedTweetId = tweet.Id;
tweet.Destroy();
RestorePreviousDataBetween(sinceId, lastPublishedTweetId);
Linvi
Jul 30, 2014 at 3:35 PM
The second case was what I was thinking of, but I agree that your original proposal is the better solution.
Thanks again.
Jul 30, 2014 at 4:21 PM
Edited Jul 30, 2014 at 5:21 PM
Just going through your code, it all makes sense, but I would be very grateful for some background/code comments on the steps taken in RestorePreviousDataBetween(), in particular what is happening with the TweetDTOs. I need to understand it in order to port it to VB.NET.
Thanks
Jul 30, 2014 at 6:30 PM
I have added some comments to the method. It seems quite straightforward to me.

IMPORTANT
I forgot to mention that when storing the tweets you need to use UPDATE OR CREATE (upsert) in your database.
The reason is that in a FilteredStream you can receive older tweets after younger tweets.
It means that restoring from the latest tweet could restore tweets that you have already received from the Stream.
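For illustration, here is a minimal sketch of UPDATE OR CREATE using a simple in-memory store keyed by tweet Id (the dictionary is an assumption for the example; with a SQL database you would use an upsert such as MERGE instead):
private static readonly Dictionary<long, ITweet> _storedTweets = new Dictionary<long, ITweet>();

private static void StoreTweets(params ITweet[] tweets)
{
    foreach (var tweet in tweets)
    {
        // UPDATE OR CREATE: the indexer adds the key if missing, or overwrites the
        // existing value, so receiving the same tweet twice cannot create a duplicate
        _storedTweets[tweet.Id] = tweet;
    }
}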

Feel free to ask me any question if you need.

Linvi

PS: I know you have been using Tweetinvi for quite some time; I would be happy to receive any proposals for the next release of Tweetinvi. Please have a look here: https://tweetinvi.codeplex.com/discussions/554619.
Jul 31, 2014 at 8:51 AM
Thanks for your comments in the code. The problem was trying to use anonymous methods and lambda functions in VB, and having stricter type checking to deal with too - so this C# code
var tweetHelper = TweetinviContainer.Resolve<ITweetHelper>();
var result = Search.SearchTweets(GenerateSearchBeforeMaxId(maxId)).ToArray();
var lastTweetId = tweetHelper.GetOldestTweetId(result.Select(x => x.TweetDTO));
now looks like this under VB.Net
    Dim tweetHelper As ITweetHelper = TweetinviContainer.Resolve(Of ITweetHelper)()
    Dim result As IEnumerable(Of ITweet) = Search.SearchTweets(GenerateSearchBeforeMaxId(maxId)).ToArray()
    Dim lastTweetId As Long = tweetHelper.GetOldestTweetId(result.Select(Function(x) (x.TweetDTO)))
Thanks.

Two other points.
  1. I will need to consider rate limits for this approach, so I will possibly need to tweak the routine so that it can be re-entered later and carry on where it left off once the rate limits are reset.
  2. I was interested in your last comment about duplicates. Are you saying that some of the tweets in the block of missed data can still be picked up as live events when the streams are restarted? That is bad news. Is this a transitory thing, i.e. does it look like a race condition because events are already in the process of being sent to the stream? Or can this happen minutes or even hours later? I need to find a way to deal with this but cannot rely on an RDBMS to trap the duplicates. My tweets go into a proprietary queue data structure, so I would have to keep a record of all IDs and check for duplicates whenever a new one is presented. Obviously this cannot be done indefinitely - hence my question about whether this is transitory.
Thanks
Jul 31, 2014 at 10:13 AM
Edited Jul 31, 2014 at 10:30 AM
I wonder if Twitter is reliable enough to do this.

I have a set of 44 Tweet IDs which I had previously received via filterstream for the pattern "golden". When I pass the first and the last into the routine, it only comes back with a set of 40. Could there be some difference between how filterstream and Search work?

The IDs I got from filterstream for "golden" are:

493791589899444224
493791614364430336
493791641472225280
493791644253425666
493791648925503488
493791657687781376
493791658870571010
493791672120008706
493791672291966976
493791677480325120
493791679506161664
493791684913012736
493791692873428993
493791696027918336
493791704794021888
493791706840449025
493791710129164288
493791737798983681
493791742454693888
493791748943282177
493791755725066241
493791756946014208
493791758484918272
493791760158445568
493791766978363393
493791784414101504
493791788339978241
493791793608404993
493791794669178881
493791800977408001
493791810922098688
493791817905995777
493791822423269376
493791828370792448
493791835060318209
493791837770231808
493791855704670208
493791860255916033
493791862893735936
493791866437918720
493791872515465216
493791873115652096
493791894065786881

I passed these parameters in
RestorePreviousDataBetween(493791589899444224, 493791894065786881)
and get this output

Restoring missed data...
Waiting for Twitter Search API to index the tweets that have been missed...
Twitter Search API successfully indexed the tweets...
Restoring 40 tweets
All tweets have now been restored!


This is an example of a tweet ID which is within the range above and that filterstream did pick up, but Search did not:

{'493791873115652096', 28/07/2014 17:15:31, 'Gursiman Feilden', 'http://t.co/XosE14ELKF 820 «Golden Girls» Strip Boutique', 'English', '31/07/2014 10:25:56'}


Your thoughts please Linvi.
Thanks again.
Jul 31, 2014 at 12:12 PM
Hi,

From my experience, I would say that the Search API of Twitter is not reliable.
You just gave one example, but there are many others that have slipped my mind since I last played with it.

The problem is that it is the only way Twitter gives you to get back to previously published tweets.
Another solution would be to have a fallback stream installed on another computer using another account, but the complexity of such a solution is a bit higher.

Also, you need to consider something else:
The streaming API pushes the tweets only approximately in chronological order, so you can receive IDs in the following order:

1, 2, 4, 3, STREAM_STOPPED

In such a case, the solution I provided will not work for you, because it will perform the search with a sinceId of 3.
It means that the tweet with Id 4, which you already retrieved, will be part of the search result set.

Consequently, you need to store the last 10 IDs you retrieved and ensure, during the recovery stage, that you do not push these IDs into the queue.
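A minimal sketch of that idea (the class and method names here are illustrative, not part of Tweetinvi):
// Hypothetical helper: remembers the N most recent tweet IDs received from the
// stream so the recovery stage can skip tweets that were already pushed to the queue.
public class RecentTweetIds
{
    private readonly int _capacity;
    private readonly Queue<long> _order = new Queue<long>();
    private readonly HashSet<long> _seen = new HashSet<long>();

    public RecentTweetIds(int capacity)
    {
        _capacity = capacity;
    }

    // Call for every tweet received from the stream
    public void Remember(long tweetId)
    {
        if (!_seen.Add(tweetId))
        {
            return; // already tracked
        }

        _order.Enqueue(tweetId);

        if (_order.Count > _capacity)
        {
            _seen.Remove(_order.Dequeue()); // forget the oldest Id
        }
    }

    // Call during recovery before pushing a restored tweet into the queue
    public bool AlreadyReceived(long tweetId)
    {
        return _seen.Contains(tweetId);
    }
}
With a capacity of 10 as suggested, RestorePreviousDataBetween would call AlreadyReceived for each restored tweet and skip those that return true.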

Are my explanations clear enough?

Linvi
Jul 31, 2014 at 1:06 PM
Yes very clear. Thanks for your time.

For us it is better to tell customers that they cannot have any data than to give them duplicates or missing data.

So we will do without for now and hope that Twitter improves things down the line.