Is it possible to add more tracks to a running stream instead of creating a new one?

Apr 5, 2014 at 9:11 AM
Dear linvi,

First, thank you for your much-appreciated work.

I need to do something like this:
if (_savedStream == null)
{
    _savedStream = Stream.CreateFilteredStream();

    // add tracks and handle events

    _savedStream.StartStreamMatchingAnyCondition();
}
else
{
    _savedStream.AddTrack(newTrack);
    _savedStream.StartStreamMatchingAnyCondition();
}
Coordinator
Apr 5, 2014 at 11:37 PM
Hi,

It is not. You need to stop the stream, add your parameters, and restart the stream.
The stream cannot continue with new parameters because the request is already running.

So just stop the stream, add your tracks and restart the stream.

Linvi
Apr 6, 2014 at 7:15 AM
Thanks, linvi, for your quick reply.

Do you mean something like this?
_savedStream.StopStream();
_savedStream.AddTrack(newTrack);
_savedStream.StartStreamMatchingAnyCondition();
Coordinator
Apr 7, 2014 at 12:58 AM
Yes exactly.
Apr 13, 2014 at 10:05 AM
Many thanks.

I would like to share this work with you and others. I wanted the stream to listen for incoming requests to add new tracks
without breaking your design, so I have made the following modifications in the following classes:

In the TrackedStream base class I have added the following property:
public BlockingCollection<Dictionary<int, string>> MoreTracks { get; set; }
It is a blocking collection of new keywords. FilteredStream's StartStreamMatchingAnyCondition method passes it to StreamResultGenerator's StartStream method, where it is observed:
public void StartStreamMatchingAnyCondition()
        {
            Func<HttpWebRequest> generateWebRequest = () => _oAuthToken.GetQueryWebRequest(GenerateORFilterQuery(), HttpMethod.POST);

            Action<string> tweetReceived = json =>
            {
                var tweet = _tweetFactory.GenerateTweetFromJson(json);
                if (tweet == null)
                {
                    TryInvokeGlobalStreamMessages(json);
                    return;
                }

                var matchingTrackAndActions = _streamTrackManager.GetMatchingTracksAndActions(tweet.Text);
                var matchingTracks = matchingTrackAndActions.Select(x => x.Item1);
                var machingLocationAndActions = GetMatchedLocations(tweet.Coordinates);
                var matchingLocations = machingLocationAndActions.Select(x => x.Key);

                CallMultipleActions(tweet, matchingTrackAndActions.Select(x => x.Item2));
                CallMultipleActions(tweet, machingLocationAndActions.Select(x => x.Value));
                CallFollowerAction(tweet);

                RaiseMatchingTweetReceived(new MatchedTweetReceivedEventArgs(tweet, matchingTracks));
                this.Raise(MatchingTweetAndLocationReceived, new MatchedTweetAndLocationReceivedEventArgs(tweet, matchingTracks, matchingLocations));
            };

            // Start the stream exactly once: use the overload only when a queue is available
            if (MoreTracks != null)
                _streamResultGenerator.StartStream(tweetReceived, generateWebRequest, MoreTracks);
            else
                _streamResultGenerator.StartStream(tweetReceived, generateWebRequest);
        } 
This is an overload of the StartStream method. Here is the implementation:
public void StartStream(Func<string, bool> processObject, Func<HttpWebRequest> generateWebRequest, BlockingCollection<Dictionary<int, string>> moreTracks)
        {
            if (IsRunning)
            {
                throw new OperationCanceledException(Resources.Stream_IllegalMultipleStreams);
            }

            if (processObject == null)
            {
                throw new NullReferenceException(Resources.Stream_ObjectDelegateIsNull);
            }

            _lastException = null;
            _streamState = StreamState.Resume;
            this.Raise(StreamStarted);

            HttpWebRequest webRequest = generateWebRequest();
            _currentReader = InitWebRequest(webRequest);

            if (_lastException != null)
            {
                _streamState = StreamState.Stop;
            }

            int errorOccured = 0;
            while (StreamState != StreamState.Stop)
            {
                if (StreamState == StreamState.Pause)
                {
                    Thread.Sleep(STREAM_RESUME_DELAY);
                    continue;
                }

                try
                {
                    if (moreTracks.Count > 0)
                    {
                        StopStream(moreTracks.Take());
                        break;
                    }
                        
                    string jsonResponse = _currentReader.ReadLine();

                    #region Error Checking

                    if (jsonResponse == null)
                    {
                        if (errorOccured == 0)
                        {
                            ++errorOccured;
                        }
                        else if (errorOccured == 1)
                        {
                            ++errorOccured;
                            webRequest.Abort();
                            _currentReader = InitWebRequest(webRequest);
                        }
                        else if (errorOccured == 2)
                        {
                            ++errorOccured;
                            webRequest.Abort();
                            webRequest = generateWebRequest();
                            _currentReader = InitWebRequest(webRequest);
                        }
                        else
                        {
                            Trace.WriteLine("Twitter API is not accessible");
                            break;
                        }
                    }
                    else
                    {
                        errorOccured = 0;
                    }

                    #endregion

                    if (jsonResponse == String.Empty)
                    {
                        continue;
                    }

                    if (StreamState == StreamState.Resume && !processObject(jsonResponse))
                    {
                        StreamState = StreamState.Stop;
                        break;
                    }
                }
                catch (WebException wex)
                {
                    _exceptionHandler.AddWebException(wex, String.Empty);
                }
                catch (Exception ex)
                {
                    _lastException = ex;

                    if (ex is IOException)
                    {
                        if (StreamState == StreamState.Stop)
                        {
                            return;
                        }

                        if (ex.Message == "Unable to read data from the transport connection: The connection was closed.")
                        {
                            _currentReader = InitWebRequest(webRequest);
                        }

                        try
                        {
                            _currentReader.ReadLine();
                            _lastException = null;
                        }
                        catch (IOException ex2)
                        {
                            if (ex2.Message == "Unable to read data from the transport connection: The connection was closed.")
                            {
                                Trace.WriteLine("Streamreader was unable to read from the stream!");
                            }
                        }
                        catch (ObjectDisposedException)
                        {
                            // StopStream has been called
                            _lastException = null;
                        }
                    }

                    break;
                }
            }

            if (webRequest != null)
            {
                webRequest.Abort();
            }

            if (_currentReader != null)
            {
                _currentReader.Dispose();
            }

            StreamState = StreamState.Stop;
        }
If the calling code has queued more tracks, the loop stops the stream and passes the new keywords as arguments to the StreamStopped event, whose handler decides whether to start the stream again with the new keywords. The StopStream overload and the StreamStopped handler follow:
public void StopStream(Dictionary<int, string> moreTracks)
        {
            _lastException = null;
            _streamState = StreamState.Stop;
            var streamExceptionEventArgs = new StreamExceptionEventArgs(moreTracks);
            this.Raise(StreamStopped, streamExceptionEventArgs);
        }
stream.StreamStopped += (sender, args) =>
            {

                // If new keywords need to be added to stream then add them and restart this stream
                if (args.NewTracksToAdd != null)
                {
                    //_logger.Log(logPath, LogType.Normal, Thread.CurrentThread.ManagedThreadId.ToString(), "SocialTrackerAgent", "UpdateStream", _params, "Void", string.Join(",", args.NewTracksToAdd));

                    args.NewTracksToAdd.Foreach(current =>
                    {
                        stream.AddTrack(current.Value);
                        handler.ExecuteNonQueryEx("Update KEYWORDS set FOLLOWING=1 where ID=" + current.Key);
                       // ProcObj.ExecuteNonQuerySqlString("Update KEYWORDS set FOLLOWING=1 where ID=" + current.Key, "orcl", "socialtracker", "socialtracker");
                    }
                      );

                }

                else
                {
                    lock (_lockObj)
                    {
                        //_params.Add("SenderStream", _stream.ToString());
                        _logger.Log(logPath, LogType.Exception, Thread.CurrentThread.ManagedThreadId.ToString(), "SocialTrackerAgent", "StreamStopped", _params, "Void", args.Exception.ToString());
                    }
                }


                // Pause This Stream for 2 seconds
                Thread.Sleep(2 * 1000);
                stream.StartStreamMatchingAllConditions();
            };
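The same idea can be sketched outside the library, in case it helps anyone who does not want to modify the Tweetinvi sources. This is only a minimal sketch under stated assumptions: FakeFilteredStream and TrackUpdater are hypothetical names invented for illustration, not Tweetinvi types, and the fake stream only records state instead of opening a connection. It shows keywords being queued in a BlockingCollection and then applied in one batch with a single stop, add, restart cycle.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a filtered stream; NOT a Tweetinvi type.
public sealed class FakeFilteredStream
{
    private readonly List<string> _tracks = new List<string>();

    public bool IsRunning { get; private set; }
    public int StartCount { get; private set; }
    public IReadOnlyList<string> Tracks => _tracks;

    public void AddTrack(string track)
    {
        // Mirrors the advice in this thread: parameters only change while stopped.
        if (IsRunning)
            throw new InvalidOperationException("Stop the stream before changing its tracks.");
        if (!_tracks.Contains(track))
            _tracks.Add(track);
    }

    public void Start() { IsRunning = true; StartCount++; }
    public void Stop() { IsRunning = false; }
}

// Hypothetical helper that applies queued keywords to the stream.
public static class TrackUpdater
{
    // Drains every queued keyword without blocking. If anything was queued,
    // the stream is stopped, updated and started again exactly once.
    // Returns the number of keywords taken from the queue.
    public static int ApplyPendingTracks(FakeFilteredStream stream, BlockingCollection<string> pending)
    {
        var batch = new List<string>();
        string track;
        while (pending.TryTake(out track))
            batch.Add(track);

        if (batch.Count == 0)
            return 0; // nothing queued: leave the running stream alone

        stream.Stop();
        foreach (var t in batch)
            stream.AddTrack(t);
        stream.Start();
        return batch.Count;
    }
}
```

Draining the whole queue before restarting means several keywords that arrive close together cost one reconnect instead of one per keyword. A real implementation would call the actual stop, AddTrack and start methods of the stream in place of the fake ones.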
May 27, 2014 at 3:23 PM
Edited May 27, 2014 at 3:32 PM
Hi Linvi,

I am using the following code to stop an already running filtered stream and then resume it with a new track added.
// declared globally
static IFilteredStream filteredStream = Tweetinvi.Stream.CreateFilteredStream();

// in static void main() method
filteredStream.StartStreamMatchingAnyCondition();

// in another static method
filteredStream.StopStream();
filteredStream.AddTrack("@newmention");
filteredStream.StartStreamMatchingAnyCondition();
As soon as the last line executes, I get the following error:
Value can not be Null. Parameter name: source
I have also tried using the following instead of filteredStream.StartStreamMatchingAnyCondition():
filteredStream.ResumeStream();
But after adding the new track, it just stops tracking tweets from any of the already tracked users (@mentions). After some time, the stream ends on its own with an unknown error.

While debugging, I noticed something strange: when adding a new track to the stream, all values in the Tracks dictionary were null, e.g.
filteredStream.Tracks[0].Key    // is equal to '@mention', whereas
filteredStream.Tracks[0].Value  // is equal to null, for every track
So what is the most reliable way to add tracks to a running stream and keep the stream stable as well?
(I have already implemented the suggestions for resuming the stream when it is stopped.)

Regards.
Sep 14, 2014 at 12:08 AM
Hi shakeelmanjum,

Were you able to solve this? I'm running into what seems like the same issue and can't seem to find a definitive solution. Any help/insight/advice would be greatly appreciated.

Thanks for taking the time

My posting -> https://tweetinvi.codeplex.com/discussions/567315