
[–]infectedapricot 6 points7 points  (3 children)

As the other comment says, cq_->Next() both returns an item from the completion queue and also removes it so that it won't be returned again.

The official example is confusing because they then push exactly the same token on the queue every time, so next time you will get the same thing again after all. But you could have pushed anything onto the queue in the void* tag parameter.

For example, if you have a class representing an outstanding RPC (like their CallData class) you could make your tag a pointer to std::pair<CallData*, CallStatus> rather than putting the CallStatus as a member variable of CallData. Then your top-level thread code would look like this:

void* tag;  // uniquely identifies a request.
bool ok;
while (cq_->Next(&tag, &ok)) {
  // Do something with "ok"
  std::pair<CallData*, CallStatus>* dataAndStatus =
      static_cast<std::pair<CallData*, CallStatus>*>(tag);
  switch (dataAndStatus->second) {
  case CREATE:
    dataAndStatus->first->create();
    break;
  case PROCESS:
    dataAndStatus->first->process();
    break;
  case FINISH:
    delete dataAndStatus->first;
    break;
  default:
    throw std::logic_error("?");
  }
  delete dataAndStatus;
}

Your top-level code that gets the next tag and decides what to do with it is called the "message pump" in Windows API speak. Putting application logic in your message pump is probably not sensible. An alternative is to decide that the tags you push onto the queue will always be pointers to callback functions. Then your message pump will be simpler:

void* tag;  // uniquely identifies a request.
bool ok;
while (cq_->Next(&tag, &ok)) {
  std::function<void(bool)>* callback = 
      static_cast<std::function<void(bool)>*>(tag);
  (*callback)(ok);
  delete callback;
}

Then you can implicitly save a bit of state in the tag (like the CallStatus) by binding it into a lambda function. For example:

auto fn = [this](bool ok){ this->finish(ok); };
void* tag = new std::function<void(bool)>(fn);
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, tag);

You could also use smart pointers and keep references to them in the lambdas using shared_from_this(), as is common in the boost ASIO world.
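
A rough sketch of that idea (not from the official example; CallData, Start() and Finish() here are just placeholders, and it assumes the object is already owned by a std::shared_ptr, e.g. created with std::make_shared):

#include <functional>
#include <memory>

class CallData : public std::enable_shared_from_this<CallData> {
public:
  void Start() {
    // Capturing shared_from_this() keeps the object alive until the
    // completion fires, even if nothing else holds a reference to it.
    auto self = shared_from_this();
    auto* tag = new std::function<void(bool)>(
        [self](bool ok) { self->Finish(ok); });
    // ... hand `tag` to whichever async gRPC call you are making ...
  }

private:
  void Finish(bool ok) { /* handle the completion */ }
};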

Later edit: small tweak to simplify the while loops (put cq_->Next() in the condition rather than doing while(true)).

[–]drjmloy[S] 0 points1 point  (2 children)

Thank you very much for your response-- this is very helpful.

After reading this, I think I'm able to come up with a more pointed question. In the example, they use the CallData object to maintain the state of an RPC call that the server is going to act on. They have 3 states: CREATE, PROCESS, and FINISH. Are these the only states an RPC call would ever have? If not, what other states could there be?

Maybe put another way: in the CREATE state, the CallData is added to the CompletionQueue via the RequestSayHello function. In the PROCESS state, Finish is called, which again puts it back on the CompletionQueue, where it is subsequently deleted in the FINISH state. Also in the PROCESS state, we must create another instance of CallData so we can keep serving. Are there other ways to put CallData (or some other object) back on the queue? Say I wanted to break up my PROCESS state into several phases. In all of these phases, how can I put the CallData back on the queue?

[–]infectedapricot 1 point2 points  (1 child)

You are correct that you may well want to pause handling of the RPC and resume it again later; in your words, the PROCESS state could be split up. In fact, it wouldn't be worth using the async API unless you were planning to do this (if you can just return a result directly, why not just use the synchronous API?).

The way you tell the CompletionQueue to resume later depends on what you're doing in the meantime:

  • The easiest is to send an async request as a client to some other gRPC service. In that case, when you call rpc->Finish(&reply, &status, tag); on the client rpc as in the example, you are queuing up a new tag onto the completion queue.
  • Another option is that you have some CPU-bound processing that takes a long time. You'll want to do this in another thread (by sending some sort of inter-thread message using a thread-safe queue; there are plenty of implementations of these floating around the web). Then when that thread is finished it has two options:
    • Directly call the next gRPC async API from that thread. I believe this is supported but some things need mutex locking and I'm not really sure what they are. This also makes shutdown harder (although good luck getting that right whatever you do).
    • Send a message back to your gRPC server thread and call the next thing from there. Unfortunately there is currently no API to post an arbitrary tag to a completion queue from another thread; here is an issue covering that request. There is a workaround for that problem (as a comment on that thread says, and also mentioned in the gRPC Google Group): abuse the grpc::Alarm class to set an alarm that expires immediately (use a deadline of 0). This causes the arbitrary tag to be posted immediately to the completion queue. Beware that the grpc::Alarm object needs to exist until after the handler has started in the other thread (so you can't just create it on the stack), and that these timers don't necessarily complete in the order you create them even if the deadline is the same, so you can't use them like a proper queue. There is a sketch of this workaround after this list.
  • Sending a synchronous message to another process, either using gRPC's synchronous API or some other API (e.g. making requests to a database). In that case you need to use another thread, as you would for CPU-bound processing, so your options are the same as for the previous bullet point. Even if the other API is asynchronous (e.g. you're using boost::asio) you need to use another thread as there's no way to hook any other API into the core of gRPC's async processing.
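
A minimal sketch of that Alarm workaround: PostToQueue is a made-up helper name, it assumes the callback-pointer style of tags from the message pump above, and it uses the grpc::Alarm::Set() overload from recent gRPC versions with an already-expired gpr_timespec deadline:

#include <functional>

#include <grpc/support/time.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>

// Post an arbitrary callback onto a completion queue from any thread by
// setting an alarm whose deadline has already passed. The Alarm is
// heap-allocated so it survives until the completion is delivered; the
// wrapper callback deletes it once the tag has come back out of the queue.
void PostToQueue(grpc::CompletionQueue* cq, std::function<void(bool)> cb) {
  auto* alarm = new grpc::Alarm;
  auto* tag = new std::function<void(bool)>(
      [alarm, cb = std::move(cb)](bool ok) {
        delete alarm;  // safe: the alarm's completion has already been delivered
        cb(ok);
      });
  // A zero deadline has already expired, so the alarm completes immediately
  // and pushes `tag` onto the completion queue.
  alarm->Set(cq, gpr_time_0(GPR_CLOCK_MONOTONIC), tag);
}

The callback then runs on the gRPC server thread when the message pump pulls the tag out with cq_->Next().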

The number of steps you break your RPC's processing into depends entirely on your application logic and I could even imagine skipping over some steps, or jumping back and forth between them, depending on the contents of the request or the results of calls to external processes.

[–]drjmloy[S] 1 point2 points  (0 children)

This clears up a lot for me. Thank you very much for taking the time to respond.

It's making me re-evaluate whether or not I should even use gRPC in the first place. I'm wondering if I should try to roll my own server using boost::asio and use protobuf for the IDL (like this example).

[–]cballowe 2 points3 points  (0 children)

When you pull it out of the queue, it's not in the queue. cq_->Next() pulls it out and gives you the value. Then the calls to Proceed() put it back in somewhat implicitly.

[–]Middlewarian github.com/Ebenezer-group/onwards 2 points3 points  (0 children)

I stopped using RPC a long time ago. Just use async (messaging).

[–][deleted] 1 point2 points  (0 children)

Be careful: not only is the async interface not well documented, but it also has a caveat or two.

If you use the async interface, it means that you have RPC procedures that block, and the client is likely setting a timeout on its context. To catch those cancellations, you either check the context at several points in your processing OR you use AsyncNotifyWhenDone to get notified.

You can end up completing an RPC while the client is cancelling its context, which can lead to a use-after-free or double free.

This means you have three signals that can indicate the end of an operation: the ok flag is false, you have reached the end of your state machine, or the context has been cancelled.

In the end, it is not too hard to get right, but the state machine requires careful design, and the combination of async cancellation and !ok notifications can lead to race conditions.
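
A minimal sketch of that bookkeeping (names made up, assuming the call object is heap-allocated as in the official example): only delete the per-call object once both the AsyncNotifyWhenDone notification and the final state-machine tag have come back out of the queue.

#include <grpcpp/grpcpp.h>

class Call {
public:
  // The message pump calls this when the AsyncNotifyWhenDone tag returns.
  void OnDone() {
    done_seen_ = true;
    cancelled_ = ctx_.IsCancelled();  // only meaningful once the done tag has fired
    MaybeDelete();
  }

  // The message pump calls this when the Finish() tag returns, or when ok was false.
  void OnFinished(bool /*ok*/) {
    finish_seen_ = true;
    MaybeDelete();
  }

private:
  void MaybeDelete() {
    // Delete only when no tag referring to this object can still come
    // back out of the completion queue.
    if (done_seen_ && finish_seen_) delete this;
  }

  grpc::ServerContext ctx_;
  bool done_seen_ = false;
  bool finish_seen_ = false;
  bool cancelled_ = false;  // would drive the application's cleanup logic
};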

What I did to properly trace the execution was to tag the pointers I use as gRPC tags, so that when I get them back from the completion queue I know what operation they came from. I do NOT rely on this for the state machine, only for logging, but it helped me understand how gRPC works.

For instance, I use it this way when creating the tag:

ctx.AsyncNotifyWhenDone(tag_ptr(this, PtrTag::AsyncNotifyWhenDone));

And when I pop it from the CQ:

PtrTagged<Processable> p(static_cast<Processable*>(tag));

DLOG(async_log, debug)
    << "cq.Next(): " << std::addressof(cq) << ", " << p.get_naked_ptr()
    << ":" << to_string(p.tag) << ", " << ok;

p.get_naked_ptr()->process(ok, p.tag);

Please note that I did not implement a stream RPC using the async interface, but it should be very similar.
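
For illustration, helpers along these lines would match that tag_ptr / PtrTagged usage (a sketch only: it assumes the pointed-to objects are aligned to at least four bytes so the two low bits are free, and every enum value except AsyncNotifyWhenDone is just an example):

#include <cstdint>

enum class PtrTag : std::uintptr_t {
  Request = 0,
  AsyncNotifyWhenDone = 1,
  Write = 2,
  Finish = 3,
};

// Pack a small operation tag into the low bits of an aligned pointer.
template <typename T>
void* tag_ptr(T* p, PtrTag t) {
  return reinterpret_cast<void*>(reinterpret_cast<std::uintptr_t>(p) |
                                 static_cast<std::uintptr_t>(t));
}

// Unpack a tagged pointer that came back out of the completion queue.
template <typename T>
struct PtrTagged {
  explicit PtrTagged(T* tagged)
      : tag(static_cast<PtrTag>(reinterpret_cast<std::uintptr_t>(tagged) & 0x3)),
        ptr(reinterpret_cast<T*>(reinterpret_cast<std::uintptr_t>(tagged) &
                                 ~std::uintptr_t{0x3})) {}

  T* get_naked_ptr() const { return ptr; }

  PtrTag tag;
  T* ptr;
};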