logo
down
shadow

RavenDB Stream for Unbounded Results - Connection Resilience


RavenDB Stream for Unbounded Results - Connection Resilience

Content Index :

RavenDB Stream for Unbounded Results - Connection Resilience
Tag : chash , By : user183842
Date : November 24 2020, 01:01 AM

should help you out As per the suggestion from @StriplingWarrior I've recreated the solution using Data Subscriptions.
Using this approach I was able to iterate over all 2 million rows (though admittedly with much less processing per item); 2 points here that would have helped when we were trying to implement the same logic using Streams:
using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };

            bulkInsert.Store(person);
        }
    }
}
using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    {
        BatchOptions = new SubscriptionBatchOptions()
        {
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        },
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    });

    personSubscription.Subscribe(new PersonObserver());

    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}
public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}

Comments
No Comments Right Now !

Boards Message :
You Must Login Or Sign Up to Add Your Comments .

Share : facebook icon twitter icon

Node.JS Unbounded Concurrency / Stream backpressure over TCP


Tag : node.js , By : Ben Brown
Date : March 29 2020, 07:55 AM
will be helpful for those in need To answer your first question, I believe your understanding is not accurate -- at least not when piping data between streams. In fact, if you read the documentation for the pipe() function you'll see that it explicitly says that it automatically manages the flow so that "destination is not overwhelmed by a fast readable stream."
The underlying implementation of pipe() is taking care of all of the heavy lifting for you. The input stream (a Readable stream) will continue to emit data events until the output stream (a Writable stream) is full. As an aside, if I remember correctly, the stream will return false when you attempt to write data that it cannot currently process. At this point, the pipe will pause() the Readable stream, which will prevent it from emitting further data events. Thus, the event loop isn't going to fill up and exhaust your memory nor is it going to emit events that are simply lost. Instead, the Readable will stay paused until the Writable stream emits a drain event. At that point, the pipe will resume() the Readable stream.

RavenDB - stream index query results in exception


Tag : chash , By : sgmichelsen
Date : March 29 2020, 07:55 AM
To fix this issue As @nicolai-heilbuth stated in the comments to @jens-pettersson's answer, it seems to be a bug in the RavenDB client libraries from version 3 onwards.
Bug report filed here: http://issues.hibernatingrhinos.com/issue/RavenDB-3916

Total aggregate over an unbounded stream in Dataflow


Tag : development , By : janik
Date : March 29 2020, 07:55 AM
this one helps. Ryan your solution of using a global window and a periodic trigger is the recommended approach. Just make sure you use accumulation mode on the trigger and not discarding mode. The Triggers page should have more information.
Let us know if you need additional help.

RavenDb Stream returning no results


Tag : development , By : fstender
Date : March 29 2020, 07:55 AM
With these it helps So I finally figured this out. Seems the documentation is lacking a few key details in regards to the Streaming API. Basically I was on the right track, I just had to actually create the index earlier in the code. I ended up using the AbstractIndexCreationTask, which I like better than the PutIndex method, although Im not sure if they are doing the same thing or not. In any case, this works:
        var store = new EmbeddableDocumentStore
        {
            DataDirectory = "data"
        };
        store.Initialize();

        **IndexCreation.CreateIndexes(typeof(TestIndex).Assembly, store);**

        using (IDocumentSession session = store.OpenSession())
        {

            for (var t = 1; t < 100; t++)
            {
                var subtest = new Test
                {
                    Id = new Guid(),
                    SubTest = new SubTest
                    {
                        Name = "NewTest",
                        Id = new Guid()
                    }
                };
                session.Store(subtest);
            }
            session.SaveChanges();
        }


        using (IDocumentSession session = store.OpenSession())
        {
            IQueryable<Test> query = session.Query<Test, TestIndex>();

            var enumerator = session.Advanced.Stream(query);

            while (enumerator.MoveNext())
            {
                var t = enumerator.Current.Document;
                System.Console.WriteLine(t.Id);
            }

        }
public class TestIndex : AbstractIndexCreationTask<Test>
{
    public TestIndex()
    {
        this.Map = tests =>
            from t in tests
            select new
            {
                t.Id,
                t.SubTest
            };
    }
}    

Saving a Twitter Sample Stream to RavenDB results in the exception The maximum number of requests (30) allowed for this


Tag : chash , By : UnKnownUser
Date : March 29 2020, 07:55 AM
Any of those help You're creating a single document session, and using that for all tweets that are received. So, if you receive 5000 tweets over the span of an hour, you'll hit this issue.
Two ways to fix this:
static void Main(string[] args)
{
    Auth.SetUserCredentials() // redacted for SO

    var stream = Stream.CreateSampleStream();
    stream.TweetReceived += (sender, theTweet) =>
    {
        var tm = new TwitterModels
        {
            Id = theTweet.Tweet.Id,
            TheTextFromTwitter = theTweet.Tweet.FullText
        };

        using (var session = DocumentStoreHolder.Store.OpenSession())
        { 
           session.Store(tm);
           session.SaveChanges();
        }
    };
    stream.StartStream();             
}       
using (BulkInsertOperation bulkInsert = store.BulkInsert())
{
   stream.TweetReceived += (sender, theTweet) =>
   {
       var tm = new TwitterModels
       {
           Id = theTweet.Tweet.Id,
           TheTextFromTwitter = theTweet.Tweet.FullText
       };


       bulkInsert.Store(tm);
   }
};
Related Posts Related QUESTIONS :
  • EF Core and MySql query is too slow
  • Getting Registered App Display Name from an App Id
  • How to get all variables from a string
  • Delete entity with all childs connected
  • Azure Build agent cant´t find class library referance
  • Initialize Nested Dictionaries in c#
  • .Net Core Binding
  • Generic event test method, preventing code duplication
  • How do I keep the ellipses in the center when the screen is resized
  • How to require a property using JsonSchema.NET?
  • C# XDocument Element/Elements returns null
  • Autofac keyed service with IEnumerable relationship type
  • Installing EntityFramework via NuGet manager
  • Always Check if there is Internet Connection Xamarin forms
  • WCF OneWay service slows down when aspNetCompatibilityEnabled is set to false
  • Can we use JsonRequestBehavior.AllowGet with [HttpPost] attribute?
  • How to customize the Setup wizard with custom forms in Visual Studio setup project
  • C# ASP.NET - Use method from another class to create labels
  • C# List IList or IEnumerable as argument
  • Parsing File with C# And Replace method
  • Losing special unicode characters in encryption (C#)
  • Getting stored procedure returned value instead of row affected
  • How can I construct HTML using NameValuePair in android?
  • Loading a pop up page in ASP.net through a js file
  • How to pass alert or notification message from controller to View?
  • C# to pause, turn on ssas server, backup cube.... how to?
  • How to execute DataTable.Select() for a column of custom class type for a particular element in that C#
  • how to connect mysql8.0 with C#
  • Passing incorrect values into MultiValueConverter by MultiBinding
  • Can i use IEnumerator as Update func?
  • How to convert API Json response to C# Array?
  • Blazor Textfield Oninput User Typing Delay
  • Performing both layout and render transform results in wrong output
  • uwp beforetextchanged cursor moving in front of text
  • How to keep duplicates from a string[] exclude words from a List and print them out
  • .Net Core Strings.Asc/Mid/Chr/Len missing even after importing Microsoft.VisualBasic
  • How to return to previous search page without being asked to Confirm Form Re-submission and keeping the results on ASP.N
  • How set a identity scaffolding item/page how initial page in asp.net MVC core?
  • LINQ isn't calling Dispose on my IEnumerator when using Union and Select, expected behavior or bug?
  • What is "ByteArray.uncompress()" in AS3 equivalent to in C#?
  • Getting a specific letter from a string variable for my simple guessing game for clues
  • Send an email with Outlook without a subject --- dialog box issue
  • passing List<MyModel> from my controller in the "WebInterfaceProject" to the processor method in "D
  • How to convert Word document created from template by OpenXML into MemoryStream?
  • How can I make a single slider that changes the color of an object?
  • Remap JSON parameter in c#
  • What is the difference between "this ref" and "ref this" when talking about C# 7.2 ref extension met
  • Convert OpenSSL encryption into native C#
  • Accessing Properties in Razor Pages
  • How to get SOAP element value
  • Projection after Group
  • C# error cannot convert sytem.text.regularexpressions.match to string
  • Issues with Save/Load System in a Text Based Adventure game made with ScriptableObjects in Unity
  • VS2019 MSBuild.exe - ASP .Net MVC project fails to publish when using PublishProfile, but works when using OutDir parame
  • Does <pages validateRequest="false" /> in Web.config still matter?
  • How to send new request to redirect URL with new access token
  • Attempt to invoke virtual method on a null object reference Xamarin LockScreen
  • "The attribute names could not be inferred from bind attribute 'bind-value'" exception in Blazor
  • How to fix ''System.ArgumentException" in c#?
  • C#. Ref returning delegate for ref extension method
  • shadow
    Privacy Policy - Terms - Contact Us © scrbit.com